Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
requests>=2.28.0
py-clob-client>=0.1.0
numpy>=1.24.0
python-dotenv>=1.0.0
python-dotenv>=1.0.0
websockets>=12.0,<13.0
163 changes: 163 additions & 0 deletions utils/fetchPolymarketLivePrice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@

import asyncio
import json
import websockets
from datetime import datetime
from zoneinfo import ZoneInfo
import sys
import random

class PolymarketLiveMonitor:
"""
A tool to monitor live Polymarket crypto price data.

This class uses WebSocket-based real-time price streaming to track short-term
'Up or Down' markets and identify relevant 15-minute pricing windows.
"""

def __init__(self):
self.ws_url = "wss://ws-live-data.polymarket.com"
self.price_to_beat = None
self.last_window_key = None

def is_15min_window_start(self, dt) -> bool:

return (
dt.minute % 15 == 0 and
dt.second == 0 and
dt.microsecond == 0
)


async def stream_live_prices(self, symbol="btc/usd"):
"""
Subscribes to the Polymarket WebSocket RTDS for real-time Chainlink prices.
"""
subscription_message = {
"action": "subscribe",
"subscriptions": [{
"topic": "crypto_prices_chainlink",
"type": "*",
"filters": json.dumps({"symbol": symbol}, separators=(',', ':'))
}]
}

try:
async with websockets.connect(
self.ws_url,
ping_interval=20,
ping_timeout=20,
) as websocket:
await websocket.send(json.dumps(subscription_message))

print("-" * 60)
print(f"{'Timestamp':<25} | {'Price':<12} | {'Diff'}")
print("-" * 60)

backoff = 1
max_backoff = 60

while True:
try:
response = await websocket.recv()
if not response:
continue

data = json.loads(response)
except json.JSONDecodeError:
# Print raw response if it's not JSON for debugging
# Some servers send a string "connected" or heartbeats
if response == "connected":
print("Server confirmed: Connected")
else:
print(f"Received non-JSON message: {response}")
continue

if data.get("topic") == "crypto_prices_chainlink":
payload = data.get("payload", {})

# The Chainlink feed uses the key 'value'
price_raw = payload.get("value")
price = 0.0
if price_raw is None:

print("[Warning] Received message with missing price value; skipping")
continue

try:
price = float(price_raw)
except (TypeError, ValueError):
# Malformed price value; log and skip this message
print(f"[Warning] Received malformed price value: {price_raw!r}")
continue
ts_raw = payload.get("timestamp")

dt = None
try:
dt = datetime.fromtimestamp(int(ts_raw)/1000.0, tz=ZoneInfo("America/New_York"))
ts_str = dt.strftime('%Y-%m-%d %H:%M:%S')
except (TypeError, ValueError, OSError) as e:
ts_str = str(ts_raw)
print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})")
Comment on lines +100 to +101
Copy link

Copilot AI Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When timestamp parsing fails and dt is None, the code continues to calculate and display price information. However, this means price updates without valid timestamps are still printed, which could be confusing and makes it impossible to track when these prices occurred. Consider skipping messages with invalid timestamps.

Suggested change
ts_str = str(ts_raw)
print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})")
print(f"[Warning] Bad timestamp received: {ts_raw!r} ({e})")
# Skip this message since we cannot reliably determine when it occurred
continue

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are getting time stamp from and using that in case of any exception, ref "ts_raw"


price_str = f"${price:,.2f}"
if dt is not None:
window_key = (
dt.year,
dt.month,
dt.day,
dt.hour,
dt.minute // 15
)

# Check if 15 min window is starting
if self.is_15min_window_start(dt) and window_key != self.last_window_key:
self.price_to_beat = price
price_to_beat_str = f"${self.price_to_beat:,.2f}"
self.last_window_key = window_key
sys.stdout.write("\033[1A\033[2K\r")
print(f"New window — Price to beat set: {price_to_beat_str}")

# Calculate the difference between target price and current price
diff_val = (price - self.price_to_beat) if self.price_to_beat is not None else 0
sign = "-" if diff_val < 0 else ""
diff_str = f"{sign}${abs(diff_val):,.2f}"

print(f"\r{ts_str:<25} | {price_str:<12} | {diff_str}", end='', flush=True)
elif data.get("topic") == "error":
print(f"[Error] Server: {data.get('payload')}")
else:
pass

except asyncio.CancelledError:
# Allow task cancellation (Ctrl+C, shutdown, task cancel)
raise

except (websockets.exceptions.ConnectionClosed,
websockets.exceptions.WebSocketException) as e:
print(f"[WS] Connection closed: {e}")
Comment thread
Nsreddy18 marked this conversation as resolved.
sleep_time = min(backoff, max_backoff) + random.uniform(0, 0.5)
print(f"[WS] Disconnected: {e}")
print(f"[WS] Reconnecting in {sleep_time:.1f}s...")

await asyncio.sleep(sleep_time)
backoff = min(backoff * 2, max_backoff)


except Exception as e:
# Catch-all for unexpected bugs
print(f"[Error] Unexpected failure: {e}")


async def main():
monitor = PolymarketLiveMonitor()

# Start the live stream
# Note: Short-term markets use Chainlink prices
await monitor.stream_live_prices(symbol="btc/usd")

if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n[Terminated] Monitoring stopped by user.")