Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit b544d27

Browse files
committed
nvdemux: improvements
* Do not wait for ready during driver init
* Be more defensive on demuxer process cleanup
* Move many log messages to debug
* Always wait for poll_interval when restarting the demuxer, to avoid busy loops
1 parent e7c4056 commit b544d27

3 files changed

Lines changed: 228 additions & 40 deletions

File tree

packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ def __post_init__(self):
7474
self.logger.error("Failed to register with DemuxerManager: %s", e)
7575
raise
7676

77-
# Wait for initial ready state (with timeout)
78-
if not self._ready.wait(timeout=self.timeout):
79-
self.logger.warning("Timeout waiting for demuxer to become ready during initialization")
8077

8178
@classmethod
8279
def client(cls) -> str:

packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py

Lines changed: 203 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,50 @@
55
and distributes pts paths to registered drivers.
66
"""
77

8+
import atexit
9+
import ctypes
810
import glob
911
import logging
1012
import os
13+
import signal
1114
import subprocess
15+
import sys
1216
import threading
1317
from dataclasses import dataclass
1418
from typing import Callable, Optional
1519

1620
logger = logging.getLogger(__name__)
1721

22+
# Platform detection
23+
_IS_LINUX = sys.platform.startswith("linux")
24+
25+
26+
def _get_preexec_fn() -> Callable[[], None] | None:
    """Build the platform-specific ``preexec_fn`` used to spawn the demuxer.

    On Linux the returned callable runs in the forked child, just before
    ``exec``, and calls ``prctl(PR_SET_PDEATHSIG, SIGTERM)`` so the kernel
    sends SIGTERM to the subprocess when its parent goes away, even if the
    parent was killed with SIGKILL.

    libc is resolved here, in the parent, so that the child only performs the
    ``prctl`` call itself. The child deliberately neither logs nor loads
    libraries: the ``subprocess`` documentation warns that ``preexec_fn`` can
    deadlock in a multi-threaded parent, because locks held by other threads
    at ``fork`` time are never released in the child.

    NOTE(review): per prctl(2) the "parent" is the *thread* that created the
    child, so the signal is also delivered when that thread exits while the
    rest of the process keeps running -- confirm this is acceptable for the
    thread that spawns the demuxer.

    Returns:
        A zero-argument callable suitable for
        ``subprocess.Popen(preexec_fn=...)``, or ``None`` on non-Linux
        platforms and when libc's ``prctl`` cannot be resolved (the
        subprocess is then started without a parent-death signal).
    """
    if not _IS_LINUX:
        return None

    try:
        # Resolve the symbol before fork so the child never has to dlopen.
        prctl = ctypes.CDLL("libc.so.6", use_errno=True).prctl
    except (OSError, AttributeError) as e:
        logger.warning("Failed to set parent death signal: %s", e)
        return None

    PR_SET_PDEATHSIG = 1

    def report(message: str) -> None:
        """Write a diagnostic straight to the child's stderr (fd 2)."""
        try:
            os.write(2, (message + "\n").encode())
        except OSError:
            # Nothing more can be done from the pre-exec child.
            pass

    def set_pdeathsig():
        """Set parent death signal to SIGTERM via prctl (runs in the child)."""
        try:
            if prctl(PR_SET_PDEATHSIG, signal.SIGTERM, 0, 0, 0) != 0:
                report("prctl(PR_SET_PDEATHSIG) failed with errno %d" % ctypes.get_errno())
        except Exception as e:
            # Best effort: an exception escaping preexec_fn would abort the spawn.
            report("Failed to set parent death signal: %s" % e)

    return set_pdeathsig
51+
1852

1953
def _has_glob_chars(path: str) -> bool:
2054
"""Check if path contains glob wildcard characters."""
@@ -58,6 +92,9 @@ class DemuxerManager:
5892

5993
_instance: Optional["DemuxerManager"] = None
6094
_instance_lock = threading.Lock()
95+
_signal_handlers_installed = False
96+
_original_sigterm_handler: signal.Handlers | None = None
97+
_original_sigint_handler: signal.Handlers | None = None
6198

6299
def __init__(self):
63100
"""Private constructor. Use get_instance() instead."""
@@ -68,13 +105,21 @@ def __init__(self):
68105
self._process: Optional[subprocess.Popen] = None
69106
self._monitor_thread: Optional[threading.Thread] = None
70107
self._shutdown = threading.Event()
108+
self._cleanup_done = False
71109

