
Suppose you're attempting to run multiple trading bots and want more control. This is where I have ended up.
The main issue I've run into is that there are some automated minimums, so you may need to play with the values around where I did a lil' `if token == "SWAP.USDT"` check.
Find that and adjust as you see fit.
place_order.py
import time
import json as jsonlib
import requests
import os
import math
_last_tx_time = 0  # time.time() of the most recent throttled call; 0 means "never"
_TX_MIN_DELAY = 10  # seconds


def _enforce_tx_delay():
    """Block until at least ``_TX_MIN_DELAY`` seconds separate throttled calls.

    Sleeps for whatever is left of the delay window when the previous call
    was too recent, then records the current time as the new reference.
    """
    global _last_tx_time
    remaining = _TX_MIN_DELAY - (time.time() - _last_tx_time)
    if remaining > 0:
        time.sleep(remaining)
    _last_tx_time = time.time()
def get_hive_instance(posting_key=None, active_key=None, nodes=None):
    """Build a beem ``Hive`` client from whichever keys were supplied.

    Falsy keys are left out of the key list. When ``nodes`` is ``None`` a
    default pair of public API nodes is used.
    """
    from beem import Hive

    node_list = ["https://api.hive.blog", "https://anyx.io"] if nodes is None else nodes
    supplied_keys = [key for key in (posting_key, active_key) if key]
    return Hive(node=node_list, keys=supplied_keys)
def get_account_instance(account_name, hive_instance):
    """Return a beem ``Account`` for ``account_name`` bound to ``hive_instance``."""
    from beem.account import Account

    account = Account(account_name, blockchain_instance=hive_instance)
    return account
def _fixed_point(value, places):
    """Round ``value`` to ``places`` decimals and render it without an exponent.

    ``str()`` on a small float produces exponent notation (``1e-06``); going
    through ``Decimal`` keeps the same shortest digits in plain decimal form.
    """
    from decimal import Decimal

    return format(Decimal(repr(round(value, places))), "f")


def validate_order_payload(token, quantity, price):
    """Normalise an order's quantity and price into plain decimal strings.

    Args:
        token: Token symbol. ``"SWAP.USDT"`` gets a 0.01 minimum quantity and
            2 decimal places; every other token gets 8 decimal places.
        quantity: Order size; anything ``float()`` accepts.
        price: Order price; anything ``float()`` accepts. Rounded to 6 decimals.

    Returns:
        ``(quantity, price)`` as strings in fixed-point notation, e.g.
        ``"0.000001"`` rather than ``"1e-06"``.

    Raises:
        ValueError / TypeError: if ``quantity`` or ``price`` is not numeric.
    """
    quantity = float(quantity)
    price = float(price)
    if token == "SWAP.USDT":
        min_qty = 0.01
        # Orders below the minimum are bumped up rather than rejected.
        return _fixed_point(max(quantity, min_qty), 2), _fixed_point(price, 6)
    return _fixed_point(quantity, 8), _fixed_point(price, 6)
