diff --git a/plugins/youtubearr/plugin.json b/plugins/youtubearr/plugin.json index db49bce..42d29a7 100644 --- a/plugins/youtubearr/plugin.json +++ b/plugins/youtubearr/plugin.json @@ -1,6 +1,6 @@ { "name": "YouTubearr", - "version": "1.20.0", + "version": "1.20.3", "description": "Zero-dependency YouTube livestream plugin with automatic monitoring and configurable numbering", "author": "jeff-gooch", "license": "Unlicense", diff --git a/plugins/youtubearr/plugin.py b/plugins/youtubearr/plugin.py index f7e5cce..47572f8 100644 --- a/plugins/youtubearr/plugin.py +++ b/plugins/youtubearr/plugin.py @@ -24,7 +24,7 @@ class Plugin: name = "YouTubearr" - version = "1.20.0" + version = "1.20.3" description = "Zero-dependency YouTube livestream plugin with automatic monitoring and configurable numbering" author = "Jeff Gooch" help_url = "https://github.com/jeff-gooch/youtubearr" @@ -156,13 +156,7 @@ class Plugin: "id": "info_webhook", "label": "Webhooks", "type": "info", - "description": "Trigger external services when channels are added or removed, and send notifications for new streams.", - }, - { - "id": "info_generic_webhooks", - "label": "Generic Webhooks", - "type": "info", - "description": "Configure generic webhook endpoints for media refresh and stream notifications.", + "description": "Trigger external services when channels are added or removed, and send notifications for new streams. Legacy webhook fields from previous versions are still honored internally — see the README for migration notes.", }, { "id": "media_refresh_webhook_url", @@ -180,20 +174,6 @@ class Plugin: "max": 60, "help_text": "Delay before sending the media refresh webhook to allow Dispatcharr to finish processing (default: 5 seconds).", }, - { - "id": "media_refresh_webhook_headers", - "label": "Media Refresh Webhook Headers", - "type": "string", - "default": "", - "help_text": "Optional JSON object of extra request headers (e.g., {\"Authorization\": \"Bearer TOKEN\"}). Leave empty for no extra headers.", - }, - { - "id": "media_refresh_webhook_body_template", - "label": "Media Refresh Webhook Body Template", - "type": "text", - "default": "", - "help_text": "Optional static JSON or text body to send. Leave empty to use the default event payload.", - }, { "id": "notification_webhook_url", "label": "Notification Webhook URL", @@ -208,49 +188,6 @@ class Plugin: "default": "", "help_text": "Base URL for Dispatcharr stream links in the notification payload (e.g., https://tv.example.com). Used to build stream URLs like {base_url}/proxy/ts/stream/{uuid}.", }, - { - "id": "notification_webhook_headers", - "label": "Notification Webhook Headers", - "type": "string", - "default": "", - "help_text": "Optional JSON object of extra request headers. Leave empty for no extra headers.", - }, - { - "id": "info_legacy_webhooks", - "label": "Legacy Webhook Aliases", - "type": "info", - "description": "These fields are kept for backward compatibility. Prefer the generic webhook fields above for new setups.", - }, - { - "id": "webhook_url", - "label": "Webhook URL (Legacy — media refresh alias, bodyless POST)", - "type": "string", - "default": "", - "help_text": "Legacy media refresh alias. Sends a bodyless POST (original Jellyfin-style). Use 'Media Refresh Webhook URL' above for new setups.", - }, - { - "id": "webhook_delay_seconds", - "label": "Webhook Delay (Legacy — media refresh delay alias)", - "type": "number", - "default": 5, - "min": 0, - "max": 60, - "help_text": "Legacy media refresh delay alias. Use 'Media Refresh Webhook Delay' above for new setups.", - }, - { - "id": "telegram_webhook_url", - "label": "Telegram Notification URL (Legacy — notification alias)", - "type": "string", - "default": "", - "help_text": "Legacy notification alias. Preserves the original Telegram payload shape for compatibility. Use 'Notification Webhook URL' above for new setups.", - }, - { - "id": "dispatcharr_base_url", - "label": "Dispatcharr Base URL (Legacy — notification base URL alias)", - "type": "string", - "default": "", - "help_text": "Legacy notification base URL alias. Use 'Notification Base URL' above for new setups.", - }, { "id": "info_epg", "label": "EPG Settings", @@ -637,6 +574,12 @@ def _handle_start_monitoring(self, context: Dict[str, Any]) -> Dict[str, Any]: "message": "yt-dlp not found (bundled version may not be working). Check logs.", } + # Fast path: local thread is alive — no DB round-trip needed + thread_alive = bool(self._monitor_thread and self._monitor_thread.is_alive()) + if thread_alive: + self._log("Monitoring already active (local thread alive)") + return {"status": "running", "message": "Monitoring already active"} + # Read fresh settings from DB to avoid stale state try: cfg = PluginConfig.objects.get(key=self._plugin_key) @@ -644,50 +587,34 @@ def _handle_start_monitoring(self, context: Dict[str, Any]) -> Dict[str, Any]: except PluginConfig.DoesNotExist: settings = context.get("settings", {}) - # Check if already active (use both DB flag AND heartbeat from any worker) - # Each Celery worker has its own Plugin instance, so we can't rely on self._monitor_thread - # Instead, check the heartbeat to see if ANY thread is actively running - thread_alive = self._monitor_thread and self._monitor_thread.is_alive() - if settings.get("monitoring_active"): - # Check heartbeat to see if another worker's thread is running - heartbeat_str = settings.get("monitoring_heartbeat") - if heartbeat_str: - try: - heartbeat = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) - if isinstance(heartbeat.tzinfo, type(None)): - heartbeat = heartbeat.replace(tzinfo=dt_timezone.utc) - age_seconds = (datetime.now(dt_timezone.utc) - heartbeat).total_seconds() - poll_interval_minutes = settings.get("poll_interval_minutes", 15) - heartbeat_threshold = (poll_interval_minutes + 10) * 60 - if age_seconds < heartbeat_threshold: - self._log(f"Monitoring already active (heartbeat {int(age_seconds)}s ago)") - return {"status": "running", "message": "Monitoring already active"} - except (ValueError, TypeError): - pass - - # Also check local thread - if thread_alive: + if self._is_heartbeat_recent(settings) and self._is_last_poll_recent(settings): + self._log("Monitoring already active (fresh heartbeat and recent poll from running worker)") return {"status": "running", "message": "Monitoring already active"} + if self._is_starting_recent(settings): + self._log("Monitoring already being claimed by another worker (fresh monitoring_starting_at)") + return {"status": "running", "message": "Monitoring already starting"} + self._log("monitoring_active=True but health stale (heartbeat or last_poll) — will restart") monitored = settings.get("monitored_channels", "").strip() if not monitored: return {"status": "error", "message": "No channels to monitor. Add channel IDs/URLs in settings."} - # Set in-memory flag BEFORE starting thread (prevents race with Dispatcharr form saves) + # Atomically claim the monitor slot under a DB row lock. + # This prevents a second Dispatcharr worker from starting a duplicate thread + # in the window between this check and the thread's first heartbeat write. + claim = self._try_claim_monitor() + if claim == "already_running": + return {"status": "running", "message": "Monitoring already active"} + if claim == "already_starting": + return {"status": "running", "message": "Monitoring already starting"} + + # We hold the claim — start the thread self._monitoring_active = True self._monitor_stop_event.clear() self._extraction_failures.clear() # Fresh start: retry any previously-failed extractions + self._persist_settings({"extraction_failures": {}}) - # Update settings in DB - updates = { - "monitoring_active": True, - "last_poll_time": timezone.now().isoformat(), - "extraction_failures": {}, - } - self._persist_settings(updates) - - # Start monitoring thread AFTER persisting settings self._monitor_thread = threading.Thread( target=self._monitoring_loop, args=(self._plugin_key,), @@ -712,10 +639,11 @@ def _handle_stop_monitoring(self, context: Dict[str, Any]) -> Dict[str, Any]: return {"status": "stopped", "message": "Monitoring not active"} # Step 1: Set DB flag FIRST - this is what threads in other workers will see - # Also clear heartbeat so Start Monitoring doesn't think a thread is still running + # Clear heartbeat and starting_at so Start can work cleanly after stop updates = { "monitoring_active": False, "monitoring_heartbeat": None, + "monitoring_starting_at": None, } self._persist_settings(updates) @@ -748,17 +676,50 @@ def _handle_refresh(self, context: Dict[str, Any]) -> Dict[str, Any]: except PluginConfig.DoesNotExist: settings = context.get("settings", {}) - # If monitoring is active, the background thread is already polling on its schedule. + # If monitoring is active, check whether the thread is actually running before deciding what to do. # Running a second concurrent poll blocks the HTTP thread for minutes and causes # 504 Gateway Timeouts — especially with many tracked streams. if settings.get("monitoring_active"): - poll_interval = settings.get("poll_interval_minutes", 15) - last_poll = settings.get("last_poll_time", "") - last_poll_display = last_poll[:19].replace("T", " ") if last_poll else "unknown" - return { - "status": "success", - "message": f"Monitoring is active (polling every {poll_interval} min). Last poll: {last_poll_display}. No manual refresh needed.", - } + thread_alive = bool(self._monitor_thread and self._monitor_thread.is_alive()) + heartbeat_recent = self._is_heartbeat_recent(settings) + last_poll_recent = self._is_last_poll_recent(settings) + + # Local thread alive → trust it unconditionally (it's this worker's own thread). + # Another worker healthy (fresh heartbeat + fresh last_poll) → also safe to skip. + # Do NOT skip on heartbeat alone when local thread is dead and last_poll is stale: + # the heartbeat may be a ghost from the process that died at restart. + if thread_alive or (heartbeat_recent and last_poll_recent): + # Monitoring is genuinely running — return informative status, no duplicate poll needed. + poll_interval = settings.get("poll_interval_minutes", 15) + last_poll = settings.get("last_poll_time", "") + last_poll_display = last_poll[:19].replace("T", " ") if last_poll else "unknown" + heartbeat_str = settings.get("monitoring_heartbeat", "") + heartbeat_display = heartbeat_str[:19].replace("T", " ") if heartbeat_str else "unknown" + return { + "status": "success", + "message": ( + f"Monitoring is active (polling every {poll_interval} min). " + f"Last poll: {last_poll_display}. Heartbeat: {heartbeat_display}. " + "No manual refresh needed." + ), + } + + # monitoring_active=True but local thread dead and (heartbeat stale OR last_poll stale). + # Check for a concurrent claim before trying to restart ourselves. + if self._is_starting_recent(settings): + return { + "status": "running", + "message": "Monitoring is starting (just claimed by another process).", + } + restarted = self._ensure_monitoring_thread(settings) + if restarted: + return { + "status": "running", + "message": "Monitoring was marked active but was not running; restarted monitoring.", + } + # _ensure_monitoring_thread couldn't restart (no channels configured, another worker holds + # a fresh heartbeat, etc.) — fall through to the one-shot manual refresh below so EPG + # and stream state are still refreshed promptly. if not self._manual_refresh_lock.acquire(blocking=False): return {"status": "info", "message": "A manual refresh is already in progress — check logs for progress."} @@ -836,6 +797,8 @@ def _handle_reset_all(self, context: Dict[str, Any]) -> Dict[str, Any]: # (In-place modification of JSONField may not trigger save properly) new_settings = dict(cfg.settings or {}) new_settings["monitoring_active"] = False + new_settings["monitoring_heartbeat"] = None + new_settings["monitoring_starting_at"] = None new_settings["tracked_streams"] = {} # Clear immediately to prevent re-adds cfg.settings = new_settings # Assign new dict object cfg.save(update_fields=["settings", "updated_at"]) @@ -921,7 +884,10 @@ def _handle_diagnostics(self, context: Dict[str, Any]) -> Dict[str, Any]: thread_alive = bool(self._monitor_thread and self._monitor_thread.is_alive()) details["monitor_thread_alive"] = thread_alive details["last_poll_time"] = settings.get("last_poll_time") or "unknown" + _lpa = self._age_seconds(settings.get("last_poll_time")) + details["last_poll_age_seconds"] = int(_lpa) if _lpa is not None else None details["monitoring_heartbeat"] = settings.get("monitoring_heartbeat") or "unknown" + details["monitoring_starting_at"] = settings.get("monitoring_starting_at") or "unknown" if monitoring_active_db and not thread_alive: heartbeat_str = settings.get("monitoring_heartbeat") @@ -937,6 +903,18 @@ def _handle_diagnostics(self, context: Dict[str, Any]) -> Dict[str, Any]: else: issues.append("warning:monitoring active but no heartbeat found") + # Stale-poll warning — active but last_poll is beyond the expected cycle window + if monitoring_active_db and not self._is_last_poll_recent(settings): + _poll_age_str = f"{int(_lpa)}s" if _lpa is not None else "never" + issues.append(f"warning:monitoring active but last poll is stale (age={_poll_age_str})") + + # EPG window counts — surface current/future program counts for the YouTubearr source + _epg_window = self._get_youtubearr_epg_window_counts(settings) + details["epg_window_counts"] = _epg_window + if monitoring_active_db and _epg_window.get("source_found"): + if _epg_window.get("current", 0) == 0 and _epg_window.get("future12", 0) == 0: + issues.append("warning:YouTubearr EPG source has no current or future programs — monitor may need refresh") + # Monitored channels / tracked streams monitored_raw = settings.get("monitored_channels", "").strip() details["monitored_channel_count"] = ( @@ -1038,6 +1016,41 @@ def _handle_diagnostics(self, context: Dict[str, Any]) -> Dict[str, Any]: if stale_count > 0: issues.append(f"warning:{stale_count} live stream URL(s) are stale (monitor may not be refreshing URLs)") + # Orphaned and stale-EPG tracked entries (best-effort — DB required) + orphaned_tracked_count = 0 + stale_epg_tracked_count = 0 + try: + _diag_now = datetime.now(dt_timezone.utc) + for _vid, _sd in tracked_streams.items(): + if not _sd.get("is_live"): + continue + _cid = _sd.get("channel_id") + if not _cid: + orphaned_tracked_count += 1 + continue + try: + _ch = Channel.objects.get(id=_cid) + if _ch.epg_data: + _prog = ProgramData.objects.filter(epg=_ch.epg_data).first() + if _prog is not None and _prog.end_time is not None: + _end = _prog.end_time + if getattr(_end, "tzinfo", None) is None: + _end = _end.replace(tzinfo=dt_timezone.utc) + if _end < _diag_now: + stale_epg_tracked_count += 1 + except Channel.DoesNotExist: + orphaned_tracked_count += 1 + except Exception: + pass + except Exception: + pass + details["orphaned_tracked_count"] = orphaned_tracked_count + details["stale_epg_tracked_count"] = stale_epg_tracked_count + if orphaned_tracked_count > 0: + issues.append(f"warning:{orphaned_tracked_count} tracked stream(s) point to missing channels (run Cleanup)") + if stale_epg_tracked_count > 0: + issues.append(f"warning:{stale_epg_tracked_count} tracked-live stream(s) have expired EPG data (may be stale)") + # Recent log summary details["log_summary"] = self._get_recent_log_summary() @@ -2658,6 +2671,27 @@ def _refresh_epg_times(self, settings: Dict[str, Any]) -> int: ) if updated > 0: refreshed_count += 1 + else: + # No ProgramData row exists — create one so the EPG window stays current. + # This repairs channels whose program entry was deleted externally or + # expired after a long monitoring outage. + channel_tvg_id = channel.tvg_id or str(channel.channel_number) + title = stream_data.get("title", "YouTube Live") + try: + ProgramData.objects.update_or_create( + epg=channel.epg_data, + tvg_id=channel_tvg_id, + defaults={ + "title": title, + "description": title, + "start_time": prog_now, + "end_time": prog_now + timedelta(hours=12), + } + ) + refreshed_count += 1 + self._log(f"Created missing EPG program for channel {channel_id} (tvg_id={channel_tvg_id})") + except Exception as create_exc: + self._log_error(f"Failed to create EPG program for channel {channel_id}: {create_exc}") except Channel.DoesNotExist: pass except Exception as exc: @@ -2668,7 +2702,13 @@ def _refresh_epg_times(self, settings: Dict[str, Any]) -> int: # --- Cleanup --- def _cleanup_ended_streams(self, settings: Dict[str, Any], force: bool = False) -> int: - """Remove channels for ended streams. Returns count of cleaned channels""" + """Remove channels for ended/stale streams and orphaned tracking entries. + + Returns count of Dispatcharr channels actually deleted. + Also removes orphaned tracking entries (is_live=True but channel missing) and + stale-live entries (is_live=True, EPG ended, and direct verify confirms not live). + Persists tracked_streams whenever any entries are removed. + """ tracked_streams = settings.get("tracked_streams", {}) auto_cleanup = settings.get("auto_cleanup", True) @@ -2679,10 +2719,13 @@ def _cleanup_ended_streams(self, settings: Dict[str, Any], force: bool = False) to_remove = [] for video_id, stream_data in tracked_streams.items(): - if not stream_data.get("is_live") or force: + is_live = stream_data.get("is_live") + channel_id = stream_data.get("channel_id") + stream_id = stream_data.get("stream_id") + + if not is_live or force: + # Normal ended stream cleanup (or force) try: - # Delete Channel - channel_id = stream_data.get("channel_id") if channel_id: try: channel = Channel.objects.get(id=channel_id) @@ -2690,10 +2733,8 @@ def _cleanup_ended_streams(self, settings: Dict[str, Any], force: bool = False) cleaned_count += 1 self._log(f"Deleted channel: {stream_data.get('title')}") except Channel.DoesNotExist: - pass + pass # Already gone; still remove tracking entry below - # Delete Stream (if not used by other channels) - stream_id = stream_data.get("stream_id") if stream_id: try: stream = Stream.objects.get(id=stream_id) @@ -2707,12 +2748,75 @@ def _cleanup_ended_streams(self, settings: Dict[str, Any], force: bool = False) except Exception as exc: self._log_error(f"Cleanup failed for {video_id}: {exc}") + else: + # is_live=True — check for orphan or stale EPG condition + + # Orphan check: channel no longer exists in the database + if not channel_id: + self._log(f"Removing orphaned tracking entry: no channel_id (video {video_id})") + to_remove.append(video_id) + continue + + try: + channel = Channel.objects.get(id=channel_id) + except Channel.DoesNotExist: + self._log(f"Removing orphaned tracking entry: channel {channel_id} missing (video {video_id})") + to_remove.append(video_id) + continue + except Exception: + continue # DB error — fail safe, keep tracking entry + + # Stale EPG check: EPG programme end time is in the past + epg_ended = False + try: + if channel.epg_data: + prog = ProgramData.objects.filter(epg=channel.epg_data).first() + if prog is not None and prog.end_time is not None: + _now = datetime.now(dt_timezone.utc) + _end = prog.end_time + if getattr(_end, "tzinfo", None) is None: + _end = _end.replace(tzinfo=dt_timezone.utc) + if _end < _now: + epg_ended = True + except Exception: + pass # EPG unavailable — fail safe, don't treat as stale + + if not epg_ended: + continue + + # EPG end time is in the past — do a direct live verification + # _verify_video_is_live fails safe: returns True (assume live) on any error + title = stream_data.get("title", video_id) + self._log(f"EPG ended for still-live entry: {title} — verifying via yt-dlp") + if self._verify_video_is_live(video_id): + self._log(f"Confirmed still live (EPG lag): {title}") + continue + + # Confirmed not live — clean up the stale entry + self._log(f"Confirmed stale-live stream ended: {title}") + try: + channel.delete() + cleaned_count += 1 + self._log(f"Deleted stale channel: {title}") + except Exception as exc: + self._log_error(f"Failed to delete stale channel {channel_id}: {exc}") + + if stream_id: + try: + stream = Stream.objects.get(id=stream_id) + if not stream.channelstream_set.exists(): + stream.delete() + except Stream.DoesNotExist: + pass + + to_remove.append(video_id) + # Remove from tracked streams for video_id in to_remove: del tracked_streams[video_id] - # Persist updates - if cleaned_count > 0: + # Persist whenever any entries were removed (not just when channels were deleted) + if to_remove: self._persist_settings({"tracked_streams": tracked_streams}) return cleaned_count @@ -2723,6 +2827,18 @@ def _monitoring_loop(self, plugin_key: str) -> None: """Background monitoring loop (runs in daemon thread)""" self._log("Monitoring loop started") + # Write the initial heartbeat immediately and clear the starting_at lease marker. + # This closes the race window between _try_claim_monitor() writing monitoring_starting_at + # and the first heartbeat update inside the poll loop: any concurrent Start/Refresh that + # reads after this point will see a fresh heartbeat and correctly return "already active". + try: + self._persist_settings({ + "monitoring_heartbeat": timezone.now().isoformat(), + "monitoring_starting_at": None, + }) + except Exception as _hb_exc: + self._log_error(f"Failed to write initial heartbeat: {_hb_exc}") + try: # Restore extraction failures that survived from before last container restart try: @@ -3206,12 +3322,144 @@ def _cleanup_legacy_celery_task(self) -> None: except Exception as exc: self._log_error(f"Legacy Celery task cleanup failed: {exc}") + def _is_heartbeat_recent(self, settings: Dict[str, Any]) -> bool: + """Return True if the monitoring heartbeat is within the stale threshold.""" + heartbeat_str = settings.get("monitoring_heartbeat") + if not heartbeat_str: + return False + try: + heartbeat = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) + if heartbeat.tzinfo is None: + heartbeat = heartbeat.replace(tzinfo=dt_timezone.utc) + age_seconds = (datetime.now(dt_timezone.utc) - heartbeat).total_seconds() + poll_interval_minutes = settings.get("poll_interval_minutes", 15) + threshold = (poll_interval_minutes + 10) * 60 + return age_seconds < threshold + except (ValueError, TypeError): + return False + + def _is_starting_recent(self, settings: Dict[str, Any]) -> bool: + """Return True if monitoring_starting_at is within 60 seconds. + + A fresh monitoring_starting_at means another worker just claimed the monitor + slot via _try_claim_monitor but the thread may not have written its first + heartbeat yet. Callers should treat this as "already starting" and not launch + a duplicate thread. + """ + starting_str = settings.get("monitoring_starting_at") + if not starting_str: + return False + try: + starting_at = datetime.fromisoformat(starting_str.replace("Z", "+00:00")) + if starting_at.tzinfo is None: + starting_at = starting_at.replace(tzinfo=dt_timezone.utc) + age_seconds = (datetime.now(dt_timezone.utc) - starting_at).total_seconds() + return age_seconds < 60 + except (ValueError, TypeError): + return False + + def _parse_iso_datetime(self, value) -> Optional[datetime]: + """Parse an ISO-8601 datetime string safely. Returns None on any failure.""" + if not value or not isinstance(value, str): + return None + try: + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=dt_timezone.utc) + return dt + except (ValueError, TypeError): + return None + + def _age_seconds(self, value) -> Optional[float]: + """Return age in seconds of an ISO-8601 timestamp, or None if unparseable.""" + dt = self._parse_iso_datetime(value) + if dt is None: + return None + return max(0.0, (datetime.now(dt_timezone.utc) - dt).total_seconds()) + + def _is_last_poll_recent(self, settings: Dict[str, Any]) -> bool: + """Return True if last_poll_time is within (poll_interval + 10 min) — same grace as heartbeat.""" + age = self._age_seconds(settings.get("last_poll_time")) + if age is None: + return False # Never polled or unparseable + poll_interval_minutes = settings.get("poll_interval_minutes", 15) + threshold = (poll_interval_minutes + 10) * 60 + return age < threshold + + def _get_youtubearr_epg_window_counts(self, settings: Dict[str, Any]) -> Dict[str, Any]: + """Return current/future-12h program counts for the configured EPG source. + + Safe fallback: returns zeros and source_found=False on any DB/import error. + """ + result: Dict[str, Any] = {"current": 0, "future12": 0, "source_found": False} + epg_source_name = settings.get("epg_source_name", "YouTube Live").strip() + if not epg_source_name: + return result + try: + source = EPGSource.objects.filter(name=epg_source_name).first() + if not source: + return result + result["source_found"] = True + now = datetime.now(dt_timezone.utc) + future12 = now + timedelta(hours=12) + result["current"] = ProgramData.objects.filter( + epg__epg_source=source, + start_time__lte=now, + end_time__gt=now, + ).count() + result["future12"] = ProgramData.objects.filter( + epg__epg_source=source, + end_time__gt=now, + start_time__lt=future12, + ).count() + except Exception: + pass + return result + + def _try_claim_monitor(self) -> str: + """Atomically claim the monitor owner slot under a DB row lock. + + Reads the current settings under select_for_update(), checks that no other + worker already owns the monitor (via heartbeat or starting_at), and if clear + writes monitoring_active=True and monitoring_starting_at= as an + immediate lease. The monitoring loop clears monitoring_starting_at and + writes monitoring_heartbeat as soon as it starts. + + Returns one of: "already_running", "already_starting", "claimed". + On any DB exception falls back to "claimed" (optimistic — better a second + thread than a permanent deadlock). + """ + try: + with transaction.atomic(): + try: + cfg = PluginConfig.objects.select_for_update().get(key=self._plugin_key) + except PluginConfig.DoesNotExist: + return "claimed" + + s = dict(cfg.settings or {}) + + if s.get("monitoring_active"): + if self._is_heartbeat_recent(s) and self._is_last_poll_recent(s): + return "already_running" + if self._is_starting_recent(s): + return "already_starting" + + # Claim: stamp starting_at immediately as proof of ownership + s["monitoring_active"] = True + s["monitoring_starting_at"] = timezone.now().isoformat() + cfg.settings = s + cfg.save(update_fields=["settings", "updated_at"]) + return "claimed" + except Exception as exc: + self._log_error(f"_try_claim_monitor failed (proceeding optimistically): {exc}") + return "claimed" + def _ensure_monitoring_thread(self, settings: Dict[str, Any]) -> bool: """Restart the monitor thread if DB says active but no live thread is running. - This is the in-plugin self-healing path: it handles container restarts and - crashed/hung threads without relying on any Celery task. Returns True if a - new thread was started. + This is the in-plugin self-healing / auto-start path: it handles container + restarts and crashed/hung threads without relying on any Celery task. Returns + True if a new thread was started. """ if not settings.get("monitoring_active"): return False @@ -3220,26 +3468,30 @@ def _ensure_monitoring_thread(self, settings: Dict[str, Any]) -> bool: if not thread_dead: return False - # Another Celery worker's thread may still be running — check the shared heartbeat - heartbeat_str = settings.get("monitoring_heartbeat") - if heartbeat_str: - try: - heartbeat = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) - if isinstance(heartbeat.tzinfo, type(None)): - heartbeat = heartbeat.replace(tzinfo=dt_timezone.utc) - age_seconds = (datetime.now(dt_timezone.utc) - heartbeat).total_seconds() - poll_interval_minutes = settings.get("poll_interval_minutes", 15) - heartbeat_threshold = (poll_interval_minutes + 10) * 60 - if age_seconds < heartbeat_threshold: - self._log(f"Monitoring heartbeat is recent ({int(age_seconds)}s ago, threshold={heartbeat_threshold}s), skipping auto-restart") - return False - except (ValueError, TypeError): - pass + # Another worker's thread may still be running — check the shared heartbeat AND last_poll. + # A fresh heartbeat alone is not sufficient: it may be a stale write from the process that + # died on the last container restart. Require both to be within threshold before skipping. + if self._is_heartbeat_recent(settings) and self._is_last_poll_recent(settings): + heartbeat_str = settings.get("monitoring_heartbeat", "") + self._log(f"Monitoring heartbeat and poll are recent (heartbeat={heartbeat_str[:19]}), skipping auto-restart") + return False + + # Another worker may have just claimed the monitor slot (starting_at is fresh) + if self._is_starting_recent(settings): + starting_str = settings.get("monitoring_starting_at", "") + self._log(f"Monitoring claim is recent (monitoring_starting_at={starting_str[:19]}), skipping auto-restart") + return False channels = settings.get("monitored_channels", "").strip() if not channels or not self._ytdlp_path: return False + # Atomically claim the monitor slot before starting the thread + claim = self._try_claim_monitor() + if claim != "claimed": + self._log(f"Auto-restart skipped: monitor already claimed ({claim})") + return False + self._log("Auto-restarting monitoring after service restart") self._monitoring_active = True self._monitor_stop_event.clear()