72110
# Process configuration (must be same for all drivers)
73111
self._demuxer_path: Optional[str] = None
74112
self._device: Optional[str] = None
75113
self._chip: Optional[str] = None
76114
self._poll_interval: float = 1.0
77115

116+
# Register atexit handler for cleanup on normal exit
117+
atexit.register(self._atexit_cleanup)
118+
logger.debug("Registered atexit handler for demuxer cleanup")
119+
120+
# Install signal handlers (only once globally)
121+
self._install_signal_handlers()
122+
78123
@classmethod
79124
def get_instance(cls) -> "DemuxerManager":
80125
"""Get the singleton instance of DemuxerManager."""
@@ -89,9 +134,61 @@ def reset_instance(cls):
89134
"""Reset the singleton instance. Used for testing."""
90135
with cls._instance_lock:
91136
if cls._instance is not None:
137+
# Reset cleanup_done flag before cleanup to allow cleanup to run
138+
cls._instance._cleanup_done = False
92139
cls._instance._cleanup()
93140
cls._instance = None
94141

142+
def _atexit_cleanup(self):
    """Run ``_cleanup()`` at interpreter exit unless cleanup already happened."""
    if not self._cleanup_done:
        logger.debug("atexit cleanup triggered")
        self._cleanup()
148+
149+
def _install_signal_handlers(self):
    """Install process-wide SIGTERM and SIGINT handlers that clean up the demuxer.

    Each handler cleans up the current singleton instance (if any), puts the
    previously installed handler back and then re-sends the signal to this
    process so the normal termination behaviour still happens.

    Handlers are installed at most once per class, and only when called from
    the main thread. Any failure is logged as a warning and otherwise
    ignored, so constructing the manager never fails because of signal setup.

    NOTE(review): the handler calls ``_cleanup()`` from the main thread; if
    cleanup needs a non-reentrant lock that the interrupted code already
    holds, this could deadlock -- confirm the lock type used by ``_cleanup``.
    """
    cls = type(self)
    if cls._signal_handlers_installed:
        return

    def make_handler(sig: signal.Signals) -> Callable[[int, object], None]:
        """Create a signal handler that cleans up and re-raises the signal."""

        def handler(signum: int, frame):
            logger.debug("Signal %s received, cleaning up demuxer process", sig.name)
            # Cleanup the demuxer process
            if cls._instance is not None:
                cls._instance._cleanup()

            if sig == signal.SIGTERM:
                previous = cls._original_sigterm_handler
            else:
                previous = cls._original_sigint_handler

            # signal.signal() reports None when the previous handler was not
            # installed from Python. Fall back to the default action in that
            # case; otherwise this handler would stay installed and the
            # re-raise below would re-enter it forever.
            signal.signal(sig, previous if previous is not None else signal.SIG_DFL)

            # Re-raise the signal to allow normal termination
            os.kill(os.getpid(), signum)

        return handler

    try:
        # signal.signal() may only be called from the main thread
        if threading.current_thread() is not threading.main_thread():
            logger.debug("Not installing signal handlers from non-main thread")
            return

        cls._original_sigterm_handler = signal.signal(signal.SIGTERM, make_handler(signal.SIGTERM))
        cls._original_sigint_handler = signal.signal(signal.SIGINT, make_handler(signal.SIGINT))
        cls._signal_handlers_installed = True
        logger.debug("Installed signal handlers for SIGTERM and SIGINT")
    except Exception as e:
        logger.warning("Failed to install signal handlers: %s", e)
