PeakeCoin Bot - Universal place_order

in Synergy Builders · 2 days ago

Suppose you're attempting to run multiple trading bots and want more control. This is where I have ended up.

The main issue I've run into is that some tokens have automatically enforced minimum order sizes, so you may need to play with some of the values. Look for the little `if token == "SWAP.USDT"` check in `validate_order_payload` and adjust it as you see fit.

place_order.py



import time
import json as jsonlib
import requests
import os
import math

# Wall-clock time (epoch seconds) at which the throttle last let a call through.
_last_tx_time = 0
_TX_MIN_DELAY = 10  # seconds


def _enforce_tx_delay():
    """Sleep as needed so consecutive calls are at least ``_TX_MIN_DELAY`` seconds apart.

    The time of the most recent call is kept in the module-level
    ``_last_tx_time`` and refreshed after any sleep has finished.
    """
    global _last_tx_time
    remaining = _TX_MIN_DELAY - (time.time() - _last_tx_time)
    if remaining > 0:
        time.sleep(remaining)
    _last_tx_time = time.time()

def get_hive_instance(posting_key=None, active_key=None, nodes=None):
    """Build a ``beem.Hive`` client from the supplied keys and node list.

    Args:
        posting_key: Optional posting key; skipped when falsy.
        active_key: Optional active key; skipped when falsy.
        nodes: Node URLs to pass to ``Hive``. ``None`` selects the two
            built-in defaults.

    Returns:
        The constructed ``Hive`` instance.
    """
    from beem import Hive

    if nodes is None:
        nodes = ["https://api.hive.blog", "https://anyx.io"]
    # Only keys that were actually provided are handed to beem.
    keys = [key for key in (posting_key, active_key) if key]
    return Hive(node=nodes, keys=keys)


def get_account_instance(account_name, hive_instance):
    """Return a ``beem.account.Account`` for ``account_name`` bound to ``hive_instance``."""
    from beem.account import Account

    account = Account(account_name, blockchain_instance=hive_instance)
    return account


def _format_decimal(value, places):
    """Render ``value`` as plain fixed-point text with at most ``places`` decimals.

    ``str(float)`` switches to scientific notation for small values
    (``str(0.00001) == '1e-05'``); this helper never does. Trailing zeros are
    trimmed but one decimal digit is always kept, so ordinary values look the
    same as ``str(float)`` would print them (``1.0`` -> ``'1.0'``).
    """
    text = f"{value:.{places}f}"
    if "." in text:
        text = text.rstrip("0")
        if text.endswith("."):
            text += "0"
    return text


def validate_order_payload(token, quantity, price):
    """Normalise quantity and price into the strings sent in an order payload.

    For ``SWAP.USDT`` the quantity is raised to a minimum of 0.01 and rounded
    to 2 decimals; for every other token it is rounded to 8 decimals. The
    price is always rounded to 6 decimals.

    Args:
        token: Token symbol the order is for.
        quantity: Order quantity; anything ``float()`` accepts.
        price: Order price; anything ``float()`` accepts.

    Returns:
        A ``(quantity, price)`` tuple of fixed-point decimal strings.

    Raises:
        TypeError, ValueError: If ``quantity`` or ``price`` cannot be
            converted with ``float()``.
    """
    # Coerce both inputs the same way so string prices work like string quantities.
    quantity = float(quantity)
    price = float(price)
    if token == "SWAP.USDT":
        min_qty = 0.01
        qty_places = 2
        quantity = max(quantity, min_qty)
    else:
        qty_places = 8
    price_places = 6
    quantity = round(quantity, qty_places)
    price = round(price, price_places)
    return _format_decimal(quantity, qty_places), _format_decimal(price, price_places)


