diff --git a/modules/events/__init__.py b/modules/events/__init__.py index e40151a..f90288e 100644 --- a/modules/events/__init__.py +++ b/modules/events/__init__.py @@ -28,6 +28,7 @@ ATVOTextConsumerEvent, ) from modules.events.chat_consumer_event import ChatConsumerEvent +from modules.events.overlay_consumer_event import OverlayConsumerEvent from modules.events.f1_qualifying_event import F1QualifyingEvent diff --git a/modules/events/f1_qualifying_event.py b/modules/events/f1_qualifying_event.py index 0301596..9b54e21 100644 --- a/modules/events/f1_qualifying_event.py +++ b/modules/events/f1_qualifying_event.py @@ -1,3 +1,5 @@ +import threading + from pandas import DataFrame from modules.events import BaseEvent @@ -60,6 +62,17 @@ def __init__( self.leaderboard[f"Q{n + 1}"] = {} self.leaderboard_df = DataFrame(self.leaderboard) + # Tracks car numbers knocked out in previous sessions so the overlay + # can display them distinctly from drivers still in contention. + self.knocked_out_drivers: set = set() + self.checkered_flag_out: bool = False + self.session_finishers: set = ( + set() + ) # cars that completed final lap this subsession + # Guards every read/write of leaderboard_df so the overlay thread never + # copies a partially-built DataFrame. + self.leaderboard_lock = threading.Lock() + def event_sequence(self): """ Main event sequence that runs the entire qualifying session. @@ -85,6 +98,7 @@ def event_sequence(self): ) def wait_before_next_session(self, seconds, session_number): + self.checkered_flag_out = False wait_start_time = self.sdk["SessionTime"] wait_end_time = wait_start_time + seconds # ----- SESSION PREPARATION PHASE ----- @@ -138,14 +152,14 @@ def apply_new_laptime(self, laps, carNumber, laptime): ] if new_cars_behind: sorted_laps = sorted(laps.items(), key=lambda x: x[1]) - for car in new_cars_behind: - # Give them their new position - self._chat( - f"/{car} You are now P{sorted_laps.index((car, laps[car])) + 1}" - ) - self._chat( - f"/{carNumber} You are now P{sorted_laps.index((carNumber, laps[carNumber])) + 1}" - ) + # for car in new_cars_behind: + # # Give them their new position + # self._chat( + # f"/{car} You are now P{sorted_laps.index((car, laps[car])) + 1}" + # ) + # self._chat( + # f"/{carNumber} You are now P{sorted_laps.index((carNumber, laps[carNumber])) + 1}" + # ) return laps def update_leaderboard(self, fastest_laps, session_number, send_msg=True): @@ -196,23 +210,24 @@ def update_leaderboard(self, fastest_laps, session_number, send_msg=True): # Update session leaderboard self.leaderboard[f"Q{session_number}"][car] = lap - # Update the dataframe representation of the leaderboard - self.leaderboard_df = DataFrame(self.leaderboard) - driver_names = { - c["CarNumber"]: c["UserName"] for c in self.sdk["DriverInfo"]["Drivers"] - } - self.leaderboard_df["Driver"] = self.leaderboard_df.index.map(driver_names) - # sort df columns - self.leaderboard_df = self.leaderboard_df[ - ["Driver"] + [f"Q{n + 1}" for n in range(len(self.session_minutes))] - ] - - # Sort the dataframe by lap times, prioritizing higher qualifying sessions - sessions = [f"Q{n + 1}" for n in range(len(self.session_minutes))] - sessions.reverse() - self.leaderboard_df = self.leaderboard_df.sort_values( - by=sessions, ascending=True - ) + # Rebuild the DataFrame under the lock so the overlay thread never + # copies a half-constructed frame. + with self.leaderboard_lock: + self.leaderboard_df = DataFrame(self.leaderboard) + driver_names = { + c["CarNumber"]: c["UserName"] for c in self.sdk["DriverInfo"]["Drivers"] + } + self.leaderboard_df["Driver"] = self.leaderboard_df.index.map(driver_names) + # sort df columns + self.leaderboard_df = self.leaderboard_df[ + ["Driver"] + [f"Q{n + 1}" for n in range(len(self.session_minutes))] + ] + # Sort the dataframe by lap times, prioritising higher qualifying sessions + sessions = [f"Q{n + 1}" for n in range(len(self.session_minutes))] + sessions.reverse() + self.leaderboard_df = self.leaderboard_df.sort_values( + by=sessions, ascending=True + ) def subsession( self, length, num_drivers_remain, session_number, subset_of_drivers=None @@ -232,6 +247,7 @@ def subsession( self.subsession_name = f"Q{session_number}" self._chat(f"Pit Exit is OPEN.", race_control=True) self.waiting_on = None + self.session_finishers = set() # ----- SESSION RUNNING PHASE ----- session_time_at_start = self.sdk["SessionTime"] @@ -300,6 +316,7 @@ def subsession( self._chat(f"Time Remaining: {self.subsession_time_remaining}") if out_of_time: + self.checkered_flag_out = True self._chat( f"Checkered flag is out for Q{session_number}!", race_control=True ) @@ -390,6 +407,7 @@ def subsession( > 30 ): remaining_cars.remove(car["CarNumber"]) + self.session_finishers.add(car["CarNumber"]) if first_car_to_take_checkered is None: first_car_to_take_checkered = car["CarNumber"] self._chat( @@ -408,6 +426,7 @@ def subsession( fastest_laps, session_number, send_msg=False ) remaining_cars.remove(car["CarNumber"]) + self.session_finishers.add(car["CarNumber"]) if first_car_to_take_checkered is None: first_car_to_take_checkered = car["CarNumber"] self._chat( @@ -422,6 +441,7 @@ def subsession( carIdx = driver_info_record[0]["CarIdx"] if self.sdk["CarIdxOnPitRoad"][carIdx] == 1: remaining_cars.remove(car["CarNumber"]) + self.session_finishers.add(car["CarNumber"]) self._chat(f"/{car['CarNumber']} Checkered Flag.") if lap_still_valid_reminder.__next__(): @@ -457,6 +477,7 @@ def subsession( # Notify eliminated drivers for car in eliminated_drivers: + self.knocked_out_drivers.add(car) self._chat(f"/{car} you have been eliminated from Q{session_number}!") # Notify advancing drivers diff --git a/modules/events/overlay_consumer_event.py b/modules/events/overlay_consumer_event.py new file mode 100644 index 0000000..2717a76 --- /dev/null +++ b/modules/events/overlay_consumer_event.py @@ -0,0 +1,480 @@ +""" +OverlayConsumerEvent +==================== +Serves browser-based OBS overlays over a local HTTP server. + +Endpoints +--------- +GET / – Consolidated qualifying overlay (timing tower + standings board). +GET /static/fonts/ – Font files served from the flet_pages/fonts/ directory. +GET /sse/rc – Server-Sent Events stream for race-control messages. +GET /sse/f1 – Server-Sent Events stream for F1 timing state. + +Usage +----- +Add a **single** Browser Source in OBS pointed at:: + + http://localhost:/ + +Enable "Allow transparency" in the Browser Source settings. + +The overlay automatically switches between the **timing tower** and the **full +standings board** based on session state: + +* **Timing tower** – shown while a session is live and at least one eligible + driver has not yet completed their final timed lap after the checkered flag. +* **Standings board** – shown once every eligible driver has crossed the line, + and again during the pre-session countdown between rounds. + +Integration with F1 Qualifying +------------------------------- +After starting the F1QualifyingEvent, set ``overlay_event.f1_event = f1_event`` so +the overlay server can poll the live leaderboard. Clear it on stop. + +Message queue +------------- +The overlay consumer reads from the shared ``broadcast_text_queue`` just like the +Discord/SDK text consumers. If you want both Discord and overlay running at the +same time, be aware that each message is consumed by one reader only; choose one +consumer per deployment or extend SubprocessManager with fan-out if needed. +""" + +import json +import queue +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from math import isnan +from pathlib import Path +from typing import TYPE_CHECKING + +from modules.events import BaseEvent + +if TYPE_CHECKING: + from modules.events.f1_qualifying_event import F1QualifyingEvent + +# --------------------------------------------------------------------------- # +# Paths +# --------------------------------------------------------------------------- # + +_THIS_DIR = Path(__file__).parent +_OVERLAY_HTML = _THIS_DIR.parent / "flet_pages" / "overlays" / "overlay.html" +_FONTS_DIR = _OVERLAY_HTML.parent.parent / "fonts" + + +# --------------------------------------------------------------------------- # +# HTTP server +# --------------------------------------------------------------------------- # + + +class _OverlayHTTPServer(ThreadingHTTPServer): + """ThreadingHTTPServer with sane defaults for a local overlay server. + + allow_reuse_address – lets the server rebind to the same port immediately + after a stop/start cycle without hitting 'Address + already in use' errors. + daemon_threads – request-handler threads (including long-lived SSE + connections) are killed automatically when the main + event thread exits, so they never block shutdown. + """ + + allow_reuse_address = True + daemon_threads = True + + +# --------------------------------------------------------------------------- # +# Event +# --------------------------------------------------------------------------- # + + +class OverlayConsumerEvent(BaseEvent): + """ + Consumer event that serves transparent HTML overlays for OBS browser sources. + + Parameters + ---------- + port : int + TCP port for the HTTP server (default 8765). + width : int + Overlay canvas width in pixels (default 1920). + height : int + Overlay canvas height in pixels (default 1080). + """ + + def __init__( + self, + port: int = 8765, + width: int = 1920, + height: int = 1080, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.port = int(port) + self.width = int(width) + self.height = int(height) + + # Per-overlay SSE client queues: list of queue.Queue, one per connected tab. + self._rc_clients: list[queue.Queue] = [] + self._f1_clients: list[queue.Queue] = [] + self._clients_lock = threading.Lock() + + # Set this to a running F1QualifyingEvent to enable the timing-tower overlay. + self.f1_event: F1QualifyingEvent | None = None + + self._server = None + + # ---------------------------------------------------------------------- # + # Public broadcast helpers (called from event_sequence loop) + # ---------------------------------------------------------------------- # + + def push_rc_message(self, title: str, text: str) -> None: + """Broadcast a race-control banner to all connected /rc-message clients.""" + payload = json.dumps({"title": title, "text": text}) + with self._clients_lock: + for q in list(self._rc_clients): + q.put(payload) + + def push_f1_state(self, state: dict) -> None: + """Broadcast an F1 timing-tower update to all connected /f1-timing clients.""" + payload = json.dumps(state) + with self._clients_lock: + for q in list(self._f1_clients): + q.put(payload) + + # ---------------------------------------------------------------------- # + # BaseEvent overrides + # ---------------------------------------------------------------------- # + + def event_sequence(self) -> None: + handler_class = self._build_handler() + self._server = _OverlayHTTPServer(("", self.port), handler_class) + + server_thread = threading.Thread( + target=self._server.serve_forever, daemon=True, name="overlay-http" + ) + server_thread.start() + self.logger.info("Overlay server listening on http://localhost:%d/", self.port) + + # ---- Main loop: drain queues and push state to SSE clients ---- # + last_f1_state: dict | None = None + try: + while True: + try: + # Race-control messages + try: + msg = self.broadcast_text_queue.get_nowait() + if isinstance(msg, dict): + self.push_rc_message( + msg.get("title", "Race Control"), + msg.get("text", ""), + ) + except queue.Empty: + pass + + # F1 timing tower state + if self.f1_event is not None: + state = self._build_f1_state() + # Only push when state actually changes to reduce traffic. + if state and state != last_f1_state: + self.push_f1_state(state) + last_f1_state = state + + except Exception as exc: + # Never let a single bad frame kill the HTTP server. + self.logger.warning( + "Overlay loop error (server kept alive): %s", exc, exc_info=True + ) + + try: + self.sleep(0.5) + except KeyboardInterrupt: + break + finally: + # Always shut down and close the socket so the port is freed + # immediately and a restart can rebind without 'Address in use'. + self._server.shutdown() + self._server.server_close() + self.logger.info("Overlay server stopped.") + + # ---------------------------------------------------------------------- # + # F1 state serialiser + # ---------------------------------------------------------------------- # + + def _build_f1_state(self) -> dict | None: + """Read the live F1 event and return a JSON-serialisable timing-tower state.""" + ev = self.f1_event + if ev is None: + return None + try: + # Acquire the leaderboard lock (if present) so we never copy a + # DataFrame that the F1 event thread is currently rebuilding. + df_lock = getattr(ev, "leaderboard_lock", None) + if df_lock is not None: + with df_lock: + df = ev.leaderboard_df.copy() + else: + df = ev.leaderboard_df.copy() + + session_name = ev.subsession_name or "" + time_remaining = ev.subsession_time_remaining or "--:--" + checkered_flag = getattr(ev, "checkered_flag_out", False) + q_cols = [c for c in df.columns if c != "Driver"] + + if df.empty: + return { + "session_name": session_name, + "time_remaining": time_remaining, + "checkered_flag": checkered_flag, + "sessions": q_cols, + "drivers": [], + } + + # Determine whether we are inside an active Qn session. + is_active_session = session_name.startswith( + "Q" + ) and not session_name.startswith("Pre-") + current_q_num = None + session_advancing = None + driver_at_risk_idx = None + + if is_active_session: + try: + current_q_num = int(session_name[1:]) + session_idx = current_q_num - 1 + if 0 <= session_idx < len(ev.session_advancing_cars): + session_advancing = ev.session_advancing_cars[session_idx] + if 0 < session_advancing <= len(df): + driver_at_risk_idx = session_advancing - 1 + except (ValueError, IndexError): + pass + + knocked_out_set = getattr(ev, "knocked_out_drivers", set()) + session_finishers = getattr(ev, "session_finishers", set()) + drivers = [] + + for pos, (car_num, row) in enumerate(df.iterrows()): + # Best lap time across all sessions (smallest positive value). + best_time = None + for col in q_cols: + val = row.get(col) + if isinstance(val, (int, float)) and not isnan(val) and val > 0: + if best_time is None or val < best_time: + best_time = val + + # Per-session individual lap times for the standings overlay. + session_times: dict[str, str] = {} + for col in q_cols: + val = row.get(col) + if isinstance(val, (int, float)) and not isnan(val) and val > 0: + session_times[col] = _fmt_time(val) + else: + session_times[col] = "" + + # Has this driver NOT yet set a lap in the CURRENT session? + # (Only meaningful during active sessions; knocked-out drivers are excluded.) + no_current_time = False + if ( + is_active_session + and current_q_num is not None + and car_num not in knocked_out_set + ): + current_q_col = f"Q{current_q_num}" + cval = row.get(current_q_col) + no_current_time = ( + cval is None + or not isinstance(cval, (int, float)) + or isnan(cval) + or cval <= 0 + ) + + # Determine status. + status = self._driver_status( + car_num=car_num, + row=row, + pos=pos, + is_active_session=is_active_session, + current_q_num=current_q_num, + session_advancing=session_advancing, + driver_at_risk_idx=driver_at_risk_idx, + knocked_out=knocked_out_set, + ) + + drivers.append( + { + "position": pos + 1, + "car_num": str(car_num), + "driver_name": str(row.get("Driver", "Unknown")), + "best_time": _fmt_time(best_time), + "status": status, + "session_times": session_times, + "no_current_time": no_current_time, + "finished": car_num in session_finishers, + } + ) + + return { + "session_name": session_name, + "time_remaining": time_remaining, + "checkered_flag": checkered_flag, + "sessions": q_cols, + "drivers": drivers, + } + + except Exception: + return None + + @staticmethod + def _driver_status( + car_num, + row, + pos: int, + is_active_session: bool, + current_q_num: int | None, + session_advancing: int | None, + driver_at_risk_idx: int | None, + knocked_out: set, + ) -> str: + """Return one of: 'safe', 'at_risk', 'elimination_zone', 'knocked_out'.""" + # Explicitly tracked knocked-out drivers always take priority. + if car_num in knocked_out: + return "knocked_out" + + if not is_active_session or session_advancing is None or current_q_num is None: + return "safe" + + # Drivers who have advanced to this session but not yet set a lap time are + # shown as 'safe' (not eliminated – they just haven't run yet). + current_q_col = f"Q{current_q_num}" + val = row.get(current_q_col) + no_current_time = ( + val is None or not isinstance(val, (int, float)) or isnan(val) or val <= 0 + ) + if no_current_time: + return "safe" + + # Classify by finishing position relative to the cutoff. + if session_advancing <= 0: + return "safe" + if driver_at_risk_idx is None: + return "safe" + if pos > driver_at_risk_idx: + return "elimination_zone" + if pos == driver_at_risk_idx: + return "at_risk" + return "safe" + + # ---------------------------------------------------------------------- # + # HTTP handler factory + # ---------------------------------------------------------------------- # + + def _build_handler(self): + """Return a BaseHTTPRequestHandler class closed over *self* (the event).""" + event = self + + class OverlayHandler(BaseHTTPRequestHandler): + + def do_GET(self): + path = self.path.split("?")[0] + if path == "/sse/rc": + self._handle_sse(event._rc_clients) + elif path == "/sse/f1": + self._handle_sse(event._f1_clients) + elif path.startswith("/static/fonts/"): + font_name = path[len("/static/fonts/") :] + self._serve_file(_FONTS_DIR / font_name, "font/truetype") + else: + # All paths (including "/") serve the consolidated overlay. + self._serve_html(_OVERLAY_HTML) + + # ---------------------------------------------------------------- # + # SSE + # ---------------------------------------------------------------- # + + def _handle_sse(self, client_list: list): + """Hold the connection open and stream JSON data events.""" + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("Connection", "keep-alive") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + client_q: queue.Queue = queue.Queue() + with event._clients_lock: + client_list.append(client_q) + + try: + while True: + try: + data = client_q.get(timeout=15) + self.wfile.write(f"data: {data}\n\n".encode()) + self.wfile.flush() + except queue.Empty: + # Keepalive comment keeps the connection alive. + self.wfile.write(b": ping\n\n") + self.wfile.flush() + except Exception: + pass + finally: + with event._clients_lock: + if client_q in client_list: + client_list.remove(client_q) + + # ---------------------------------------------------------------- # + # Static files + # ---------------------------------------------------------------- # + + def _serve_html(self, file_path: Path): + if not file_path.exists(): + self.send_error(404, f"Overlay not found: {file_path.name}") + return + content = file_path.read_text(encoding="utf-8") + # Simple template substitution for width/height/port. + content = ( + content.replace("{{WIDTH}}", str(event.width)) + .replace("{{HEIGHT}}", str(event.height)) + .replace("{{PORT}}", str(event.port)) + ) + body = content.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _serve_file(self, file_path: Path, mime: str): + if not file_path.exists(): + self.send_error(404, f"Not found: {file_path.name}") + return + data = file_path.read_bytes() + self.send_response(200) + self.send_header("Content-Type", mime) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + # Suppress default access-log noise. + def log_message(self, format, *args): # noqa: N802 + pass + + return OverlayHandler + + +# --------------------------------------------------------------------------- # +# Helpers +# --------------------------------------------------------------------------- # + + +def _fmt_time(seconds) -> str: + """Format a lap time given in seconds as MM:SS.mmm.""" + if ( + seconds is None + or not isinstance(seconds, (int, float)) + or isnan(seconds) + or seconds <= 0 + ): + return "" + mins = int(seconds // 60) + secs = int(seconds % 60) + millis = int((seconds % 1) * 1000) + return f"{mins:02d}:{secs:02d}.{millis:03d}" diff --git a/modules/flet_pages/download_fonts.py b/modules/flet_pages/download_fonts.py new file mode 100644 index 0000000..f78bba9 --- /dev/null +++ b/modules/flet_pages/download_fonts.py @@ -0,0 +1,60 @@ +""" +download_fonts.py +================= +Downloads the Saira font variants used by the OBS overlays into the +local ``fonts/`` directory so the overlays can serve them without +requiring an internet connection. + +Run once from the project root (or from this directory): + + python modules/flet_pages/download_fonts.py + +The overlay HTTP server already falls back to the Google Fonts CDN +automatically when the local files are absent, so this script is only +needed for fully-offline setups. +""" + +import urllib.request +from pathlib import Path + +FONTS_DIR = Path(__file__).parent / "fonts" + +# Saira v23 – sourced from Google Fonts CDN (fonts.gstatic.com). +# If these URLs ever become stale, visit: +# https://fonts.googleapis.com/css2?family=Saira:ital,wght@0,400;0,700;0,900;1,700&display=swap +# and copy the updated src URLs. +FONT_FILES = { + "Saira-Black.ttf": ( + "https://fonts.gstatic.com/s/saira/v23/" + "memWYa2wxmKQyPMrZX79wwYZQMhsyuShhKMjjbU9uXuA7_PFosg.ttf" + ), + "Saira-Bold.ttf": ( + "https://fonts.gstatic.com/s/saira/v23/" + "memWYa2wxmKQyPMrZX79wwYZQMhsyuShhKMjjbU9uXuA773Fosg.ttf" + ), + "Saira-Regular.ttf": ( + "https://fonts.gstatic.com/s/saira/v23/" + "memWYa2wxmKQyPMrZX79wwYZQMhsyuShhKMjjbU9uXuA71rCosg.ttf" + ), + "Saira-BoldItalic.ttf": ( + "https://fonts.gstatic.com/s/saira/v23/" + "memUYa2wxmKQyNkiV50dulWP7s95AqZTzZHcVdxWI9WH-pKBrYwxkw.ttf" + ), +} + + +def main() -> None: + FONTS_DIR.mkdir(parents=True, exist_ok=True) + for filename, url in FONT_FILES.items(): + dest = FONTS_DIR / filename + if dest.exists(): + print(f" [skip] {filename} (already present)") + continue + print(f" [fetch] {filename} ...", end=" ", flush=True) + urllib.request.urlretrieve(url, dest) + print(f"done ({dest.stat().st_size // 1024} KB)") + print("\nAll fonts ready in:", FONTS_DIR) + + +if __name__ == "__main__": + main() diff --git a/modules/flet_pages/overlays/overlay.html b/modules/flet_pages/overlays/overlay.html new file mode 100644 index 0000000..e783b71 --- /dev/null +++ b/modules/flet_pages/overlays/overlay.html @@ -0,0 +1,1159 @@ + + + + + + +
+ + + +
+
+ Qualifying + --:-- +
+
+
+ + + + +
+
+ Qualifying Standings + --:-- +
+
+
+
+ + + + +
+
+
+
+
Race Control
+
+
+
Race Control
+
+
+
+ + + + + diff --git a/modules/flet_pages/race_control.py b/modules/flet_pages/race_control.py index 6dc58d2..6edee8a 100644 --- a/modules/flet_pages/race_control.py +++ b/modules/flet_pages/race_control.py @@ -95,9 +95,14 @@ def __init__(self): self.text_consumer_config = {} self.audio_consumer_config = {} self.chat_consumer_config = {} + self.overlay_consumer_config = { + "port": "8765", + } self.text_consumer_enabled = False self.audio_consumer_enabled = False self.chat_consumer_enabled = False + self.overlay_consumer_enabled = False + self.overlay_event = None # Running OverlayConsumerEvent reference self.chat_message_list = None self.chat_refresh_timer = None @@ -111,6 +116,7 @@ def __init__(self): self.clear_black_flag_enabled = False self.scheduled_black_flag_enabled = False self.gap_to_leader_enabled = False + self.f1_qualifying_enabled = False # Tab references for updating indicators self.tabs_control = None @@ -128,7 +134,6 @@ def __init__(self): self.f1_final_session = {"duration": "8", "advancing_cars": "0"} self.f1_wait_between = 120 self.f1_refresh_timer = None - self.f1_dialog = None # Beer Goggles mode state self.goggle_event: Optional[BaseEvent] = None @@ -367,6 +372,12 @@ def get_tab_definitions(self): "enabled": self.gap_to_leader_enabled, "build_func": self.build_gap_to_leader_tab, }, + { + "name": "F1 Qualifying", + "icon": ft.Icons.SPEED, + "enabled": self.f1_qualifying_enabled, + "build_func": self.build_f1_qualifying_tab, + }, ] def build_main_tabs(self): @@ -2334,6 +2345,14 @@ def build_consumer_section(self): border_radius=5, padding=10, ), + ft.Container(height=8), + ft.Container( + content=self.build_overlay_consumer(), + width=400, + border=ft.border.all(1, ft.Colors.OUTLINE), + border_radius=5, + padding=10, + ), ], ) @@ -2525,6 +2544,77 @@ def toggle_enabled(e): spacing=5, ) + def build_overlay_consumer(self): + """Build the overlay server configuration panel.""" + if not self.overlay_consumer_config: + self.overlay_consumer_config = { + "port": "8765", + } + config = self.overlay_consumer_config + + def update_config(key, value): + self.overlay_consumer_config[key] = value + # Live-update the URL hint + url_text.value = ( + f"http://localhost:{self.overlay_consumer_config.get('port', '8765')}/" + ) + url_text.update() + + def toggle_enabled(e): + self.overlay_consumer_enabled = e.control.value + disabled = not e.control.value or self.is_running + port_field.disabled = disabled + self.page.update() + + enable_check = ft.Checkbox( + label="Enable Overlay Server (OBS)", + value=self.overlay_consumer_enabled, + on_change=toggle_enabled, + disabled=self.is_running, + ) + + port_field = ft.TextField( + label="Port", + value=config.get("port", "8765"), + width=90, + disabled=not self.overlay_consumer_enabled or self.is_running, + on_change=lambda e: update_config("port", e.control.value), + ) + + url_text = ft.Text( + f"http://localhost:{config.get('port', '8765')}/", + size=11, + color=ft.Colors.BLUE, + selectable=True, + ) + + return ft.Column( + [ + ft.Text( + "Overlay Server (OBS Browser Source)", + size=14, + weight=ft.FontWeight.BOLD, + ), + ft.Divider(height=5), + ft.Text( + "The overlay canvas automatically fills the OBS browser source size.", + size=11, + color=ft.Colors.ON_SURFACE_VARIANT, + italic=True, + ), + enable_check, + ft.Row([port_field], spacing=8), + ft.Row( + [ + ft.Icon(ft.Icons.LINK, size=14, color=ft.Colors.BLUE), + url_text, + ], + spacing=4, + ), + ], + spacing=5, + ) + def start_race_control(self, e): """Start the race control system""" # Get logger for logging errors @@ -2805,6 +2895,23 @@ def start_race_control(self, e): } ) + # Add Overlay Consumer if enabled + if self.overlay_consumer_enabled: + event_list.append( + { + "class": events.OverlayConsumerEvent, + "args": { + "port": int(self.overlay_consumer_config.get("port", 8765)), + "width": int( + self.overlay_consumer_config.get("width", 1920) + ), + "height": int( + self.overlay_consumer_config.get("height", 1080) + ), + }, + } + ) + # Create event instances with error handling event_instances = [] for i, item in enumerate(event_list): @@ -2836,11 +2943,38 @@ def start_race_control(self, e): if logger: logger.info(f"Started {len(event_instances)} events successfully") + # Keep a reference to the overlay event so we can wire in the F1 + # qualifying event later without restarting the server. + self.overlay_event = next( + ( + e + for e in event_instances + if isinstance(e, events.OverlayConsumerEvent) + ), + None, + ) + # Create and start subprocess manager event_run_methods = [event.run for event in event_instances] self.subprocess_manager = SubprocessManager(event_run_methods) self.subprocess_manager.start() + # Start F1 Qualifying if enabled + if self.f1_qualifying_enabled: + all_sessions = [*self.f1_elim_sessions, self.f1_final_session] + session_lengths = ", ".join([s["duration"] for s in all_sessions]) + advancing_cars = ", ".join([s["advancing_cars"] for s in all_sessions]) + self.f1_event = F1QualifyingEvent( + session_lengths, + advancing_cars, + wait_between_sessions=self.f1_wait_between, + ) + self.f1_subprocess_manager = SubprocessManager([self.f1_event.run]) + self.f1_subprocess_manager.start() + if self.overlay_event is not None: + self.overlay_event.f1_event = self.f1_event + self.page.run_task(self.f1_refresh_task) + # Start chat consumer refresh task if enabled if self.chat_consumer_enabled: self.page.run_task(self.chat_refresh_task) @@ -2936,6 +3070,14 @@ def stop_race_control(self, e): self.subprocess_manager.stop() self.subprocess_manager = None + # Stop F1 Qualifying if running + if self.f1_subprocess_manager: + self.f1_subprocess_manager.stop() + self.f1_subprocess_manager = None + self.f1_event = None + + self.overlay_event = None + # Clear chat messages when stopping if self.chat_message_list: self.chat_message_list.controls.clear() @@ -3155,6 +3297,10 @@ def save_preset(self, name: str): "enabled": self.chat_consumer_enabled, "config": self.chat_consumer_config, }, + "overlay_consumer": { + "enabled": self.overlay_consumer_enabled, + "config": self.overlay_consumer_config, + }, } with open(os.path.join(preset_dir, f"{name}.json"), "w") as f: @@ -3313,6 +3459,14 @@ def _load_config_data(self, config: dict): self.chat_consumer_enabled = True self.chat_consumer_config = {} + # Load Overlay Consumer + overlay_consumer = config.get("overlay_consumer", {}) + self.overlay_consumer_enabled = overlay_consumer.get("enabled", False) + self.overlay_consumer_config = overlay_consumer.get( + "config", + {"port": "8765"}, + ) + def load_preset(self, name: str, silent: bool = False): """Load a saved preset @@ -3374,15 +3528,6 @@ def build_footer(self): content=ft.Row( [ ft.Text("Special Modes:", size=14, weight=ft.FontWeight.BOLD), - ft.ElevatedButton( - "F1 Qualifying", - icon=ft.Icons.SPEED, - on_click=self.open_f1_qualifying, - style=ft.ButtonStyle( - bgcolor=ft.Colors.BLUE_900, - color=ft.Colors.WHITE, - ), - ), ft.ElevatedButton( "Beer Goggles", icon=ft.Icons.REMOVE_RED_EYE, @@ -3401,24 +3546,12 @@ def build_footer(self): border_radius=5, ) - def open_f1_qualifying(self, e): - """Open F1 Qualifying mode dialog""" - self.f1_dialog = ft.AlertDialog( - modal=False, # Allow interaction with other windows - title=ft.Text("F1 Qualifying Mode"), - content=self.build_f1_qualifying_content(), - actions=[ft.TextButton("Close", on_click=self.close_f1_dialog)], - ) - self.page.overlay.append(self.f1_dialog) - self.f1_dialog.open = True - self.page.update() - - def build_f1_qualifying_content(self): - """Build the F1 qualifying configuration UI""" + def build_f1_qualifying_tab(self): + """Build the F1 Qualifying tab content""" session_list = ft.Column( scroll=ft.ScrollMode.AUTO, - height=300, - spacing=10, + height=220, + spacing=5, ) def rebuild_sessions(): @@ -3431,12 +3564,12 @@ def rebuild_sessions(): ft.Text("Session", weight=ft.FontWeight.BOLD), width=80 ), ft.Container( - ft.Text("Duration (Mins)", weight=ft.FontWeight.BOLD), width=150 + ft.Text("Duration (Mins)", weight=ft.FontWeight.BOLD), width=130 ), ft.Container( - ft.Text("Advancing Cars", weight=ft.FontWeight.BOLD), width=150 + ft.Text("Advancing Cars", weight=ft.FontWeight.BOLD), width=130 ), - ft.Container(width=100), + ft.Container(width=60), ] ) session_list.controls.append(header) @@ -3470,20 +3603,23 @@ def on_click(e): ft.Container(ft.Text(f"Q{i + 1}", size=16), width=80), ft.TextField( value=session["duration"], - width=150, + width=130, on_change=make_duration_change(i), text_align=ft.TextAlign.CENTER, + disabled=self.is_running, ), ft.TextField( value=session["advancing_cars"], - width=150, + width=130, on_change=make_advancing_change(i), text_align=ft.TextAlign.CENTER, + disabled=self.is_running, ), ft.IconButton( icon=ft.Icons.DELETE, icon_color=ft.Colors.RED, on_click=make_remove_click(i), + disabled=self.is_running, ), ] ) @@ -3500,17 +3636,18 @@ def on_final_duration_change(e): ), ft.TextField( value=self.f1_final_session["duration"], - width=150, + width=130, on_change=on_final_duration_change, text_align=ft.TextAlign.CENTER, + disabled=self.is_running, ), ft.TextField( value="0", - width=150, + width=130, disabled=True, text_align=ft.TextAlign.CENTER, ), - ft.Container(width=100), + ft.Container(width=60), ] ) session_list.controls.append(final_row) @@ -3528,64 +3665,73 @@ def on_wait_change(e): except: pass - controls = ft.Column( - [ - session_list, - ft.Row( - [ - ft.ElevatedButton( - "Add Session", - icon=ft.Icons.ADD, - on_click=add_session, - ), - ] - ), - ft.Divider(), - ft.Row( - [ - ft.Text("Wait Between Sessions (seconds):", size=14), - ft.TextField( - value=str(self.f1_wait_between), - width=100, - on_change=on_wait_change, - text_align=ft.TextAlign.CENTER, - ), - ] - ), - ft.Divider(), - ft.Row( - [ - ft.ElevatedButton( - "Start F1 Qualifying", - icon=ft.Icons.PLAY_ARROW, - on_click=self.start_f1_qualifying, - style=ft.ButtonStyle( - bgcolor=ft.Colors.GREEN, - color=ft.Colors.WHITE, + def toggle_enabled(e): + self.f1_qualifying_enabled = e.control.value + self.update_tab_indicators() + + enable_toggle = ft.Switch( + label="Enable F1 Qualifying", + value=self.f1_qualifying_enabled, + on_change=toggle_enabled, + disabled=self.is_running, + ) + + return ft.Container( + content=ft.Column( + [ + ft.Row( + [ + ft.Column( + [ + ft.Text( + "F1 Qualifying Mode", + size=16, + weight=ft.FontWeight.BOLD, + ), + ft.Text( + "F1-style knockout qualifying sessions", + size=12, + color=ft.Colors.GREY, + ), + ], + expand=True, ), - ), - ft.ElevatedButton( - "Stop F1 Qualifying", - icon=ft.Icons.STOP, - on_click=self.stop_f1_qualifying, - style=ft.ButtonStyle( - bgcolor=ft.Colors.RED, - color=ft.Colors.WHITE, + enable_toggle, + ], + alignment=ft.MainAxisAlignment.SPACE_BETWEEN, + ), + ft.Divider(height=5), + ft.Row( + [ + ft.Text("Wait Between Sessions (seconds):", size=13), + ft.TextField( + value=str(self.f1_wait_between), + width=80, + on_change=on_wait_change, + text_align=ft.TextAlign.CENTER, + disabled=self.is_running, ), - ), - ], - spacing=10, - ), - ft.Container(height=10), - self.build_f1_leaderboard(), - ], - scroll=ft.ScrollMode.AUTO, - width=1600, - height=1000, + ft.Container(expand=True), + ft.ElevatedButton( + "Add Session", + icon=ft.Icons.ADD, + on_click=add_session, + disabled=self.is_running, + ), + ], + spacing=10, + ), + ft.Container(height=5), + session_list, + ft.Divider(height=5), + self.build_f1_leaderboard(), + ], + scroll=ft.ScrollMode.AUTO, + spacing=5, + ), + padding=8, ) - return controls - def build_f1_leaderboard(self): """Build F1 qualifying leaderboard display - returns container that will be updated""" # Create persistent Column that we'll update (not replace) @@ -3856,60 +4002,6 @@ async def f1_refresh_task(self): self.update_f1_leaderboard() await asyncio.sleep(0.5) # Update twice per second - def start_f1_qualifying(self, e): - """Start F1 Qualifying event""" - all_sessions = [*self.f1_elim_sessions, self.f1_final_session] - session_lengths = ", ".join([s["duration"] for s in all_sessions]) - advancing_cars = ", ".join([s["advancing_cars"] for s in all_sessions]) - - self.f1_event = F1QualifyingEvent( - session_lengths, advancing_cars, wait_between_sessions=self.f1_wait_between - ) - self.f1_subprocess_manager = SubprocessManager([self.f1_event.run]) - self.f1_subprocess_manager.start() - - # Start refresh task - self.page.run_task(self.f1_refresh_task) - - self.page.show_snack_bar( - ft.SnackBar( - content=ft.Text("F1 Qualifying Started!"), bgcolor=ft.Colors.GREEN - ) - ) - self.page.update() - - def stop_f1_qualifying(self, e): - """Stop F1 Qualifying event""" - if self.f1_subprocess_manager: - self.f1_subprocess_manager.stop() - self.f1_subprocess_manager = None - - self.f1_event = None - - # Clear leaderboard - if self.f1_leaderboard_column: - self.f1_leaderboard_column.controls = [ - ft.Text( - "Qualifying stopped", - size=14, - color=ft.Colors.GREY, - ) - ] - self.f1_leaderboard_column.update() - - self.page.show_snack_bar( - ft.SnackBar( - content=ft.Text("F1 Qualifying Stopped!"), bgcolor=ft.Colors.RED - ) - ) - self.page.update() - - def close_f1_dialog(self, e): - """Close F1 Qualifying dialog""" - if self.f1_dialog: - self.f1_dialog.open = False - self.page.update() - def open_beer_goggles(self, e): """Open Beer Goggles SDK viewer dialog""" self.goggles_dialog = ft.AlertDialog( diff --git a/tests/preview_overlay.py b/tests/preview_overlay.py new file mode 100644 index 0000000..c7fb0de --- /dev/null +++ b/tests/preview_overlay.py @@ -0,0 +1,796 @@ +#!/usr/bin/env python3 +""" +preview_overlay.py — Automated overlay simulation preview server +================================================================== + +Serves the overlay HTML/CSS/SSE stack with a **fully automated session +simulation** so you can preview every overlay state transition without +running the full Flet UI or connecting to iRacing. + +The simulation runs a complete F1-style qualifying event in a continuous loop: + + Pre-Q1 → Q1 (live) → Q1 (checkered + flying laps) → + Pre-Q2 → Q2 (live) → Q2 (checkered + flying laps) → + Pre-Q3 → Q3 (live) → Q3 (checkered + flying laps) → + final standings pause → (restart) + +Sub-session lengths (~30–60 s) and inter-session gaps (~12 s) are kept short +so the full demo loops in just a few minutes. All transitions are automatic. + +Live reload +----------- +Whenever ``overlay.html`` changes on disk, every connected browser tab +reloads automatically — no manual refresh needed. + +Usage +----- + python tests/preview_overlay.py + python tests/preview_overlay.py --port 9765 --width 1920 --height 1080 + python tests/preview_overlay.py --no-browser + +Then open http://localhost:9765/ in your browser or OBS browser source. +Press Ctrl+C to stop. +""" + +from __future__ import annotations + +import argparse +import json +import math +import queue +import random +import threading +import time +import webbrowser +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path + +# --------------------------------------------------------------------------- +# Paths (mirrors the layout expected by overlay_consumer_event.py) +# --------------------------------------------------------------------------- + +_THIS_DIR = Path(__file__).parent +_REPO_DIR = _THIS_DIR.parent +_OVERLAY_HTML = _REPO_DIR / "modules" / "flet_pages" / "overlays" / "overlay.html" +_FONTS_DIR = _OVERLAY_HTML.parent.parent / "fonts" + +# --------------------------------------------------------------------------- +# Driver roster +# --------------------------------------------------------------------------- +# 'speed' is each driver's offset (in seconds) above the fastest possible +# lap time. Lower speed → faster driver. + +_BASE_LAP = 83.0 # theoretical minimum lap time (seconds) + +DRIVERS: list[dict] = [ + {"car": "63", "name": "River Page", "speed": 0.000}, + {"car": "99", "name": "Giovanni Romano", "speed": 0.152}, + {"car": "16", "name": "Alex Marsh King", "speed": 0.281}, + {"car": "69", "name": "Tyler Agostino", "speed": 0.354}, + {"car": "10", "name": "Chris Bright", "speed": 0.482}, + {"car": "45", "name": "Tyler Carlton", "speed": 0.601}, + {"car": "119", "name": "Rognald Hotdognald", "speed": 0.714}, + {"car": "9", "name": "Brooks Clayton", "speed": 0.852}, + {"car": "017", "name": "Erik Ronnenberg", "speed": 0.923}, + {"car": "8", "name": "Jason L", "speed": 1.051}, + {"car": "243", "name": "Austin Tucker", "speed": 1.168}, + {"car": "42", "name": "Greg Beckman", "speed": 1.287}, + {"car": "13", "name": "Todd Madole", "speed": 1.381}, + {"car": "64", "name": "Mr. Hall", "speed": 1.474}, + {"car": "22", "name": "Boy Howdy", "speed": 1.562}, + {"car": "87", "name": "xXhalcy0n_SPNKr_94Xx", "speed": 2.103}, + {"car": "2", "name": "Ethan Conde", "speed": 2.248}, + {"car": "067", "name": "Alex Anderson", "speed": 2.401}, + {"car": "837", "name": "Sean Nelan", "speed": 2.553}, + {"car": "5", "name": "Mac Verstoopen", "speed": 2.801}, +] + +# --------------------------------------------------------------------------- +# Session configuration (short durations for a snappy demo loop) +# --------------------------------------------------------------------------- + +SESSION_CONFIG: list[dict] = [ + {"name": "Q1", "duration": 60, "advancing": 15}, # 20 cars → 15 advance + {"name": "Q2", "duration": 50, "advancing": 10}, # 15 cars → 10 advance + {"name": "Q3", "duration": 40, "advancing": 0}, # 10 cars → winner +] + +_ALL_SESSION_NAMES: list[str] = [s["name"] for s in SESSION_CONFIG] + +# Timing constants (seconds) +_PRE_SESSION_DURATION = 12 # countdown before each session +_POST_CHECKERED_PAUSE = 4 # standings shown after all drivers finish +_LOOP_RESTART_PAUSE = 6 # final standings held before the sim loops +_SIM_TICK = 0.5 # broadcast interval + +# --------------------------------------------------------------------------- +# Shared SSE client queues +# --------------------------------------------------------------------------- + +_rc_clients: list[queue.Queue] = [] +_f1_clients: list[queue.Queue] = [] +_reload_clients: list[queue.Queue] = [] +_clients_lock = threading.Lock() + +# Most-recent F1 state — sent immediately to newly connected clients. +_current_f1_state: dict = { + "session_name": "", + "time_remaining": "--:--", + "checkered_flag": False, + "sessions": [], + "drivers": [], +} + + +def _broadcast_f1(state: dict) -> None: + global _current_f1_state + _current_f1_state = state + payload = json.dumps(state) + with _clients_lock: + for q in list(_f1_clients): + q.put(payload) + + +def _broadcast_rc(title: str, text: str) -> None: + payload = json.dumps({"title": title, "text": text}) + with _clients_lock: + for q in list(_rc_clients): + q.put(payload) + + +def _broadcast_reload() -> None: + with _clients_lock: + for q in list(_reload_clients): + q.put("reload") + + +# --------------------------------------------------------------------------- +# Formatting helpers +# --------------------------------------------------------------------------- + + +def _fmt_time(seconds: float | None) -> str: + """Format a lap time (seconds) as 'MM:SS.mmm', or '' for None.""" + if seconds is None: + return "" + minutes = int(seconds // 60) + secs = seconds % 60 + return f"{minutes:02d}:{secs:06.3f}" + + +def _fmt_countdown(seconds: float) -> str: + """Format a countdown value (seconds) as 'MM:SS'.""" + s = max(0.0, seconds) + return f"{int(s // 60):02d}:{int(s % 60):02d}" + + +# --------------------------------------------------------------------------- +# Simulation helpers +# --------------------------------------------------------------------------- + + +def _gen_lap_time(driver: dict, session_idx: int) -> float: + """Return a plausible lap time for *driver* in the given session (0-based).""" + # Drivers tend to improve slightly as qualifying progresses. + improvement = session_idx * random.uniform(0.02, 0.20) + noise = random.gauss(0.0, 0.13) + return _BASE_LAP + driver["speed"] - improvement + noise + + +def _compute_status(pos: int, advancing: int) -> str: + """Return 'safe', 'at_risk', or 'elimination_zone' based on position vs. cutoff.""" + if advancing <= 0: + return "safe" + if pos > advancing: + return "elimination_zone" + if pos == advancing: + return "at_risk" + return "safe" + + +# --------------------------------------------------------------------------- +# Overlay state builder +# --------------------------------------------------------------------------- + + +def _build_overlay_state( + *, + session_name: str, + time_remaining: str, + checkered_flag: bool, + laptimes: dict[str, dict[str, float]], # {session_name: {car: seconds}} + knocked_out: set[str], + session_finishers: set[str], + active_session: str | None, # None during pre-session countdowns + advancing: int, # cars advancing from the active session (0 = final) +) -> dict: + """ + Build the complete overlay state dict suitable for JSON-broadcasting. + + active_session + The Qn name of the session currently running, or None if we are in a + pre-session countdown phase. Controls which session column is treated + as "live" for status/no_current_time calculations. + """ + + # ── Determine which session columns to display ───────────────────────── + # Show only sessions that already have at least one lap time recorded. + visible_sessions: list[str] = [sn for sn in _ALL_SESSION_NAMES if laptimes.get(sn)] + if not visible_sessions: + visible_sessions = [_ALL_SESSION_NAMES[0]] + + # Always include the active session column even before any laps are set, + # so that "No Time" placeholders appear from the first tick. + if active_session and active_session not in visible_sessions: + idx = _ALL_SESSION_NAMES.index(active_session) + visible_sessions = _ALL_SESSION_NAMES[: idx + 1] + + is_active_q = active_session is not None and not session_name.startswith("Pre-") + curr_times = laptimes.get(active_session, {}) if active_session else {} + + # ── Sort non-knocked-out drivers ─────────────────────────────────────── + # Primary: current-session lap time (fastest first; inf if no time yet) + # Secondary: best time in the most recent previous session (for tie-breaks) + active_cars: list[str] = [d["car"] for d in DRIVERS if d["car"] not in knocked_out] + + def _active_sort_key(car: str) -> tuple: + t_curr = curr_times.get(car, math.inf) + t_prev = math.inf + for sn in reversed(visible_sessions[:-1]): + t = laptimes.get(sn, {}).get(car) + if t is not None: + t_prev = t + break + return (t_curr, t_prev) + + active_cars.sort(key=_active_sort_key) + + # ── Build active driver rows ─────────────────────────────────────────── + drivers_out: list[dict] = [] + + for pos_0, car in enumerate(active_cars): + pos = pos_0 + 1 + info = next(x for x in DRIVERS if x["car"] == car) + + has_curr_time = car in curr_times + no_current_time = is_active_q and not has_curr_time + + # Status classification + if is_active_q and advancing > 0 and has_curr_time: + status = _compute_status(pos, advancing) + else: + status = "safe" + + # Best time shown in the timing tower + best_raw: float | None = curr_times.get(car) + if best_raw is None: + for sn in reversed(visible_sessions): + t = laptimes.get(sn, {}).get(car) + if t is not None: + best_raw = t + break + + session_times_fmt = { + sn: _fmt_time(laptimes.get(sn, {}).get(car)) for sn in visible_sessions + } + + drivers_out.append( + { + "position": pos, + "car_num": car, + "driver_name": info["name"], + "best_time": _fmt_time(best_raw), + "status": status, + "session_times": session_times_fmt, + "no_current_time": no_current_time, + "finished": car in session_finishers, + } + ) + + # ── Append knocked-out drivers at the bottom ─────────────────────────── + ko_cars: list[str] = [d["car"] for d in DRIVERS if d["car"] in knocked_out] + + def _ko_sort_key(car: str) -> float: + best = math.inf + for sn in _ALL_SESSION_NAMES: + t = laptimes.get(sn, {}).get(car) + if t is not None and t < best: + best = t + return best + + ko_cars.sort(key=_ko_sort_key) + + for ko_pos_0, car in enumerate(ko_cars): + info = next(x for x in DRIVERS if x["car"] == car) + best_raw = None + for sn in _ALL_SESSION_NAMES: + t = laptimes.get(sn, {}).get(car) + if t is not None and (best_raw is None or t < best_raw): + best_raw = t + + session_times_fmt = { + sn: _fmt_time(laptimes.get(sn, {}).get(car)) for sn in visible_sessions + } + + drivers_out.append( + { + "position": len(active_cars) + ko_pos_0 + 1, + "car_num": car, + "driver_name": info["name"], + "best_time": _fmt_time(best_raw), + "status": "knocked_out", + "session_times": session_times_fmt, + "no_current_time": False, + "finished": car in session_finishers, + } + ) + + return { + "session_name": session_name, + "time_remaining": time_remaining, + "checkered_flag": checkered_flag, + "sessions": visible_sessions, + "drivers": drivers_out, + } + + +# --------------------------------------------------------------------------- +# Main simulation loop +# --------------------------------------------------------------------------- + + +def _run_simulation() -> None: + """ + Drive the full qualifying session simulation indefinitely. + + Each iteration of the outer while-loop is one complete qualifying event: + Pre-Q1 countdown → Q1 live → Q1 checkered/flying laps → + Pre-Q2 countdown → Q2 live → Q2 checkered/flying laps → + Pre-Q3 countdown → Q3 live → Q3 checkered/flying laps → + final-standings pause → (loop) + """ + time.sleep(0.5) # brief pause so the HTTP server is fully up first + + while True: + # ── Per-loop state reset ─────────────────────────────────────────── + laptimes: dict[str, dict[str, float]] = {sn: {} for sn in _ALL_SESSION_NAMES} + knocked_out: set[str] = set() + eligible: list[str] | None = None # None → all 20 cars + + for sess_idx, sess_cfg in enumerate(SESSION_CONFIG): + sess_name = sess_cfg["name"] + duration = float(sess_cfg["duration"]) + advancing = sess_cfg["advancing"] + pre_name = f"Pre-{sess_name}" + + # Cars eligible for this session + session_cars: list[str] = ( + list(eligible) if eligible is not None else [d["car"] for d in DRIVERS] + ) + active_cars_this_sess = [c for c in session_cars if c not in knocked_out] + + # ── Pre-session countdown ────────────────────────────────────── + countdown_end = time.monotonic() + _PRE_SESSION_DURATION + while time.monotonic() < countdown_end: + state = _build_overlay_state( + session_name=pre_name, + time_remaining=_fmt_countdown(countdown_end - time.monotonic()), + checkered_flag=False, + laptimes=laptimes, + knocked_out=knocked_out, + session_finishers=set(), + active_session=None, + advancing=advancing, + ) + _broadcast_f1(state) + time.sleep(_SIM_TICK) + + # ── Schedule when each driver sets their lap times ───────────── + # Every active car gets a first lap somewhere between 20 %–90 % + # through the session. ~65 % of cars also set a second (faster) + # attempt in the latter half. + first_reveals: dict[str, float] = {} + second_reveals: dict[str, float] = {} + + for car in active_cars_this_sess: + first_reveals[car] = random.uniform(0.20, 0.90) * duration + if random.random() < 0.65: + t2 = random.uniform(0.55, 0.98) * duration + if t2 > first_reveals[car] + 2.0: + second_reveals[car] = t2 + + # ── Active session ───────────────────────────────────────────── + _broadcast_rc( + "Race Control", + "Pit Exit OPEN!", + ) + + sess_start = time.monotonic() + session_finishers: set[str] = set() + + while True: + elapsed = time.monotonic() - sess_start + remaining = max(0.0, duration - elapsed) + out_of_time = elapsed >= duration + + # Reveal first-lap times as each driver's scheduled moment arrives. + for car in list(first_reveals): + if elapsed >= first_reveals[car]: + drv = next(d for d in DRIVERS if d["car"] == car) + laptimes[sess_name][car] = _gen_lap_time(drv, sess_idx) + del first_reveals[car] + + # Reveal second (potentially improved) lap times. + for car in list(second_reveals): + if elapsed >= second_reveals[car]: + drv = next(d for d in DRIVERS if d["car"] == car) + new_t = _gen_lap_time(drv, sess_idx) + existing = laptimes[sess_name].get(car, math.inf) + if new_t < existing: + laptimes[sess_name][car] = new_t + del second_reveals[car] + + state = _build_overlay_state( + session_name=sess_name, + time_remaining=_fmt_countdown(remaining), + checkered_flag=out_of_time, + laptimes=laptimes, + knocked_out=knocked_out, + session_finishers=session_finishers, + active_session=sess_name, + advancing=advancing, + ) + _broadcast_f1(state) + + if out_of_time: + _broadcast_rc( + "Race Control", + "Checkered Flag", + ) + break + + time.sleep(_SIM_TICK) + + # ── Checkered-flag phase: drivers complete their flying laps ─── + # Any car that never set a time gets one now (they were still out). + for car in active_cars_this_sess: + if car not in laptimes[sess_name]: + drv = next(d for d in DRIVERS if d["car"] == car) + laptimes[sess_name][car] = _gen_lap_time(drv, sess_idx) + + # Stagger when each car crosses the line over the next few seconds. + pending_finish: set[str] = set(active_cars_this_sess) + checkered_window = max(4.0, len(pending_finish) * 0.6) + finish_at: dict[str, float] = { + car: time.monotonic() + random.uniform(0.3, checkered_window) + for car in pending_finish + } + phase_deadline = time.monotonic() + checkered_window + 1.0 + + while pending_finish and time.monotonic() < phase_deadline: + now = time.monotonic() + for car in list(pending_finish): + if now >= finish_at[car]: + session_finishers.add(car) + pending_finish.discard(car) + + state = _build_overlay_state( + session_name=sess_name, + time_remaining="00:00", + checkered_flag=True, + laptimes=laptimes, + knocked_out=knocked_out, + session_finishers=session_finishers, + active_session=sess_name, + advancing=advancing, + ) + _broadcast_f1(state) + time.sleep(_SIM_TICK) + + # Ensure every active car is marked finished before moving on. + session_finishers.update(active_cars_this_sess) + + # ── Process results — determine who advances ─────────────────── + session_results: list[tuple[str, float]] = sorted( + laptimes[sess_name].items(), key=lambda x: x[1] + ) + + if advancing > 0: + advancing_cars = [car for car, _ in session_results[:advancing]] + eliminated = [car for car, _ in session_results[advancing:]] + + for car in eliminated: + knocked_out.add(car) + # Any eligible car with no recorded time is also eliminated. + for car in active_cars_this_sess: + if car not in knocked_out and car not in laptimes[sess_name]: + knocked_out.add(car) + + eligible = advancing_cars + else: + # Final session — rank everyone, nobody is eliminated. + eligible = [car for car, _ in session_results] + + # ── Post-checkered pause — show the all-done standings board ─── + pause_end = time.monotonic() + _POST_CHECKERED_PAUSE + while time.monotonic() < pause_end: + state = _build_overlay_state( + session_name=sess_name, + time_remaining="00:00", + checkered_flag=True, + laptimes=laptimes, + knocked_out=knocked_out, + session_finishers=session_finishers, + active_session=sess_name, + advancing=advancing, + ) + _broadcast_f1(state) + time.sleep(_SIM_TICK) + + # ── End of full event — hold final standings, then restart ───────── + final_sess = _ALL_SESSION_NAMES[-1] + loop_end = time.monotonic() + _LOOP_RESTART_PAUSE + while time.monotonic() < loop_end: + state = _build_overlay_state( + session_name=final_sess, + time_remaining="00:00", + checkered_flag=True, + laptimes=laptimes, + knocked_out=knocked_out, + session_finishers=session_finishers, + active_session=final_sess, + advancing=0, + ) + _broadcast_f1(state) + time.sleep(_SIM_TICK) + + # Brief blank gap before the next loop so the restart is perceptible. + _broadcast_f1( + { + "session_name": "Pre-Q1", + "time_remaining": "--:--", + "checkered_flag": False, + "sessions": [_ALL_SESSION_NAMES[0]], + "drivers": [], + } + ) + time.sleep(1.5) + + +# --------------------------------------------------------------------------- +# File watcher — live reload on overlay.html changes +# --------------------------------------------------------------------------- + + +def _file_watcher(watched_paths: list[Path]) -> None: + """Reload all connected browser tabs whenever a watched file changes.""" + mtimes: dict[Path, float] = { + p: (p.stat().st_mtime if p.exists() else 0.0) for p in watched_paths + } + while True: + time.sleep(1.0) + for p in watched_paths: + if not p.exists(): + continue + mtime = p.stat().st_mtime + if mtime != mtimes[p]: + mtimes[p] = mtime + print(f" [live-reload] {p.name} changed — refreshing browsers") + _broadcast_reload() + break # one reload per tick is enough + + +# --------------------------------------------------------------------------- +# HTTP handler +# --------------------------------------------------------------------------- + +_LIVE_RELOAD_SNIPPET = """ + +""" + + +class _Server(ThreadingHTTPServer): + allow_reuse_address = True + daemon_threads = True + + +class PreviewHandler(BaseHTTPRequestHandler): + """Request handler for the preview server.""" + + # Class-level config — patched in main() before the server starts. + port: int = 9765 + width: int = 1920 + height: int = 1080 + + def do_GET(self) -> None: # noqa: N802 + path = self.path.split("?")[0] + + match path: + case "/sse/rc": + self._handle_sse(_rc_clients, initial_payload=None) + case "/sse/f1": + self._handle_sse( + _f1_clients, + initial_payload=json.dumps(_current_f1_state), + ) + case "/sse/reload": + self._handle_sse(_reload_clients, initial_payload=None) + case _ if path.startswith("/static/fonts/"): + font_name = path[len("/static/fonts/") :] + self._serve_file(_FONTS_DIR / font_name, "font/truetype") + case _: + self._serve_html(_OVERLAY_HTML) + + # ------------------------------------------------------------------ # + # SSE # + # ------------------------------------------------------------------ # + + def _handle_sse(self, client_list: list, initial_payload: str | None) -> None: + """Hold the connection open and stream Server-Sent Events.""" + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("Connection", "keep-alive") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + client_q: queue.Queue = queue.Queue() + if initial_payload is not None: + client_q.put(initial_payload) + + with _clients_lock: + client_list.append(client_q) + + try: + while True: + try: + data = client_q.get(timeout=15) + self.wfile.write(f"data: {data}\n\n".encode()) + self.wfile.flush() + except queue.Empty: + # Keepalive comment — prevents the browser from closing the stream. + self.wfile.write(b": ping\n\n") + self.wfile.flush() + except Exception: + pass + finally: + with _clients_lock: + if client_q in client_list: + client_list.remove(client_q) + + # ------------------------------------------------------------------ # + # Static / HTML # + # ------------------------------------------------------------------ # + + def _serve_html(self, file_path: Path) -> None: + if not file_path.exists(): + self.send_error(404, f"Overlay not found: {file_path.name}") + return + content = ( + file_path.read_text(encoding="utf-8") + .replace("{{WIDTH}}", str(self.width)) + .replace("{{HEIGHT}}", str(self.height)) + .replace("{{PORT}}", str(self.port)) + ) + # Inject the live-reload listener just before . + body = content.replace("", _LIVE_RELOAD_SNIPPET + "", 1) + encoded = body.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + def _serve_file(self, file_path: Path, mime: str) -> None: + if not file_path.exists(): + self.send_error(404, f"Not found: {file_path.name}") + return + data = file_path.read_bytes() + self.send_response(200) + self.send_header("Content-Type", mime) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + def log_message(self, format, *args): # noqa: N802, A002 + pass # suppress per-request access-log noise + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + total_demo_secs = ( + sum( + _PRE_SESSION_DURATION + s["duration"] + _POST_CHECKERED_PAUSE + for s in SESSION_CONFIG + ) + + _LOOP_RESTART_PAUSE + ) + + parser = argparse.ArgumentParser( + description="Automated overlay simulation preview — no iRacing connection needed.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--port", type=int, default=9765, help="HTTP port (default 9765)" + ) + parser.add_argument( + "--width", type=int, default=1920, help="Overlay canvas width (default 1920)" + ) + parser.add_argument( + "--height", type=int, default=1080, help="Overlay canvas height (default 1080)" + ) + parser.add_argument( + "--no-browser", action="store_true", help="Don't auto-open a browser tab" + ) + args = parser.parse_args() + + PreviewHandler.port = args.port + PreviewHandler.width = args.width + PreviewHandler.height = args.height + + server = _Server(("", args.port), PreviewHandler) + + # Background simulation thread + threading.Thread( + target=_run_simulation, + daemon=True, + name="preview-sim", + ).start() + + # Live-reload file watcher + watched = [_OVERLAY_HTML] if _OVERLAY_HTML.exists() else [] + if watched: + threading.Thread( + target=_file_watcher, + args=(watched,), + daemon=True, + name="preview-watcher", + ).start() + + url = f"http://localhost:{args.port}/" + sep = "─" * 62 + print(f"\n {sep}") + print(f" Overlay simulation preview server") + print(f" {sep}") + print(f" Open: {url}") + print(f" {sep}") + print(f" Session flow (loops automatically every ~{total_demo_secs:.0f} s):") + for s in SESSION_CONFIG: + cars_in = ( + len(DRIVERS) + if s == SESSION_CONFIG[0] + else SESSION_CONFIG[SESSION_CONFIG.index(s) - 1]["advancing"] + ) + adv_str = f"→ {s['advancing']} advance" if s["advancing"] else "→ final" + print( + f" Pre-{s['name']} ({_PRE_SESSION_DURATION}s) " + f"{s['name']} live ({s['duration']}s, {cars_in} cars, {adv_str})" + ) + print(f" Live reload: active (edit overlay.html to trigger)") + print(f" {sep}") + print(f" Press Ctrl+C to stop.\n") + + if not args.no_browser: + threading.Timer(0.6, webbrowser.open, args=(url,)).start() + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\n Stopping preview server.") + server.shutdown() + server.server_close() + + +if __name__ == "__main__": + main()