191+
95192
def _validate_config(self, demuxer_path: str, device: str, chip: str, target: str):
96193
"""Validate driver configuration against existing drivers.
97194
@@ -158,7 +255,7 @@ def register_driver(
158255
driver_info = DriverInfo(driver_id=driver_id, target=target, callback=callback)
159256
self._drivers[driver_id] = driver_info
160257

161-
logger.info("Registered driver %s for target '%s'", driver_id, target)
258+
logger.debug("Registered driver %s for target '%s'", driver_id, target)
162259

163260
# If target is already ready, notify immediately
164261
notify_args = self._get_ready_callback(target, callback)
@@ -184,7 +281,7 @@ def unregister_driver(self, driver_id: str) -> None:
184281
if driver_id in self._drivers:
185282
target = self._drivers[driver_id].target
186283
del self._drivers[driver_id]
187-
logger.info("Unregistered driver %s (target: %s)", driver_id, target)
284+
logger.debug("Unregistered driver %s (target: %s)", driver_id, target)
188285

189286
# Keep monitor running even if no drivers remain
190287

@@ -222,36 +319,72 @@ def _start_monitor(self):
222319
target=self._monitor_loop, daemon=True, name="DemuxerManager-monitor"
223320
)
224321
self._monitor_thread.start()
225-
logger.info("Started demuxer monitor thread")
322+
logger.debug("Started demuxer monitor thread")
226323

227324
def _stop_monitor(self):
228-
"""Stop the monitor thread."""
325+
"""Stop the monitor thread.
326+
327+
This method is idempotent - safe to call multiple times.
328+
"""
229329
self._shutdown.set()
230330

231331
# Terminate process if running
232-
if self._process:
233-
logger.info("Terminating demuxer process...")
332+
process = self._process
333+
if process is not None:
334+
logger.debug("Terminating demuxer process (PID %s)...", process.pid)
234335
try:
235-
self._process.terminate()
236-
self._process.wait(timeout=5.0)
237-
except subprocess.TimeoutExpired:
238-
self._process.kill()
239-
self._process.wait()
240-
self._process = None
336+
# First try graceful termination
337+
process.terminate()
338+
try:
339+
process.wait(timeout=5.0)
340+
logger.debug("Demuxer process terminated gracefully")
341+
except subprocess.TimeoutExpired:
342+
# Force kill if it doesn't respond
343+
logger.warning("Demuxer process did not terminate, killing...")
344+
process.kill()
345+
process.wait(timeout=2.0)
346+
logger.debug("Demuxer process killed")
347+
except ProcessLookupError:
348+
# Process already dead
349+
logger.debug("Demuxer process already exited")
350+
except Exception as e:
351+
logger.error("Error terminating demuxer process: %s", e)
352+
finally:
353+
self._process = None
241354

242355
# Wait for monitor thread to exit
243-
if self._monitor_thread and self._monitor_thread.is_alive():
244-
self._monitor_thread.join(timeout=2.0)
356+
monitor_thread = self._monitor_thread
357+
if monitor_thread is not None and monitor_thread.is_alive():
358+
# Don't join if we're being called from the monitor thread itself
359+
if threading.current_thread() is not monitor_thread:
360+
monitor_thread.join(timeout=2.0)
361+
if monitor_thread.is_alive():
362+
logger.warning("Monitor thread did not exit within timeout")
245363
self._monitor_thread = None
246364

247-
logger.info("Stopped demuxer monitor thread")
365+
logger.debug("Stopped demuxer monitor")
248366

249367
def _cleanup(self):
250-
"""Clean up resources."""
368+
"""Clean up resources.
369+
370+
This method is idempotent - safe to call multiple times.
371+
Ensures the demuxer process is terminated on program exit.
372+
"""
373+
if self._cleanup_done:
374+
logger.debug("Cleanup already done, skipping")
375+
return
376+
377+
logger.debug("Cleaning up DemuxerManager resources")
378+
self._cleanup_done = True
379+
251380
self._stop_monitor()
252-
self._drivers.clear()
253-
self._pts_map.clear()
254-
self._ready_targets.clear()
381+
382+
with self._lock:
383+
self._drivers.clear()
384+
self._pts_map.clear()
385+
self._ready_targets.clear()
386+
387+
logger.info("DemuxerManager cleanup complete")
255388

256389
def _monitor_loop(self):
257390
"""Background thread that manages demuxer lifecycle and auto-recovery."""
@@ -264,9 +397,9 @@ def _monitor_loop(self):
264397
with self._lock:
265398
self._pts_map.clear()
266399
self._ready_targets.clear()
267-
# Wait before retrying
268-
if self._shutdown.wait(timeout=self._poll_interval):
269-
break
400+
# Always wait for poll interval before retrying
401+
if self._shutdown.wait(timeout=self._poll_interval):
402+
break
270403

271404
def _run_demuxer_cycle(self):
272405
"""Run one cycle of: find device -> start demuxer -> monitor until exit."""
@@ -280,9 +413,15 @@ def _run_demuxer_cycle(self):
280413
self._shutdown.wait(timeout=self._poll_interval)
281414
return
282415

283-
# Read and parse demuxer output
416+
# Read and parse demuxer output (stdout and stderr concurrently)
417+
stderr_thread = threading.Thread(target=self._read_demuxer_stderr, daemon=True)
418+
stderr_thread.start()
419+
284420
self._read_demuxer_output()
285421

422+
# Wait for stderr thread to finish
423+
stderr_thread.join(timeout=1.0)
424+
286425
# Cleanup process
287426
self._cleanup_demuxer_process()
288427

@@ -294,17 +433,24 @@ def _wait_for_device(self) -> str | None:
294433
while not self._shutdown.is_set():
295434
resolved_device = _resolve_device(self._device)
296435
if resolved_device:
297-
logger.info("Found device: %s", resolved_device)
436+
logger.debug("Found device: %s", resolved_device)
298437
return resolved_device
299438
logger.debug("Device not found, polling... (pattern: %s)", self._device)
300439
if self._shutdown.wait(timeout=self._poll_interval):
301440
return None
302441
return None
303442

304443
def _start_demuxer_process(self, device: str) -> bool:
305-
"""Start the demuxer process. Returns True on success."""
444+
"""Start the demuxer process. Returns True on success.
445+
446+
On Linux, uses prctl(PR_SET_PDEATHSIG) to ensure the subprocess
447+
receives SIGTERM when the parent dies (including kill -9).
448+
"""
306449
cmd = [self._demuxer_path, "-m", self._chip, "-d", device]
307-
logger.info("Starting demuxer: %s", " ".join(cmd))
450+
logger.debug("Starting demuxer: %s", " ".join(cmd))
451+
452+
# Get platform-specific preexec_fn (Linux: set parent death signal)
453+
preexec_fn = _get_preexec_fn()
308454

309455
try:
310456
self._process = subprocess.Popen(
@@ -313,7 +459,9 @@ def _start_demuxer_process(self, device: str) -> bool:
313459
stderr=subprocess.PIPE,
314460
text=True,
315461
bufsize=1, # Line buffered
462+
preexec_fn=preexec_fn,
316463
)
464+
logger.debug("Demuxer process started with PID %d", self._process.pid)
317465
return True
318466
except (FileNotFoundError, PermissionError) as e:
319467
logger.error("Failed to start demuxer: %s", e)
@@ -329,17 +477,37 @@ def _parse_demuxer_line(self, line: str) -> tuple[str | None, str | None]:
329477
if len(parts) < 2:
330478
return None, None
331479

332-
pts_path = None
333-
target = None
334-
335-
for part in parts:
336-
if part.startswith("/dev/"):
337-
pts_path = part
338-
elif ":" in part: # Targets have format "NAME: N"
339-
target = part
480+
# First part is the pts path, second part is the target name
481+
pts_path = parts[0].strip() if parts[0].startswith("/dev/") else None
482+
target = parts[1].strip() if len(parts) >= 2 else None
340483

341484
return pts_path, target
342485

486+
def _read_demuxer_stderr(self):
    """Forward demuxer stderr to the log and abort on a file-lock conflict.

    Reads stderr line by line until EOF or until shutdown is requested.
    Every non-empty line is logged as a warning. If a line reports that the
    demuxer could not obtain its file lock, the whole process is terminated
    immediately with ``os._exit(1)``, which skips normal interpreter cleanup.

    Any exception raised while reading is logged and swallowed.
    """
    # Snapshot the process: another thread may reset self._process to None
    # while stopping the monitor, which would otherwise surface here as a
    # spurious AttributeError.
    process = self._process
    stream = process.stderr if process is not None else None
    if stream is None:
        logger.debug("Demuxer stderr is not available, nothing to read")
        return

    try:
        for raw_line in iter(stream.readline, ""):
            if self._shutdown.is_set():
                break

            line = raw_line.strip()
            if not line:
                continue

            logger.warning("Demuxer stderr: %s", line)

            # Check for catastrophic file lock error
            if "ERROR: unable to obtain file lock" in line:
                logger.critical(
                    "Demuxer file lock error detected. Another instance may be running. "
                    "Terminating exporter to prevent conflicts."
                )
                # Force immediate process termination
                os._exit(1)

    except Exception as e:
        logger.error("Error reading demuxer stderr: %s", e)
510+
343511
def _read_demuxer_output(self):
344512
"""Read demuxer stdout and parse all pts paths."""
345513
try:
@@ -357,7 +525,7 @@ def _read_demuxer_output(self):
357525
pts_path, target = self._parse_demuxer_line(line)
358526

359527
if pts_path and target:
360-
logger.info("Found pts path for target '%s': %s", target, pts_path)
528+
logger.debug("Found pts path for target '%s': %s", target, pts_path)
361529

362530
callback_to_invoke = None
363531
with self._lock:

0 commit comments

Comments
 (0)