5.3 Example Consumers

Once there’s a simple API, everything else is commodity: clients, dashboards, and storage are thin layers on top of the same endpoints.

5.3.1 Python client (for bots / quants)

import requests
import time

API_URL = "http://localhost:8000/opportunities"

def fetch_opportunities():
    r = requests.get(API_URL, timeout=2)
    r.raise_for_status()
    return r.json()

def main():
    # Poll every couple of seconds; 50 bps = a 0.5% profit threshold.
    while True:
        try:
            opps = fetch_opportunities()
        except requests.RequestException:
            # API briefly unreachable -- skip this cycle and retry.
            time.sleep(2)
            continue
        for opp in opps:
            if opp["profit_bps"] >= 50:
                process_profitable_opportunity(opp)
        time.sleep(2)

def process_profitable_opportunity(opp):
    # Your own logic: log, alert, execute, etc.
    print("Profitable:", opp)

if __name__ == "__main__":
    main()
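
Because each poll returns the full current snapshot, the same opportunity will show up on consecutive polls. A minimal dedup sketch, assuming each opportunity dict carries a stable "id" field (an assumption about the payload shape -- adjust the key to match the real API):

```python
# Dedup sketch: only pass each opportunity to downstream logic once.
# Assumes each opp dict has a stable "id" field (hypothetical key).
def filter_new(opps, seen):
    """Return only opportunities whose id is not yet in `seen`, recording them."""
    fresh = []
    for opp in opps:
        key = opp["id"]
        if key not in seen:
            seen.add(key)
            fresh.append(opp)
    return fresh

seen = set()
batch1 = [{"id": "a", "profit_bps": 60}, {"id": "b", "profit_bps": 10}]
batch2 = [{"id": "a", "profit_bps": 60}, {"id": "c", "profit_bps": 75}]

print(len(filter_new(batch1, seen)))  # 2 -- both are new
print(len(filter_new(batch2, seen)))  # 1 -- only "c" is unseen
```

Call `filter_new(opps, seen)` inside the polling loop before the profit check to avoid acting on the same opportunity twice.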

5.3.2 JS/TS client (for frontends)

// snapshot client
async function fetchOpportunities() {
  const res = await fetch("http://localhost:8000/opportunities");
  if (!res.ok) throw new Error(`Request failed: ${res.status}`);
  return await res.json();
}

// websocket client (for production, also wire up onclose/onerror
// and a reconnect strategy)
function subscribeOpportunities(cb: (opps: any[]) => void) {
  const ws = new WebSocket("ws://localhost:8000/ws");
  ws.onmessage = (event) => {
    cb(JSON.parse(event.data));
  };
  return ws;
}

Use this in React/Next.js to drive your own dashboard, alerts, charts, etc.

5.3.3 Persisting to Redis / DB / Kafka

You can plug the engine loop into any storage / queue:

import asyncio, json
import redis.asyncio as aioredis  # aioredis was merged into redis-py; use redis.asyncio
from capture_engine import CaptureEngine

async def run():
    engine = CaptureEngine()
    redis = aioredis.from_url("redis://localhost")  # from_url is synchronous

    while True:
        opps = await engine.tick()
        # store the latest snapshot
        await redis.set("capture:opportunities", json.dumps(opps))
        # or push each opp to a stream/topic
        for opp in opps:
            await redis.xadd("capture:stream", {"data": json.dumps(opp)})
        await asyncio.sleep(engine.sleep_seconds)

asyncio.run(run())

From there you can:

  • Feed downstream bots.

  • Build historical databases.

  • Do analytics and backtesting.
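
As a taste of the analytics side, here is a minimal sketch that summarizes a batch of persisted opportunity records. The field names ("pair", "profit_bps") and the sample values are assumptions about what the engine loop above stores:

```python
import json
from statistics import mean

# Hypothetical records, shaped like what the engine loop persists.
# The "pair" and "profit_bps" fields are assumptions about the payload.
records = [
    json.loads(s) for s in (
        '{"pair": "ETH/USDC", "profit_bps": 62}',
        '{"pair": "WBTC/ETH", "profit_bps": 18}',
        '{"pair": "ETH/USDC", "profit_bps": 91}',
    )
]

def summarize(records, threshold_bps=50):
    """Basic stats: record count, mean profit, and how many clear the threshold."""
    profits = [r["profit_bps"] for r in records]
    return {
        "count": len(records),
        "mean_bps": mean(profits),
        "above_threshold": sum(p >= threshold_bps for p in profits),
    }

print(summarize(records))
# mean of 62, 18, 91 is 57 bps; 2 of 3 records clear the 50 bps bar
```

The same pattern extends to per-pair breakdowns or time-bucketed aggregates once timestamps are stored alongside each record.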