def get_balance(account_name, token):
    """Look up an account's balance of ``token`` on Hive-Engine.

    Sends a ``find`` query against the ``tokens`` contract's ``balances``
    table at ``https://api.hive-engine.com/rpc/contracts``.

    Args:
        account_name: Account whose balance is requested.
        token: Token symbol to look up.

    Returns:
        The balance as a float, or ``0.0`` when no balance row is returned or
        the lookup fails (non-200 status, network error, malformed response).
        Failures are printed rather than raised.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "find",
        "params": {
            "contract": "tokens",
            "table": "balances",
            "query": {"account": account_name, "symbol": token},
        },
        "id": 1,
    }
    raw = None
    try:
        # A timeout keeps a stalled node from hanging the caller indefinitely.
        r = requests.post("https://api.hive-engine.com/rpc/contracts", json=payload, timeout=10)
        raw = r.text
        if r.status_code == 200:
            rows = r.json().get("result") or []
            if rows:
                return float(rows[0]["balance"])
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError, IndexError) as e:
        # Treat transport and parsing problems like the non-200 case: report and return 0.0.
        print(f"[get_balance] Balance lookup failed for {account_name} {token}: {e}")
    print(f"[get_balance] No balance found for {account_name} {token}, returning 0.0. Raw result: {raw}")
    return 0.0


def get_open_orders(account_name, token=None, nodes=None):
    """
    Collect the account's open orders from the market ``buyBook`` and ``sellBook`` tables.

    Each table is queried with a limit of 1000 rows. For each table the nodes
    are tried in order and the first one that answers with HTTP 200 is used;
    errors on a node are printed and the next node is tried. Every returned
    order dict gets a ``type`` key set to ``"buy"`` or ``"sell"``.

    Args:
        account_name: Account whose orders are fetched.
        token: Optional symbol filter; when falsy, orders for all tokens are fetched.
        nodes: Hive-Engine RPC URLs to try; ``None`` selects the built-in list.

    Returns:
        A list of order dicts (buy orders first, then sell orders).
    """
    if nodes is None:
        nodes = [
            "https://api.hive-engine.com/rpc/contracts",
            "https://herpc.dtools.dev",
            "https://engine.rishipanthee.com/rpc",
            "https://api2.hive-engine.com/rpc/contracts",
        ]

    query = {"account": account_name}
    if token:
        query["symbol"] = token

    collected = []
    for table, order_type in (("buyBook", "buy"), ("sellBook", "sell")):
        payload = {
            "jsonrpc": "2.0",
            "method": "find",
            "params": {
                "contract": "market",
                "table": table,
                "query": dict(query),
                "limit": 1000
            },
            "id": 1
        }
        for node in nodes:
            try:
                print(f"Checking {order_type} orders for {account_name} {token or '[all tokens]'} on {node}...")
                response = requests.post(node, json=payload, timeout=10)
                if response.status_code != 200:
                    # Non-200 answers are skipped silently; move on to the next node.
                    continue
                found = response.json().get("result", [])
                for entry in found:
                    entry["type"] = order_type
                collected.extend(found)
                break  # Only use the first working node
            except Exception as e:
                print(f"Error querying {order_type} orders on {node}: {e}")
    return collected


def _txid_from_broadcast(broadcast_result):
    """Best-effort lookup of a transaction id in a broadcast response.

    Checks the keys ``id``, ``txid`` and ``transaction_id`` on the response
    itself and then on its nested ``result`` dict. Returns ``None`` when the
    response is not a dict or no truthy id is found.
    """
    if not isinstance(broadcast_result, dict):
        return None
    containers = [broadcast_result]
    nested = broadcast_result.get('result')
    if isinstance(nested, dict):
        containers.append(nested)
    for container in containers:
        for key in ('id', 'txid', 'transaction_id'):
            value = container.get(key)
            if value:
                return value
    return None


def cancel_order(account_name, order_id, verbose=True, nodes=None, posting_key=None, active_key=None):
    """Cancel a market order by its orderId, trying each node in turn.

    A ``custom_json`` operation (id ``ssc-mainnet-hive``) carrying a ``market``
    / ``cancel`` payload is signed with the account's active authority and
    broadcast. The first node that broadcasts without an exception and without
    an ``error`` entry in its response wins.

    Args:
        account_name: Account that owns the order and signs the transaction.
        order_id: Identifier of the order to cancel; sent as a string.
        verbose: When true, print debug and error details.
        nodes: Hive node URLs to try in order; ``None`` selects the built-in list.
        posting_key: Optional posting key handed to beem.
        active_key: Optional active key handed to beem.

    Returns:
        ``(True, tx_id, None)`` on success, where ``tx_id`` may be ``None`` if
        the response carried no id; otherwise ``(False, None, message)``, with
        the last node error appended to the message when one was seen.
    """
    from beem import Hive
    from beem.transactionbuilder import TransactionBuilder
    from beembase.operations import Custom_json
    if nodes is None:
        nodes = [
            "https://api.hive.blog",
            "https://anyx.io",
            "https://api.openhive.network",
        ]
    payload = {
        "contractName": "market",
        "contractAction": "cancel",
        "contractPayload": {"orderId": str(order_id)},
    }
    # Loop-invariant pieces are prepared once.
    op_json = jsonlib.dumps(payload)
    keys = [k for k in (posting_key, active_key) if k]
    last_error = None
    for node in nodes:
        try:
            hive = Hive(node=node, keys=keys)
            tx = TransactionBuilder(blockchain_instance=hive)
            op = Custom_json(
                required_auths=[account_name],
                required_posting_auths=[],
                id="ssc-mainnet-hive",
                json=op_json,
            )
            if verbose:
                print(f"[CANCEL DEBUG] Using node: {node}")
                print(f"[CANCEL DEBUG] required_auths: {[account_name]}, required_posting_auths: [] (should be active only)")
            tx.appendOps([op])
            tx.appendSigner(account_name, "active")
            tx.sign()
            broadcast_result = tx.broadcast()
        except Exception as e:
            last_error = e
            if verbose:
                print(f"[CANCEL ERROR] Exception during cancel on {node}: {e}")
            continue  # Try next node
        # From here on the broadcast has happened; response parsing must not
        # trigger a retry on another node, so it stays outside the try block.
        if verbose:
            print(f"[CANCEL DEBUG] Full broadcast result: {broadcast_result}")
        if isinstance(broadcast_result, dict) and broadcast_result.get('error'):
            last_error = broadcast_result['error']
            if verbose:
                print(f"[CANCEL ERROR] Node {node} returned an error: {last_error}")
            continue  # Try next node
        return (True, _txid_from_broadcast(broadcast_result), None)
    message = 'All nodes failed to broadcast cancel order'
    if last_error is not None:
        message = f"{message} (last error: {last_error})"
    return (False, None, message)


def build_and_send_op(account_name, symbol, price, quantity, order_type, posting_key=None, active_key=None, nodes=None):
    """Build, sign and broadcast a Hive-Engine market order as a ``custom_json`` op.

    The Hive instance from ``get_hive_instance`` is also installed as beem's
    shared blockchain instance. Quantity and price are normalised through
    ``validate_order_payload`` before being placed in the payload, and the
    transaction is signed with the account's active authority.

    Args:
        account_name: Account placing the order and signing the transaction.
        symbol: Token symbol for the order.
        price: Order price.
        quantity: Order quantity.
        order_type: Used verbatim as the ``contractAction`` (e.g. ``"buy"``).
        posting_key: Optional posting key.
        active_key: Optional active key.
        nodes: Optional list of Hive node URLs.

    Returns:
        The transaction id found in the broadcast response, a falsy value when
        none is present, or ``None`` when signing/broadcasting raised (the
        exception is printed with its traceback, not propagated).
    """
    from beem.transactionbuilder import TransactionBuilder
    from beembase.operations import Custom_json
    from beem.instance import set_shared_blockchain_instance

    hive = get_hive_instance(posting_key, active_key, nodes)
    set_shared_blockchain_instance(hive)

    # Validate decimals and min order size
    quantity, price = validate_order_payload(symbol, quantity, price)
    contract_payload = {"symbol": symbol, "quantity": quantity, "price": price}
    payload = {
        "contractName": "market",
        "contractAction": order_type,
        "contractPayload": contract_payload,
    }

    tx = TransactionBuilder(blockchain_instance=hive)
    op = Custom_json(
        required_auths=[account_name],
        required_posting_auths=[],
        id="ssc-mainnet-hive",
        json=jsonlib.dumps(payload),
    )
    tx.appendOps([op])
    tx.appendSigner(account_name, "active")

    id_keys = ('id', 'txid', 'transaction_id')
    try:
        tx.sign()
        print("[DEBUG] Transaction signed successfully.")
        broadcast_result = tx.broadcast()
        print(f"[DEBUG] Full broadcast result: {broadcast_result}")
        tx_id = None
        if isinstance(broadcast_result, dict):
            # Look for an id at the top level first ...
            for key in id_keys:
                tx_id = broadcast_result.get(key)
                if tx_id:
                    break
            else:
                # ... then inside the nested 'result' entry, if there is one.
                nested = broadcast_result.get('result')
                tx_id = nested
                if nested:
                    for key in id_keys:
                        tx_id = nested.get(key)
                        if tx_id:
                            break
        print(f"Broadcasted {order_type.upper()} order for {symbol} at {price}. TXID: {tx_id or '[not found]'}")
        return tx_id
    except Exception as e:
        print(f"[ERROR] Exception during sign/broadcast: {e}")
        import traceback
        traceback.print_exc()
        return None


def place_order(account_name, token, price, quantity, order_type="buy", posting_key=None, active_key=None, nodes=None):
    """Place a buy or sell order on the market after checking the balance.

    The call is throttled through ``_enforce_tx_delay``. Buy orders are paid
    for in SWAP.HIVE, so the SWAP.HIVE balance is compared against the order
    cost (``quantity * price``); sell orders compare the token balance
    against ``quantity``. When the balance is short the order is shrunk to
    what 95% of the balance can cover, and it is skipped if nothing is left.

    Args:
        account_name: Account placing the order.
        token: Token symbol to buy or sell.
        price: Price per token; must convert to a positive float.
        quantity: Amount of ``token``; must convert to a positive float.
        order_type: ``"buy"`` (default) or another market action such as ``"sell"``.
        posting_key: Posting key; falls back to the ``HIVE_POSTING_KEY`` env var.
        active_key: Active key; falls back to the ``HIVE_ACTIVE_KEY`` env var.
        nodes: Hive node URLs; falls back to two built-in defaults.

    Returns:
        ``False`` when the order is skipped or ``build_and_send_op`` raises,
        ``True`` otherwise.
    """
    _enforce_tx_delay()
    # Use env vars as fallback if not provided
    posting_key = posting_key or os.environ.get("HIVE_POSTING_KEY")
    active_key = active_key or os.environ.get("HIVE_ACTIVE_KEY")
    nodes = nodes or ["https://api.hive.blog", "https://anyx.io"]
    print(f"Placing {order_type.upper()} order for {quantity} {token} at {price}...")
    try:
        price = float(price)
        quantity = float(quantity)
    except (TypeError, ValueError):
        print("[ERROR] Order skipped: price and quantity must be numeric.")
        return False
    if price <= 0:
        print("[ERROR] Order skipped: price must be positive.")
        return False
    # Use SWAP.HIVE as the base token for all buy orders
    is_buy = order_type == "buy"
    token_used = "SWAP.HIVE" if is_buy else token
    available = get_balance(account_name, token_used)
    print(f"[INFO] {token_used} balance before {order_type}: {available}")
    # A buy spends quantity * price of SWAP.HIVE; a sell spends quantity of the token.
    required = quantity * price if is_buy else quantity
    if available < required:
        print(f"[WARN] Not enough balance for {token_used}. Adjusting order.")
        spendable = available * 0.95
        quantity = spendable / price if is_buy else spendable
    # Quantities are sent with at most 8 decimals, so anything that rounds to
    # zero at that precision cannot be placed.
    if round(quantity, 8) <= 0:
        print(f"[ERROR] Order skipped: quantity too small or no balance.")
        return False
    try:
        tx_id = build_and_send_op(account_name, token, price, quantity, order_type, posting_key, active_key, nodes)
        # NOTE(review): build_and_send_op returns None both when no TXID is in
        # the response and when sign/broadcast raised, so a failed broadcast
        # still reaches the "TXID not available" branch and returns True.
        if tx_id:
            print(f"Order successful. TXID: {tx_id}")
        else:
            print(f"Order broadcasted, but TXID not available.")
        return True
    except Exception as e:
        print(f"Order failed: {e}")
        import traceback
        traceback.print_exc()
        return False


# Default gas settings (can be changed in one place for all bots)
DEFAULT_GAS_TOKEN = "SWAP.MATIC"
DEFAULT_GAS_AMOUNT = 0.01
DEFAULT_GAS_PRICE = 1.0  # Set a reasonable default, can be overridden


def buy_gas(account_name, gas_token=None, gas_amount=None, gas_price=None, posting_key=None, active_key=None, nodes=None):
    """Place a buy order for a "gas" token, using module defaults for omitted values.

    Args:
        account_name: Account placing the order.
        gas_token: Token to buy; ``None`` selects ``DEFAULT_GAS_TOKEN``.
        gas_amount: Quantity to buy; ``None`` selects ``DEFAULT_GAS_AMOUNT``.
        gas_price: Price per token; ``None`` selects ``DEFAULT_GAS_PRICE``.
        posting_key: Optional posting key, forwarded to ``place_order``.
        active_key: Optional active key, forwarded to ``place_order``.
        nodes: Optional Hive node URLs, forwarded to ``place_order``.

    Returns:
        Whatever ``place_order`` returns for the buy order.
    """
    # No _enforce_tx_delay() here: place_order already applies the throttle,
    # and calling it in both places forces a second full wait on every gas buy.
    token = gas_token if gas_token is not None else DEFAULT_GAS_TOKEN
    amount = gas_amount if gas_amount is not None else DEFAULT_GAS_AMOUNT
    price = gas_price if gas_price is not None else DEFAULT_GAS_PRICE
    print(f"[GAS] Placing gas buy order: {amount} {token} at {price} for {account_name} (token={token}, amount={amount}, price={price})")
    return place_order(
        account_name,
        token,
        price,
        amount,
        order_type="buy",
        posting_key=posting_key,
        active_key=active_key,
        nodes=nodes,
    )


PEAKECOIN_GAS_TOKEN = "PEK"
PEAKECOIN_GAS_AMOUNT = 1
PEAKECOIN_GAS_PRICE = 0.000001
PEAKECOIN_GAS_OFFSET = 62  # e.g. 1m2s offset from start
PEAKECOIN_GAS_INTERVAL = 3600  # every hour


def buy_peakecoin_gas(account_name, posting_key=None, active_key=None, nodes=None):
    """Place the fixed PEK gas buy order defined by the ``PEAKECOIN_GAS_*`` constants.

    Args:
        account_name: Account placing the order.
        posting_key: Optional posting key, forwarded to ``place_order``.
        active_key: Optional active key, forwarded to ``place_order``.
        nodes: Optional Hive node URLs, forwarded to ``place_order``.

    Returns:
        Whatever ``place_order`` returns for the buy order.
    """
    # No _enforce_tx_delay() here: place_order already applies the throttle,
    # and calling it in both places forces a second full wait on every gas buy.
    print(f"[PEK GAS] Placing PEK gas buy order: {PEAKECOIN_GAS_AMOUNT} {PEAKECOIN_GAS_TOKEN} at {PEAKECOIN_GAS_PRICE} for {account_name}")
    return place_order(
        account_name,
        PEAKECOIN_GAS_TOKEN,
        PEAKECOIN_GAS_PRICE,
        PEAKECOIN_GAS_AMOUNT,
        order_type="buy",
        posting_key=posting_key,
        active_key=active_key,
        nodes=nodes,
    )

def next_peakecoin_gas_time(start_time, interval=PEAKECOIN_GAS_INTERVAL, offset=PEAKECOIN_GAS_OFFSET):
    """Return the epoch time of the next scheduled PEK gas buy.

    The schedule begins at ``start_time + offset`` and repeats every
    ``interval`` seconds. Before the first slot, that first slot is returned;
    afterwards, the earliest slot that is not before the current time.
    """
    first_slot = start_time + offset
    now = time.time()
    if now < first_slot:
        return first_slot
    slots_elapsed = math.ceil((now - first_slot) / interval)
    return first_slot + slots_elapsed * interval

def should_buy_peakecoin_gas(last_gas_time, interval=PEAKECOIN_GAS_INTERVAL, offset=PEAKECOIN_GAS_OFFSET):
    """Return True when a PEK gas buy is due.

    With a previous buy on record (``last_gas_time`` non-zero), a buy is due
    once ``interval`` seconds have passed since it. With ``last_gas_time == 0``
    a buy is due once the clock is at least ``offset`` seconds past the start
    of the current ``interval``-aligned epoch window.
    """
    now = time.time()
    if last_gas_time != 0:
        return now - last_gas_time >= interval
    window_start = (now // interval) * interval
    return now >= window_start + offset

def next_gas_time(start_time, interval=3600, offset=20):
    """Return the epoch time of the next scheduled gas buy.

    The schedule begins at ``start_time + offset`` and repeats every
    ``interval`` seconds. Before the first slot, that first slot is returned;
    afterwards, the earliest slot that is not before the current time.
    """
    first_slot = start_time + offset
    now = time.time()
    if now < first_slot:
        return first_slot
    slots_elapsed = math.ceil((now - first_slot) / interval)
    return first_slot + slots_elapsed * interval

def should_buy_gas(last_gas_time, interval=3600, offset=20):
    """Return True when a gas buy is due.

    With a previous buy on record (``last_gas_time`` non-zero), a buy is due
    once ``interval`` seconds have passed since it. With ``last_gas_time == 0``
    a buy is due once the clock is at least ``offset`` seconds past the start
    of the current ``interval``-aligned epoch window.
    """
    now = time.time()
    if last_gas_time != 0:
        return now - last_gas_time >= interval
    window_start = (now // interval) * interval
    return now >= window_start + offset

# Names exported by ``from <module> import *``.
# NOTE(review): next_gas_time and next_peakecoin_gas_time are defined above
# without a leading underscore but are not listed here - confirm whether they
# are meant to be part of the public API.
__all__ = [
    'place_order', 'get_open_orders', 'cancel_order', 'get_balance',
    'buy_gas', 'should_buy_gas', 'buy_peakecoin_gas', 'should_buy_peakecoin_gas',
]