From ced715eb845fbb3c08b0f472c4d55974dbcb037f Mon Sep 17 00:00:00 2001 From: Sujeet Reddy Nanavala Date: Sun, 18 Jan 2026 00:10:15 +0530 Subject: [PATCH 1/3] added websocket to fetch bitcoin price --- requirements.txt | 3 +- utils/fetchPolymarketLivePrice.py | 120 ++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 utils/fetchPolymarketLivePrice.py diff --git a/requirements.txt b/requirements.txt index f05d39d..cd6babb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ requests>=2.28.0 py-clob-client>=0.1.0 numpy>=1.24.0 -python-dotenv>=1.0.0 \ No newline at end of file +python-dotenv>=1.0.0 +websockets>=12.0,<13.0 \ No newline at end of file diff --git a/utils/fetchPolymarketLivePrice.py b/utils/fetchPolymarketLivePrice.py new file mode 100644 index 0000000..e886c58 --- /dev/null +++ b/utils/fetchPolymarketLivePrice.py @@ -0,0 +1,120 @@ + +import asyncio +import json +import websockets +from datetime import datetime +from zoneinfo import ZoneInfo + +class PolymarketLiveMonitor: + """ + A tool to monitor live Polymarket crypto price data and extract market strike prices. + + This class combines WebSocket-based real-time price streaming with HTTP-based + scraping of market metadata to provide a comprehensive view of short-term + 'Up or Down' markets. + """ + + def __init__(self): + self.ws_url = "wss://ws-live-data.polymarket.com" + self.price_to_beat = None + self.last_window_key = None + + def is_15min_window_start(self, dt) -> bool: + + return ( + dt.minute % 15 == 0 and + dt.second == 0 and + dt.microsecond == 0 + ) + + + async def stream_live_prices(self, symbol="btc/usd"): + """ + Subscribes to the Polymarket WebSocket RTDS for real-time Chainlink prices. + """ + subscription_message = { + "action": "subscribe", + "subscriptions": [{ + "topic": "crypto_prices_chainlink", + "type": "*", + "filters": json.dumps({"symbol": symbol}, separators=(',', ':')) + }] + } + + try: + async with websockets.connect(self.ws_url) as websocket: + await websocket.send(json.dumps(subscription_message)) + + print("-" * 60) + print(f"{'Timestamp':<25} | {'Price':<12} | {'Diff'}") + print("-" * 60) + + while True: + try: + response = await websocket.recv() + if not response: + continue + + data = json.loads(response) + except json.JSONDecodeError: + # Print raw response if it's not JSON for debugging + # Some servers send a string "connected" or heartbeats + if response == "connected": + print("Server confirmed: Connected") + else: + print(f"Received non-JSON message: {response}") + continue + + if data.get("topic") == "crypto_prices_chainlink": + payload = data.get("payload", {}) + + # The Chainlink feed uses the key 'value' + price_raw = payload.get("value") + price = float(price_raw) if price_raw is not None else 0 + ts_raw = payload.get("timestamp") + + try: + dt = datetime.fromtimestamp(int(ts_raw)/1000.0, tz=ZoneInfo("America/New_York")) + ts_str = dt.strftime('%Y-%m-%d %H:%M:%S') + except: + ts_str = str(ts_raw) + + diff_str = "" + + # Ensure price is a float + price_val = float(price) + price_str = f"${price_val:,.2f}" + + window_key = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second // 15) + # Check if 15 min window is starting + if self.is_15min_window_start(dt) and window_key != self.last_window_key: + self.price_to_beat = price_val + price_to_beat_str = f"${self.price_to_beat:,.2f}" + self.last_window_key = window_key + print(f"\nNew window — Price to beat set: {price_to_beat_str}") + + # Calculate the difference between target price and current price + diff_val = (price_val - self.price_to_beat) if self.price_to_beat is not None else 0 + diff_str = f"${diff_val:,.2f}" + + print(f"\r{ts_str:<25} | {price_str:<12} | {diff_str}", end='', flush=True) + elif data.get("topic") == "error": + print(f"[Error] Server: {data.get('payload')}") + else: + pass + + except Exception as e: + print(f"[Error] WebSocket connection lost: {e}") + +async def main(): + monitor = PolymarketLiveMonitor() + + # Start the live stream + # Note: Short-term markets use Chainlink prices + await monitor.stream_live_prices(symbol="btc/usd") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n[Terminated] Monitoring stopped by user.") \ No newline at end of file From 38cd0fbac60b0f966ea986b4ffccccc397c3f08f Mon Sep 17 00:00:00 2001 From: Sujeet Reddy Nanavala Date: Sun, 18 Jan 2026 14:16:53 +0530 Subject: [PATCH 2/3] added review comments --- utils/fetchPolymarketLivePrice.py | 74 +++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/utils/fetchPolymarketLivePrice.py b/utils/fetchPolymarketLivePrice.py index e886c58..bdb6a25 100644 --- a/utils/fetchPolymarketLivePrice.py +++ b/utils/fetchPolymarketLivePrice.py @@ -7,11 +7,10 @@ class PolymarketLiveMonitor: """ - A tool to monitor live Polymarket crypto price data and extract market strike prices. + A tool to monitor live Polymarket crypto price data. - This class combines WebSocket-based real-time price streaming with HTTP-based - scraping of market metadata to provide a comprehensive view of short-term - 'Up or Down' markets. + This class uses WebSocket-based real-time price streaming to track short-term + 'Up or Down' markets and identify relevant 15-minute pricing windows. """ def __init__(self): @@ -22,9 +21,9 @@ def __init__(self): def is_15min_window_start(self, dt) -> bool: return ( - dt.minute % 15 == 0 and - dt.second == 0 and - dt.microsecond == 0 + dt.minute % 15 == 0 and + dt.second == 0 and + dt.microsecond == 0 ) @@ -42,7 +41,11 @@ async def stream_live_prices(self, symbol="btc/usd"): } try: - async with websockets.connect(self.ws_url) as websocket: + async with websockets.connect( + self.ws_url, + ping_interval=20, + ping_timeout=20, + ) as websocket: await websocket.send(json.dumps(subscription_message)) print("-" * 60) @@ -70,31 +73,44 @@ async def stream_live_prices(self, symbol="btc/usd"): # The Chainlink feed uses the key 'value' price_raw = payload.get("value") - price = float(price_raw) if price_raw is not None else 0 + price = 0.0 + if price_raw is not None: + try: + price = float(price_raw) + except (TypeError, ValueError): + # Malformed price value; log and skip this message + print(f"[Warning] Received malformed price value: {price_raw!r}") + continue ts_raw = payload.get("timestamp") + dt = None try: dt = datetime.fromtimestamp(int(ts_raw)/1000.0, tz=ZoneInfo("America/New_York")) ts_str = dt.strftime('%Y-%m-%d %H:%M:%S') - except: + except (TypeError, ValueError, OSError) as e: ts_str = str(ts_raw) - - diff_str = "" + print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})") # Ensure price is a float - price_val = float(price) - price_str = f"${price_val:,.2f}" - - window_key = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second // 15) - # Check if 15 min window is starting - if self.is_15min_window_start(dt) and window_key != self.last_window_key: - self.price_to_beat = price_val - price_to_beat_str = f"${self.price_to_beat:,.2f}" - self.last_window_key = window_key - print(f"\nNew window — Price to beat set: {price_to_beat_str}") + price_str = f"${price:,.2f}" + if dt is not None: + window_key = ( + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute // 15 + ) + + # Check if 15 min window is starting + if self.is_15min_window_start(dt) and window_key != self.last_window_key: + self.price_to_beat = price + price_to_beat_str = f"${self.price_to_beat:,.2f}" + self.last_window_key = window_key + print(f"\nNew window — Price to beat set: {price_to_beat_str}") # Calculate the difference between target price and current price - diff_val = (price_val - self.price_to_beat) if self.price_to_beat is not None else 0 + diff_val = (price - self.price_to_beat) if self.price_to_beat is not None else 0 diff_str = f"${diff_val:,.2f}" print(f"\r{ts_str:<25} | {price_str:<12} | {diff_str}", end='', flush=True) @@ -103,8 +119,18 @@ async def stream_live_prices(self, symbol="btc/usd"): else: pass + except asyncio.CancelledError: + # Allow task cancellation (Ctrl+C, shutdown, task cancel) + raise + + except (websockets.exceptions.ConnectionClosed, + websockets.exceptions.WebSocketException) as e: + print(f"[WS] Connection closed: {e}") + except Exception as e: - print(f"[Error] WebSocket connection lost: {e}") + # Catch-all for unexpected bugs + print(f"[Error] Unexpected failure: {e}") + async def main(): monitor = PolymarketLiveMonitor() From 577bbf0805f9d261127088932ab51308ee841672 Mon Sep 17 00:00:00 2001 From: Sujeet Reddy Nanavala Date: Sun, 18 Jan 2026 16:16:39 +0530 Subject: [PATCH 3/3] added review comments --- utils/fetchPolymarketLivePrice.py | 37 ++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/utils/fetchPolymarketLivePrice.py b/utils/fetchPolymarketLivePrice.py index bdb6a25..0290e7b 100644 --- a/utils/fetchPolymarketLivePrice.py +++ b/utils/fetchPolymarketLivePrice.py @@ -4,6 +4,8 @@ import websockets from datetime import datetime from zoneinfo import ZoneInfo +import sys +import random class PolymarketLiveMonitor: """ @@ -52,6 +54,9 @@ async def stream_live_prices(self, symbol="btc/usd"): print(f"{'Timestamp':<25} | {'Price':<12} | {'Diff'}") print("-" * 60) + backoff = 1 + max_backoff = 60 + while True: try: response = await websocket.recv() @@ -74,13 +79,17 @@ async def stream_live_prices(self, symbol="btc/usd"): # The Chainlink feed uses the key 'value' price_raw = payload.get("value") price = 0.0 - if price_raw is not None: - try: - price = float(price_raw) - except (TypeError, ValueError): - # Malformed price value; log and skip this message - print(f"[Warning] Received malformed price value: {price_raw!r}") - continue + if price_raw is None: + + print("[Warning] Received message with missing price value; skipping") + continue + + try: + price = float(price_raw) + except (TypeError, ValueError): + # Malformed price value; log and skip this message + print(f"[Warning] Received malformed price value: {price_raw!r}") + continue ts_raw = payload.get("timestamp") dt = None @@ -91,7 +100,6 @@ async def stream_live_prices(self, symbol="btc/usd"): ts_str = str(ts_raw) print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})") - # Ensure price is a float price_str = f"${price:,.2f}" if dt is not None: window_key = ( @@ -107,11 +115,13 @@ async def stream_live_prices(self, symbol="btc/usd"): self.price_to_beat = price price_to_beat_str = f"${self.price_to_beat:,.2f}" self.last_window_key = window_key - print(f"\nNew window — Price to beat set: {price_to_beat_str}") + sys.stdout.write("\033[1A\033[2K\r") + print(f"New window — Price to beat set: {price_to_beat_str}") # Calculate the difference between target price and current price diff_val = (price - self.price_to_beat) if self.price_to_beat is not None else 0 - diff_str = f"${diff_val:,.2f}" + sign = "-" if diff_val < 0 else "" + diff_str = f"{sign}${abs(diff_val):,.2f}" print(f"\r{ts_str:<25} | {price_str:<12} | {diff_str}", end='', flush=True) elif data.get("topic") == "error": @@ -126,6 +136,13 @@ async def stream_live_prices(self, symbol="btc/usd"): except (websockets.exceptions.ConnectionClosed, websockets.exceptions.WebSocketException) as e: print(f"[WS] Connection closed: {e}") + sleep_time = min(backoff, max_backoff) + random.uniform(0, 0.5) + print(f"[WS] Disconnected: {e}") + print(f"[WS] Reconnecting in {sleep_time:.1f}s...") + + await asyncio.sleep(sleep_time) + backoff = min(backoff * 2, max_backoff) + except Exception as e: # Catch-all for unexpected bugs