Worker API Reference
Complete reference for the ctx object available in worker code.
Code Structure
def setup(ctx):
"""Called once when the worker starts."""
pass
def tick(ctx):
"""Called every tick_interval seconds. Required."""
pass
def on_error(ctx, error):
"""Optional. Called when tick() raises an exception."""
pass
ctx.state -- Persistent Storage
Redis-backed key-value store. Data persists across restarts and redeployments. Values are JSON-serialized.
Methods
| Method | Signature | Description |
|---|---|---|
get | get(key: str, default=None) -> Any | Get value by key. Returns default if not found. |
set | set(key: str, value: Any) -> None | Set a value. Must be JSON-serializable. |
delete | delete(key: str) -> None | Delete a key. |
keys | keys() -> list[str] | List all state keys for this worker. |
get_all | get_all() -> dict | Get all key-value pairs. |
Example
def setup(ctx):
ctx.state.set("prices", [])
ctx.state.set("config", {"threshold": 5.0, "symbol": "BTCUSDT"})
def tick(ctx):
config = ctx.state.get("config", {})
prices = ctx.state.get("prices", [])
# Append new price
prices.append({"time": "2024-01-01", "price": 42000})
ctx.state.set("prices", prices[-100:]) # keep last 100
# List all keys
all_keys = ctx.state.keys() # ["prices", "config"]
ctx.log -- Logging
Batched logging system. Logs are flushed automatically after each tick. Visible in the worker's Log tab.
Methods
| Method | Signature | Description |
|---|---|---|
info | info(msg: str) | Info level message |
warn | warn(msg: str) | Warning level message |
error | error(msg: str) | Error level message |
debug | debug(msg: str) | Debug level message |
flush | flush() | Force flush logs (usually automatic) |
Example
def tick(ctx):
ctx.log.info("Starting tick")
ctx.log.debug(f"State keys: {ctx.state.keys()}")
try:
result = do_something()
ctx.log.info(f"Success: {result}")
except Exception as e:
ctx.log.error(f"Failed: {e}")
ctx.log.warn("Will retry next tick")
TIP
Logs are batched (up to 20 entries) and sent in a single HTTP call for performance. Messages are truncated at 4000 characters.
ctx.monitor -- Real-time Dashboard
Send structured widget data to connected Monitor block(s). Use this to build live dashboards.
Requires: Monitor block connected via edge.
Methods
| Method | Signature | Description |
|---|---|---|
render | render(widgets: list[dict]) | Replace the entire display with new widgets |
patch | patch(updates: list[dict]) | Update specific widgets by id |
log | log(line: str, level: str = "info") | Append a line to the monitor log |
clear | clear() | Clear the display |
Widget Builders
Use these static methods to build widget dicts:
| Builder | Signature | Description |
|---|---|---|
metric | metric(label, value, color="blue", icon=None, id=None, subtitle=None) | Metric card |
status | status(label, state, detail="", id=None) | Status indicator. state: "ok", "warning", "error" |
table | table(headers: list, rows: list[list], id=None) | Data table |
text | text(content, style="normal", id=None) | Text block. style: "normal" or "monospace" |
progress | progress(label, value: float, color="gold", id=None) | Progress bar (0.0 to 1.0) |
list_widget | list_widget(items: list, ordered=False, id=None) | Bullet or numbered list |
log_widget | log_widget(lines: list, max_lines=50, id=None) | Scrolling log widget |
separator | separator() | Visual divider line |
Colors
Available colors for metric and progress: "blue", "green", "red", "gold", "orange", "purple", "gray".
Example
def tick(ctx):
price = 42150.50
change = +2.35
ctx.monitor.render([
ctx.monitor.metric("BTC Price", f"${price:,.2f}",
color="green" if change >= 0 else "red",
id="btc_price"),
ctx.monitor.metric("24h Change", f"{change:+.2f}%",
color="green" if change >= 0 else "red"),
ctx.monitor.separator(),
ctx.monitor.status("Exchange", "ok", "Bybit connected"),
ctx.monitor.status("Bot", "warning", "Waiting for signal"),
ctx.monitor.separator(),
ctx.monitor.table(
headers=["Symbol", "Side", "Size", "PnL"],
rows=[
["BTCUSDT", "Long", "0.5", "+$120"],
["ETHUSDT", "Short", "2.0", "-$45"],
]
),
ctx.monitor.progress("Daily Target", 0.65, color="gold"),
])
Partial Updates with patch()
Use patch() to update specific widgets without re-rendering everything:
# First render with IDs
ctx.monitor.render([
ctx.monitor.metric("Price", "$42,000", id="price"),
ctx.monitor.status("Bot", "ok", "Running", id="bot_status"),
])
# Later, update only the price
ctx.monitor.patch([
{"id": "price", "value": "$42,150"},
])
ctx.http -- HTTP Requests
Make HTTP requests to external APIs. All requests are proxied through the backend for security.
Methods
| Method | Signature | Returns |
|---|---|---|
get | get(url: str, headers: dict = None) -> dict | {"status": int, "body": str, "url": str, "content_type": str} |
post | post(url: str, data: dict = None, headers: dict = None) -> dict | {"status": int, "body": str, "url": str} |
Example
def tick(ctx):
# Fetch JSON API
resp = ctx.http.get("https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd")
body = resp.get("body", {})
# body might be a string — parse it
if isinstance(body, str):
import json
body = json.loads(body)
price = body.get("bitcoin", {}).get("usd", 0)
ctx.log.info(f"BTC price: ${price}")
# POST request with custom headers
resp = ctx.http.post(
"https://hooks.slack.com/services/xxx",
data={"text": f"BTC is ${price}"},
headers={"Content-Type": "application/json"}
)
ctx.bybit -- Bybit Exchange
Trade on Bybit. Supports spot and derivatives.
Requires: Bybit block connected via edge, with API keys configured.
Methods
| Method | Signature | Description |
|---|---|---|
get_tickers | get_tickers(symbol: str = "BTCUSDT") -> dict | Market ticker data (last price, bid, ask, volume) |
get_balance | get_balance(coin: str = None) -> dict | Account balance |
get_positions | get_positions(symbol: str = None) -> dict | Open positions |
get_orders | get_orders(symbol: str = None) -> dict | Active orders |
place_order | place_order(symbol, side, qty, order_type="Market", price=None) -> dict | Place an order |
cancel_order | cancel_order(symbol: str, order_id: str) -> dict | Cancel an order |
Parameters
place_order:
- symbol: Trading pair, e.g. "BTCUSDT"
- side: "Buy" or "Sell"
- qty: Order quantity as string, e.g. "0.001"
- order_type: "Market" or "Limit"
- price: Required for Limit orders, string e.g. "42000"
Example
def tick(ctx):
# Check positions
positions = ctx.bybit.get_positions("BTCUSDT")
ctx.log.info(f"Positions: {positions}")
# Get current price
ticker = ctx.bybit.get_tickers("BTCUSDT")
last_price = ticker.get("last_price", "0")
# Place a market buy
result = ctx.bybit.place_order(
symbol="BTCUSDT",
side="Buy",
qty="0.001",
order_type="Market"
)
ctx.log.info(f"Order result: {result}")
Error Handling
If no Bybit block is connected, calling any method raises RuntimeError:
RuntimeError: No trading block connected. Connect a Bybit block to this worker.
ctx.ig -- IG Markets
Trade CFDs, forex, indices, and commodities on IG Markets.
Requires: IG Markets block connected via edge, with API keys configured.
Methods
| Method | Signature | Description |
|---|---|---|
get_accounts | get_accounts() -> dict | Account info (balance, P&L, margin) |
get_positions | get_positions() -> dict | All open positions |
get_orders | get_orders() -> dict | All working orders |
search_markets | search_markets(search_term: str) -> dict | Search instruments by name |
get_market_info | get_market_info(epic: str) -> dict | Detailed info for an instrument |
open_position | open_position(epic, direction, size, ...) -> dict | Open a new position |
close_position | close_position(deal_id, direction, size, ...) -> dict | Close a position |
update_position | update_position(deal_id, stop_level=None, limit_level=None) -> dict | Update stop/limit |
confirm_deal | confirm_deal(deal_reference: str) -> dict | Get deal confirmation |
open_position parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
epic | str | required | Instrument identifier, e.g. "CS.D.EURUSD.TODAY.IP" |
direction | str | required | "BUY" or "SELL" |
size | float | required | Position size |
order_type | str | "MARKET" | "MARKET" or "LIMIT" |
currency_code | str | "USD" | Account currency |
expiry | str | "DFB" | Expiry type |
force_open | bool | True | Force open even if opposing position exists |
guaranteed_stop | bool | False | Use guaranteed stop loss |
stop_distance | float | None | Stop loss distance in points |
limit_distance | float | None | Take profit distance in points |
stop_level | float | None | Absolute stop loss level |
limit_level | float | None | Absolute take profit level |
Example
def tick(ctx):
# Check account
accounts = ctx.ig.get_accounts()
ctx.log.info(f"Account balance: {accounts}")
# Search for an instrument
results = ctx.ig.search_markets("Gold")
# Get positions
positions = ctx.ig.get_positions()
for pos in positions.get("positions", []):
deal_id = pos["position"]["dealId"]
pnl = pos["position"].get("profit", 0)
ctx.log.info(f"Position {deal_id}: PnL={pnl}")
# Open a position with stop loss
result = ctx.ig.open_position(
epic="CS.D.EURUSD.TODAY.IP",
direction="BUY",
size=1.0,
stop_distance=50.0,
limit_distance=100.0
)
ctx.log.info(f"Opened: {result}")
ctx.telegram -- Telegram Messaging
Send and receive messages via Telegram bot.
Requires: Telegram block connected via edge, with bot token and chat ID configured.
Methods
| Method | Signature | Description |
|---|---|---|
send | send(text: str, parse_mode: str = "HTML") -> dict | Send a message |
get_messages | get_messages(limit: int = 10) -> dict | Get recent messages |
get_chat_info | get_chat_info() -> dict | Get chat details (title, type) |
parse_mode
- "HTML" -- supports <b>bold</b>, <i>italic</i>, <code>code</code>, <pre>block</pre>, <a href="url">link</a>
- "Markdown" -- supports *bold*, _italic_, `code`
Example
def tick(ctx):
price = 42000
change = +2.5
# Send formatted alert
ctx.telegram.send(
f"<b>BTC Update</b>\n"
f"Price: <code>${price:,}</code>\n"
f"Change: {change:+.1f}%",
parse_mode="HTML"
)
# Read recent messages
messages = ctx.telegram.get_messages(limit=5)
for msg in messages.get("messages", []):
text = msg.get("text", "")
ctx.log.info(f"Received: {text}")
ctx.files -- File Storage
Read and write files in a sandboxed file system (100 MB limit per workspace).
Requires: File Explorer block connected via edge.
Methods
| Method | Signature | Description |
|---|---|---|
list | list(path: str = "/") -> dict | List files and dirs. Returns {"files": [...], "dirs": [...]} |
read | read(path: str) -> dict | Read file (max 50 KB). Returns {"content": "..."} |
write | write(path: str, content: str) -> dict | Write/create file. Returns {"ok": true, "path": "..."} |
delete | delete(path: str) -> dict | Delete file or dir. Returns {"ok": true} |
Example
def tick(ctx):
import json
import datetime
# Save data to file
data = {"price": 42000, "time": str(datetime.datetime.now())}
ctx.files.write("/logs/latest.json", json.dumps(data, indent=2))
# Read it back
result = ctx.files.read("/logs/latest.json")
content = result.get("content", "")
ctx.log.info(f"Saved: {content}")
# List directory
listing = ctx.files.list("/logs/")
ctx.log.info(f"Files: {listing.get('files', [])}")
ctx.llm -- LLM Queries
Send prompts to connected AI models (Claude, Grok, OpenAI) and get text responses.
Requires: LLM Agent block (Claude Agent, Grok Agent, or LLM Agent) connected via edge.
Methods
| Method | Signature | Description |
|---|---|---|
ask | ask(message, system="", model="", max_tokens=2048, temperature=0.7) -> str | Send a prompt, get text response |
ask_with_tools | ask_with_tools(message, system="", model="", max_tokens=4096) -> str | Ask with tool access (trading, telegram, etc.) |
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
message | str | required | Your prompt / question |
system | str | "" | System prompt (instructions for the AI) |
model | str | "" | Model override (leave empty for block default) |
max_tokens | int | 2048 / 4096 | Maximum response length |
temperature | float | 0.7 | Creativity (0.0 = deterministic, 1.0 = creative) |
Example
def tick(ctx):
# Simple question
answer = ctx.llm.ask(
"Summarize the current BTC market sentiment in 2 sentences.",
system="You are a cryptocurrency analyst.",
temperature=0.3
)
ctx.log.info(f"AI says: {answer}")
# Use with trading data
positions = ctx.bybit.get_positions("BTCUSDT")
analysis = ctx.llm.ask(
f"Analyze these positions and suggest actions: {positions}",
system="You are a quantitative trading advisor. Be concise."
)
# Send analysis to Telegram
ctx.telegram.send(f"<b>AI Analysis:</b>\n{analysis}", parse_mode="HTML")
ctx.connected_blocks -- Connection Info
Read-only dictionary showing which blocks are connected to this worker.
Example
def setup(ctx):
blocks = ctx.connected_blocks
ctx.log.info(f"Connected: {blocks}")
# Example output:
# {
# "trading_node_id": "bybit_1",
# "trading_block_type": "trading.bybit",
# "llm_node_id": "claude_1",
# "llm_block_type": "ai.claude_agent",
# "monitor_node_ids": ["monitor_1"],
# "telegram_node_id": "telegram_1",
# "file_explorer_node_id": "files_1",
# "workspace_id": 42
# }
if not blocks.get("trading_node_id"):
ctx.log.warn("No trading block connected!")
Error Handling Pattern
def tick(ctx):
try:
ticker = ctx.bybit.get_tickers("BTCUSDT")
# Check for API errors
if "error" in ticker:
ctx.log.error(f"API error: {ticker['error']}")
return
price = ticker.get("last_price", "0")
ctx.log.info(f"BTC: {price}")
except RuntimeError as e:
# Block not connected
ctx.log.error(f"Missing connection: {e}")
except Exception as e:
# Unexpected error -- will trigger on_error() if defined
raise
def on_error(ctx, error):
ctx.log.error(f"Tick failed: {error}")
ctx.telegram.send(f"Worker error: {error}")
WARNING
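API methods that return a dict (for example the ctx.bybit, ctx.ig, ctx.telegram, and ctx.files calls) can report failures through an "error" key. Always check for an "error" key in the response before using the data.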