5.3 Example Consumers
Once there’s a simple API, everything else is commodity.
5.3.1 Python client (for bots / quants)
import requests
import time
API_URL = "http://localhost:8000/opportunities"
def fetch_opportunities():
r = requests.get(API_URL, timeout=2)
r.raise_for_status()
return r.json()
def main():
while True:
opps = fetch_opportunities()
for opp in opps:
if opp["profit_bps"] >= 50:
process_profitable_opportunity(opp)
time.sleep(2)
def process_profitable_opportunity(opp):
# Your own logic: log, alert, execute, etc.
print("Profitable:", opp)
if __name__ == "__main__":
main()5.3.2 JS/TS client (for frontends)
// snapshot client
async function fetchOpportunities() {
const res = await fetch("http://localhost:8000/opportunities");
if (!res.ok) throw new Error("Failed");
return await res.json();
}
// websocket client
function subscribeOpportunities(cb: (opps: any[]) => void) {
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
cb(data);
};
return ws;
}Use this in React/Next.js to drive your own dashboard, alerts, charts, etc.
5.3.3 Persisting to Redis / DB / Kafka
You can plug the engine loop into any storage / queue:
import asyncio, json
import aioredis
from capture_engine import CaptureEngine
async def run():
engine = CaptureEngine()
redis = await aioredis.from_url("redis://localhost")
while True:
opps = await engine.tick()
# store the latest snapshot
await redis.set("capture:opportunities", json.dumps(opps))
# or push each opp to a stream/topic
for opp in opps:
await redis.xadd("capture:stream", {"data": json.dumps(opp)})
await asyncio.sleep(engine.sleep_seconds)
asyncio.run(run())From there you can:
Feed downstream bots.
Build historical databases.
Do analytics and backtesting.
Last updated

