diff --git a/requirements.txt b/requirements.txt index f05d39d..cd6babb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ requests>=2.28.0 py-clob-client>=0.1.0 numpy>=1.24.0 -python-dotenv>=1.0.0 \ No newline at end of file +python-dotenv>=1.0.0 +websockets>=12.0,<13.0 \ No newline at end of file diff --git a/utils/fetchPolymarketLivePrice.py b/utils/fetchPolymarketLivePrice.py new file mode 100644 index 0000000..0290e7b --- /dev/null +++ b/utils/fetchPolymarketLivePrice.py @@ -0,0 +1,163 @@ + +import asyncio +import json +import websockets +from datetime import datetime +from zoneinfo import ZoneInfo +import sys +import random + +class PolymarketLiveMonitor: + """ + A tool to monitor live Polymarket crypto price data. + + This class uses WebSocket-based real-time price streaming to track short-term + 'Up or Down' markets and identify relevant 15-minute pricing windows. + """ + + def __init__(self): + self.ws_url = "wss://ws-live-data.polymarket.com" + self.price_to_beat = None + self.last_window_key = None + + def is_15min_window_start(self, dt) -> bool: + + return ( + dt.minute % 15 == 0 and + dt.second == 0 and + dt.microsecond == 0 + ) + + + async def stream_live_prices(self, symbol="btc/usd"): + """ + Subscribes to the Polymarket WebSocket RTDS for real-time Chainlink prices. + """ + subscription_message = { + "action": "subscribe", + "subscriptions": [{ + "topic": "crypto_prices_chainlink", + "type": "*", + "filters": json.dumps({"symbol": symbol}, separators=(',', ':')) + }] + } + + try: + async with websockets.connect( + self.ws_url, + ping_interval=20, + ping_timeout=20, + ) as websocket: + await websocket.send(json.dumps(subscription_message)) + + print("-" * 60) + print(f"{'Timestamp':<25} | {'Price':<12} | {'Diff'}") + print("-" * 60) + + backoff = 1 + max_backoff = 60 + + while True: + try: + response = await websocket.recv() + if not response: + continue + + data = json.loads(response) + except json.JSONDecodeError: + # Print raw response if it's not JSON for debugging + # Some servers send a string "connected" or heartbeats + if response == "connected": + print("Server confirmed: Connected") + else: + print(f"Received non-JSON message: {response}") + continue + + if data.get("topic") == "crypto_prices_chainlink": + payload = data.get("payload", {}) + + # The Chainlink feed uses the key 'value' + price_raw = payload.get("value") + price = 0.0 + if price_raw is None: + + print("[Warning] Received message with missing price value; skipping") + continue + + try: + price = float(price_raw) + except (TypeError, ValueError): + # Malformed price value; log and skip this message + print(f"[Warning] Received malformed price value: {price_raw!r}") + continue + ts_raw = payload.get("timestamp") + + dt = None + try: + dt = datetime.fromtimestamp(int(ts_raw)/1000.0, tz=ZoneInfo("America/New_York")) + ts_str = dt.strftime('%Y-%m-%d %H:%M:%S') + except (TypeError, ValueError, OSError) as e: + ts_str = str(ts_raw) + print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})") + + price_str = f"${price:,.2f}" + if dt is not None: + window_key = ( + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute // 15 + ) + + # Check if 15 min window is starting + if self.is_15min_window_start(dt) and window_key != self.last_window_key: + self.price_to_beat = price + price_to_beat_str = f"${self.price_to_beat:,.2f}" + self.last_window_key = window_key + sys.stdout.write("\033[1A\033[2K\r") + print(f"New window — Price to beat set: {price_to_beat_str}") + + # Calculate the difference between target price and current price + diff_val = (price - self.price_to_beat) if self.price_to_beat is not None else 0 + sign = "-" if diff_val < 0 else "" + diff_str = f"{sign}${abs(diff_val):,.2f}" + + print(f"\r{ts_str:<25} | {price_str:<12} | {diff_str}", end='', flush=True) + elif data.get("topic") == "error": + print(f"[Error] Server: {data.get('payload')}") + else: + pass + + except asyncio.CancelledError: + # Allow task cancellation (Ctrl+C, shutdown, task cancel) + raise + + except (websockets.exceptions.ConnectionClosed, + websockets.exceptions.WebSocketException) as e: + print(f"[WS] Connection closed: {e}") + sleep_time = min(backoff, max_backoff) + random.uniform(0, 0.5) + print(f"[WS] Disconnected: {e}") + print(f"[WS] Reconnecting in {sleep_time:.1f}s...") + + await asyncio.sleep(sleep_time) + backoff = min(backoff * 2, max_backoff) + + + except Exception as e: + # Catch-all for unexpected bugs + print(f"[Error] Unexpected failure: {e}") + + +async def main(): + monitor = PolymarketLiveMonitor() + + # Start the live stream + # Note: Short-term markets use Chainlink prices + await monitor.stream_live_prices(symbol="btc/usd") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n[Terminated] Monitoring stopped by user.") \ No newline at end of file