def get_balance(account_name, token):
    """Return the Hive Engine balance of ``token`` held by ``account_name``.

    Queries the ``tokens`` contract's ``balances`` table on the
    ``api.hive-engine.com`` RPC endpoint.

    Returns:
        The balance as a float, or ``0.0`` (after printing the raw response)
        when the endpoint answers with a non-200 status or returns no
        matching balance row.

    Raises:
        requests.RequestException: on connection failure or when the request
            exceeds the 10 second timeout.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "find",
        "params": {
            "contract": "tokens",
            "table": "balances",
            "query": {"account": account_name, "symbol": token},
        },
        "id": 1,
    }
    # Bound the wait so a stalled endpoint cannot hang the caller forever;
    # 10 seconds matches the timeout used by get_open_orders.
    r = requests.post("https://api.hive-engine.com/rpc/contracts", json=payload, timeout=10)
    if r.status_code == 200:
        # An error-shaped response may lack "result"; treat that as "no balance".
        rows = r.json().get("result")
        if rows:
            return float(rows[0]["balance"])
    print(f"[get_balance] No balance found for {account_name} {token}, returning 0.0. Raw result: {r.text}")
    return 0.0
def get_open_orders(account_name, token=None, nodes=None):
    """
    Collect the account's open orders from the market buyBook and sellBook tables.

    Each table is queried with a limit of 1000 rows, restricted to ``token``
    when one is given. Every returned order is tagged with a 'type' field
    ('buy' or 'sell'). For each table the nodes are tried in order and the
    first one answering with HTTP 200 is used; request errors are printed
    and the next node is tried.
    """
    if nodes is None:
        nodes = [
            "https://api.hive-engine.com/rpc/contracts",
            "https://herpc.dtools.dev",
            "https://engine.rishipanthee.com/rpc",
            "https://api2.hive-engine.com/rpc/contracts",
        ]

    collected = []
    for table, order_type in (("buyBook", "buy"), ("sellBook", "sell")):
        query = {"account": account_name}
        if token:
            query["symbol"] = token
        payload = {
            "jsonrpc": "2.0",
            "method": "find",
            "params": {
                "contract": "market",
                "table": table,
                "query": query,
                "limit": 1000,
            },
            "id": 1,
        }
        for node in nodes:
            try:
                print(f"Checking {order_type} orders for {account_name} {token or '[all tokens]'} on {node}...")
                response = requests.post(node, json=payload, timeout=10)
                if response.status_code != 200:
                    continue  # Fall through to the next node
                orders = response.json().get("result", [])
                for order in orders:
                    order["type"] = order_type
                collected.extend(orders)
                break  # Only use the first working node
            except Exception as e:
                print(f"Error querying {order_type} orders on {node}: {e}")
    return collected
def _txid_from_broadcast(broadcast_result):
    """Return the first transaction id found in a broadcast result, else None.

    Looks for ``id``, ``txid`` or ``transaction_id`` at the top level and then
    inside a nested ``result`` dict; non-dict values are ignored.
    """
    if not isinstance(broadcast_result, dict):
        return None
    for container in (broadcast_result, broadcast_result.get('result')):
        if not isinstance(container, dict):
            continue
        for key in ('id', 'txid', 'transaction_id'):
            if container.get(key):
                return container[key]
    return None


def cancel_order(account_name, order_id, verbose=True, nodes=None, posting_key=None, active_key=None):
    """Cancel an open market order by its orderId.

    Builds a ``market`` / ``cancel`` custom_json operation that requires the
    account's active authority and tries each node in turn until one
    broadcast goes through.

    Args:
        account_name: Account that owns the order and signs the operation.
        order_id: Order identifier; sent as a string.
        verbose: When true, print per-node debug and error messages.
        nodes: Hive API nodes to try, in order. Defaults to three public nodes.
        posting_key: Optional key added to the signing key list.
        active_key: Optional key added to the signing key list.

    Returns:
        ``(success, txid, error)``. On success ``error`` is ``None`` and
        ``txid`` may be ``None`` if the broadcast result carries no id. On
        failure ``txid`` is ``None`` and ``error`` is a message that includes
        the last node error or exception when one was seen.
    """
    from beem import Hive
    from beem.transactionbuilder import TransactionBuilder
    from beembase.operations import Custom_json

    if nodes is None:
        nodes = [
            "https://api.hive.blog",
            "https://anyx.io",
            "https://api.openhive.network",
        ]
    payload = {
        "contractName": "market",
        "contractAction": "cancel",
        "contractPayload": {"orderId": str(order_id)},
    }
    signing_keys = [k for k in (posting_key, active_key) if k]
    last_error = None  # Most recent failure, reported if every node fails
    for node in nodes:
        try:
            hive = Hive(node=node, keys=signing_keys)
            tx = TransactionBuilder(blockchain_instance=hive)
            op = Custom_json(
                required_auths=[account_name],
                required_posting_auths=[],
                id="ssc-mainnet-hive",
                json=jsonlib.dumps(payload),
            )
            if verbose:
                print(f"[CANCEL DEBUG] Using node: {node}")
                print(f"[CANCEL DEBUG] required_auths: {[account_name]}, required_posting_auths: [] (should be active only)")
            tx.appendOps([op])
            tx.appendSigner(account_name, "active")
            tx.sign()
            broadcast_result = tx.broadcast()
            if verbose:
                print(f"[CANCEL DEBUG] Full broadcast result: {broadcast_result}")
            if isinstance(broadcast_result, dict) and broadcast_result.get('error'):
                last_error = broadcast_result['error']
                if verbose:
                    print(f"[CANCEL ERROR] Node {node} returned error: {last_error}")
                continue  # Try next node
            # Id extraction tolerates odd result shapes, so a broadcast that
            # already succeeded is never retried on another node.
            return (True, _txid_from_broadcast(broadcast_result), None)
        except Exception as e:
            last_error = e
            if verbose:
                print(f"[CANCEL ERROR] Exception during cancel on {node}: {e}")
            continue  # Try next node
    if last_error is None:
        return (False, None, 'All nodes failed to broadcast cancel order')
    return (False, None, f'All nodes failed to broadcast cancel order (last error: {last_error})')
def build_and_send_op(account_name, symbol, price, quantity, order_type, posting_key=None, active_key=None, nodes=None):
    """Sign and broadcast a market order as a custom_json operation.

    Creates a Hive instance from the given keys/nodes and installs it as the
    shared blockchain instance, normalises quantity and price through
    ``validate_order_payload``, then signs with the account's active
    authority and broadcasts.

    Returns the transaction id found in the broadcast result, or ``None``
    when no id is present or when signing/broadcasting raised (the exception
    is printed with a traceback rather than propagated).
    """
    from beem.transactionbuilder import TransactionBuilder
    from beembase.operations import Custom_json
    from beem.instance import set_shared_blockchain_instance

    hive = get_hive_instance(posting_key, active_key, nodes)
    set_shared_blockchain_instance(hive)

    # Validate decimals and min order size
    quantity, price = validate_order_payload(symbol, quantity, price)
    contract_payload = {"symbol": symbol, "quantity": quantity, "price": price}
    payload = {
        "contractName": "market",
        "contractAction": order_type,
        "contractPayload": contract_payload,
    }

    tx = TransactionBuilder(blockchain_instance=hive)
    tx.appendOps([
        Custom_json(
            required_auths=[account_name],
            required_posting_auths=[],
            id="ssc-mainnet-hive",
            json=jsonlib.dumps(payload),
        )
    ])
    tx.appendSigner(account_name, "active")

    try:
        tx.sign()
        print("[DEBUG] Transaction signed successfully.")
        broadcast_result = tx.broadcast()
        print(f"[DEBUG] Full broadcast result: {broadcast_result}")
        tx_id = None
        if isinstance(broadcast_result, dict):
            nested = broadcast_result.get('result')
            # First truthy id wins: top-level keys first, then the nested result.
            tx_id = (
                broadcast_result.get('id')
                or broadcast_result.get('txid')
                or broadcast_result.get('transaction_id')
                or (nested and (
                    nested.get('id')
                    or nested.get('txid')
                    or nested.get('transaction_id')
                ))
            )
        print(f"Broadcasted {order_type.upper()} order for {symbol} at {price}. TXID: {tx_id or '[not found]'}")
        return tx_id
    except Exception as e:
        print(f"[ERROR] Exception during sign/broadcast: {e}")
        import traceback
        traceback.print_exc()
        return None
def place_order(account_name, token, price, quantity, order_type="buy", posting_key=None, active_key=None, nodes=None):
    """Place a buy or sell order on the Hive Engine market after a balance check.

    Waits out the minimum delay between transactions, then checks the balance
    of the token being spent: SWAP.HIVE for a buy (cost is
    ``quantity * price``), or ``token`` itself for a sell. When the balance
    does not cover the order, the quantity is reduced to what 95% of the
    balance can fund, with a floor of 0.00001.

    Args:
        account_name: Account placing the order.
        token: Symbol being bought or sold.
        price: Price per unit of ``token``.
        quantity: Number of ``token`` units to buy or sell.
        order_type: ``"buy"`` or ``"sell"``.
        posting_key: Falls back to the ``HIVE_POSTING_KEY`` environment variable.
        active_key: Falls back to the ``HIVE_ACTIVE_KEY`` environment variable.
        nodes: Hive API nodes; defaults to two public nodes.

    Returns:
        ``False`` when the quantity is not positive or an exception escaped
        the send step, otherwise ``True``.
    """
    _enforce_tx_delay()
    # Use env vars as fallback if not provided
    posting_key = posting_key or os.environ.get("HIVE_POSTING_KEY")
    active_key = active_key or os.environ.get("HIVE_ACTIVE_KEY")
    nodes = nodes or ["https://api.hive.blog", "https://anyx.io"]
    print(f"Placing {order_type.upper()} order for {quantity} {token} at {price}...")
    price = float(price)
    quantity = float(quantity)
    # Use SWAP.HIVE as the base token for all buy orders
    is_buy = order_type == "buy"
    token_used = "SWAP.HIVE" if is_buy else token
    available = get_balance(account_name, token_used)
    print(f"[INFO] {token_used} balance before {order_type}: {available}")
    # A buy spends quantity * price of SWAP.HIVE; a sell spends `quantity`
    # of the token itself. Compare the balance against what is actually spent.
    required = quantity * price if is_buy else quantity
    if available < required:
        print(f"[WARN] Not enough balance for {token_used}. Adjusting order.")
        spendable = available * 0.95
        # For a buy, convert the spendable SWAP.HIVE back into token units.
        # price is non-zero here: a non-negative balance below quantity * price
        # implies a positive cost.
        affordable = spendable / price if is_buy else spendable
        quantity = max(affordable, 0.00001)
    if quantity <= 0:
        print("[ERROR] Order skipped: quantity too small or no balance.")
        return False
    try:
        tx_id = build_and_send_op(account_name, token, price, quantity, order_type, posting_key, active_key, nodes)
        # NOTE: build_and_send_op also returns None when sign/broadcast raised,
        # so a missing TXID does not by itself prove the broadcast succeeded.
        if tx_id:
            print(f"Order successful. TXID: {tx_id}")
        else:
            print("Order broadcasted, but TXID not available.")
        return True
    except Exception as e:
        print(f"Order failed: {e}")
        import traceback
        traceback.print_exc()
        return False
# Default gas settings (can be changed in one place for all bots)
DEFAULT_GAS_TOKEN = "SWAP.MATIC"
DEFAULT_GAS_AMOUNT = 0.01
DEFAULT_GAS_PRICE = 1.0  # Set a reasonable default, can be overridden


def buy_gas(account_name, gas_token=None, gas_amount=None, gas_price=None, posting_key=None, active_key=None, nodes=None):
    """Place a buy order for a gas token, using the module defaults when unset.

    Args:
        account_name: Account placing the order.
        gas_token: Symbol to buy; defaults to ``DEFAULT_GAS_TOKEN``.
        gas_amount: Quantity to buy; defaults to ``DEFAULT_GAS_AMOUNT``.
        gas_price: Price per unit; defaults to ``DEFAULT_GAS_PRICE``.
        posting_key, active_key, nodes: Passed through to ``place_order``.

    Returns:
        Whatever ``place_order`` returns.
    """
    # No _enforce_tx_delay() here: place_order applies the delay itself, and
    # calling it twice back-to-back forces a full extra delay on every call.
    token = gas_token if gas_token is not None else DEFAULT_GAS_TOKEN
    amount = gas_amount if gas_amount is not None else DEFAULT_GAS_AMOUNT
    price = gas_price if gas_price is not None else DEFAULT_GAS_PRICE
    print(f"[GAS] Placing gas buy order: {amount} {token} at {price} for {account_name} (token={token}, amount={amount}, price={price})")
    return place_order(
        account_name,
        token,
        price,
        amount,
        order_type="buy",
        posting_key=posting_key,
        active_key=active_key,
        nodes=nodes,
    )
PEAKECOIN_GAS_TOKEN = "PEK"
PEAKECOIN_GAS_AMOUNT = 1
PEAKECOIN_GAS_PRICE = 0.000001
PEAKECOIN_GAS_OFFSET = 62  # e.g. 1m2s offset from start
PEAKECOIN_GAS_INTERVAL = 3600  # every hour


def buy_peakecoin_gas(account_name, posting_key=None, active_key=None, nodes=None):
    """Place the fixed PEK gas buy order for the account.

    Buys ``PEAKECOIN_GAS_AMOUNT`` of ``PEAKECOIN_GAS_TOKEN`` at
    ``PEAKECOIN_GAS_PRICE`` through ``place_order`` and returns its result.
    """
    # No _enforce_tx_delay() here: place_order applies the delay itself, and
    # calling it twice back-to-back forces a full extra delay on every call.
    print(f"[PEK GAS] Placing PEK gas buy order: {PEAKECOIN_GAS_AMOUNT} {PEAKECOIN_GAS_TOKEN} at {PEAKECOIN_GAS_PRICE} for {account_name}")
    return place_order(
        account_name,
        PEAKECOIN_GAS_TOKEN,
        PEAKECOIN_GAS_PRICE,
        PEAKECOIN_GAS_AMOUNT,
        order_type="buy",
        posting_key=posting_key,
        active_key=active_key,
        nodes=nodes,
    )
def next_peakecoin_gas_time(start_time, interval=PEAKECOIN_GAS_INTERVAL, offset=PEAKECOIN_GAS_OFFSET):
    """Return the epoch time of the next scheduled PEK gas buy.

    The schedule starts at ``start_time + offset`` and repeats every
    ``interval`` seconds; the first slot at or after the current time is
    returned.
    """
    first_slot = start_time + offset
    now = time.time()
    if now < first_slot:
        return first_slot
    cycles = math.ceil((now - first_slot) / interval)
    return first_slot + cycles * interval
def should_buy_peakecoin_gas(last_gas_time, interval=PEAKECOIN_GAS_INTERVAL, offset=PEAKECOIN_GAS_OFFSET):
    """Return True when a PEK gas buy is due.

    With no previous buy (``last_gas_time == 0``) a buy is due once the
    current time is at least ``offset`` seconds past the most recent multiple
    of ``interval``. Otherwise a buy is due when ``interval`` seconds have
    passed since ``last_gas_time``.
    """
    now = time.time()
    if last_gas_time != 0:
        return now - last_gas_time >= interval
    window_start = (now // interval) * interval
    return now >= window_start + offset
def next_gas_time(start_time, interval=3600, offset=20):
    """Return the epoch time of the next scheduled gas buy.

    The schedule starts at ``start_time + offset`` and repeats every
    ``interval`` seconds; the first slot at or after the current time is
    returned.
    """
    first_slot = start_time + offset
    now = time.time()
    if now < first_slot:
        return first_slot
    cycles = math.ceil((now - first_slot) / interval)
    return first_slot + cycles * interval
def should_buy_gas(last_gas_time, interval=3600, offset=20):
    """Return True when a gas buy is due.

    With no previous buy (``last_gas_time == 0``) a buy is due once the
    current time is at least ``offset`` seconds past the most recent multiple
    of ``interval``. Otherwise a buy is due when ``interval`` seconds have
    passed since ``last_gas_time``.
    """
    now = time.time()
    if last_gas_time != 0:
        return now - last_gas_time >= interval
    window_start = (now // interval) * interval
    return now >= window_start + offset
# Names exported by a star import of this module; helpers not listed here
# remain importable by explicit name.
__all__ = [
'place_order', 'get_open_orders', 'cancel_order', 'get_balance',
'buy_gas', 'should_buy_gas', 'buy_peakecoin_gas', 'should_buy_peakecoin_gas',
]