diff --git a/src/caliscope/core/process_synchronized_recording.py b/src/caliscope/core/process_synchronized_recording.py index 42a776a5..354f852d 100644 --- a/src/caliscope/core/process_synchronized_recording.py +++ b/src/caliscope/core/process_synchronized_recording.py @@ -5,7 +5,7 @@ """ import logging -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass from pathlib import Path from typing import Callable @@ -41,6 +41,7 @@ def process_synchronized_recording( synced_timestamps: SynchronizedTimestamps, *, subsample: int = 1, + parallel: bool = True, on_progress: Callable[[int, int], None] | None = None, on_frame_data: Callable[[int, dict[int, FrameData]], None] | None = None, token: CancellationToken | None = None, @@ -56,6 +57,9 @@ def process_synchronized_recording( tracker: Tracker for 2D point extraction (handles per-cam_id state internally) synced_timestamps: Pre-constructed timestamp alignment object subsample: Process every Nth sync index (1 = all, 10 = every 10th) + parallel: Process cameras concurrently (True) or serially (False). + Uses ThreadPoolExecutor when True and multiple cameras present. + Set to False as fallback if threading issues are discovered. on_progress: Called with (current, total) during processing on_frame_data: Called with (sync_index, {cam_id: FrameData}) for live display token: Cancellation token for graceful shutdown @@ -75,43 +79,79 @@ def process_synchronized_recording( point_rows: list[dict] = [] try: - for i, sync_index in enumerate(all_sync_indices): - if token is not None and token.is_cancelled: - logger.info("Processing cancelled") - break + use_pool = parallel and len(frame_sources) > 1 - frame_data: dict[int, FrameData] = {} + if use_pool: + camera_pool = ThreadPoolExecutor(max_workers=len(frame_sources)) + else: + camera_pool = None - for cam_id in synced_timestamps.cam_ids: - frame_index = synced_timestamps.frame_for(sync_index, cam_id) - - if frame_index is None: - logger.debug(f"Dropped frame: sync={sync_index}, cam_id={cam_id}") - continue - - if cam_id not in frame_sources: - logger.warning(f"cam_id {cam_id} not in cameras dict, skipping") - continue - - camera = cameras[cam_id] - frame = frame_sources[cam_id].get_frame(frame_index) - - if frame is None: - logger.warning( - f"Failed to read frame: sync={sync_index}, cam_id={cam_id}, frame_index={frame_index}" - ) - continue - - points = tracker.get_points(frame, cam_id, camera.rotation_count) - frame_data[cam_id] = FrameData(frame, points, frame_index) - - frame_time = synced_timestamps.time_for(cam_id, frame_index) - _accumulate_points(point_rows, sync_index, cam_id, frame_index, frame_time, points) + try: + for i, sync_index in enumerate(all_sync_indices): + if token is not None and token.is_cancelled: + logger.info("Processing cancelled") + break - if on_frame_data is not None: - on_frame_data(sync_index, frame_data) - if on_progress is not None: - on_progress(i + 1, total) + frame_data: dict[int, FrameData] = {} + + if use_pool and camera_pool is not None: + # --- Parallel path --- + futures: dict[int, Future[tuple[int, FrameData | None, list[dict]]]] = {} + for cam_id in synced_timestamps.cam_ids: + frame_index = synced_timestamps.frame_for(sync_index, cam_id) + if frame_index is None: + continue + if cam_id not in frame_sources: + continue + camera = cameras[cam_id] + frame_time = synced_timestamps.time_for(cam_id, frame_index) + futures[cam_id] = camera_pool.submit( + _process_one_camera, + cam_id, + sync_index, + frame_index, + frame_sources[cam_id], + camera, + tracker, + frame_time, + ) + + for cam_id, future in futures.items(): + _, fd, rows = future.result() + if fd is not None: + frame_data[cam_id] = fd + point_rows.extend(rows) + else: + # --- Serial path (original logic) --- + for cam_id in synced_timestamps.cam_ids: + frame_index = synced_timestamps.frame_for(sync_index, cam_id) + if frame_index is None: + continue + if cam_id not in frame_sources: + continue + camera = cameras[cam_id] + frame = frame_sources[cam_id].read_frame_at(frame_index) + if frame is None: + logger.warning( + f"Failed to read frame: sync={sync_index}, cam_id={cam_id}, frame_index={frame_index}" + ) + continue + points = tracker.get_points(frame, cam_id, camera.rotation_count) + frame_data[cam_id] = FrameData(frame, points, frame_index) + frame_time = synced_timestamps.time_for(cam_id, frame_index) + _accumulate_points(point_rows, sync_index, cam_id, frame_index, frame_time, points) + + # Threading contract: callbacks are always invoked from this + # thread (the worker thread that owns the sync-index loop), + # never from pool threads. Presenters rely on this guarantee + # for unsynchronized accumulator state. + if on_frame_data is not None: + on_frame_data(sync_index, frame_data) + if on_progress is not None: + on_progress(i + 1, total) + finally: + if camera_pool is not None: + camera_pool.shutdown(wait=False) finally: for source in frame_sources.values(): @@ -224,6 +264,51 @@ def _accumulate_points( ) +def _process_one_camera( + cam_id: int, + sync_index: int, + frame_index: int, + frame_source: FrameSource, + camera: CameraData, + tracker: Tracker, + frame_time: float, +) -> tuple[int, FrameData | None, list[dict]]: + """Process a single camera for one sync index. + + Thread safety: This function is safe to call concurrently for different + cam_ids because: + - Each FrameSource instance is dedicated to one camera (no sharing). + - Tracker.get_points() is thread-safe: + - OnnxTracker._prev_bboxes: keyed by cam_id, each thread accesses + only its own key. Dict internal structure is GIL-protected. + INVARIANT: thread safety depends on each thread accessing a + distinct cam_id. Two threads must never process the same cam_id + concurrently. + - OnnxTracker.session.run(): onnxruntime InferenceSession.run() is + thread-safe (C++ session uses read-only model weights, per-call + buffer allocation). Verified for CPU execution provider. + - CharucoTracker/ArUcoTracker/ChessboardTracker: stateless OpenCV + calls on caller-provided buffers. + - point_rows is built locally and returned, not shared. + + Returns: + (cam_id, frame_data_or_none, point_rows) + """ + frame = frame_source.read_frame_at(frame_index) + + if frame is None: + logger.warning(f"Failed to read frame: sync={sync_index}, cam_id={cam_id}, frame_index={frame_index}") + return cam_id, None, [] + + points = tracker.get_points(frame, cam_id, camera.rotation_count) + fd = FrameData(frame, points, frame_index) + + local_rows: list[dict] = [] + _accumulate_points(local_rows, sync_index, cam_id, frame_index, frame_time, points) + + return cam_id, fd, local_rows + + def _build_image_points(point_rows: list[dict]) -> ImagePoints: """Construct ImagePoints from accumulated point data.""" if not point_rows: diff --git a/src/caliscope/gui/presenters/multi_camera_processing_presenter.py b/src/caliscope/gui/presenters/multi_camera_processing_presenter.py index 904c2239..6756102f 100644 --- a/src/caliscope/gui/presenters/multi_camera_processing_presenter.py +++ b/src/caliscope/gui/presenters/multi_camera_processing_presenter.py @@ -77,8 +77,9 @@ class MultiCameraProcessingPresenter(QObject): state_changed = Signal(MultiCameraProcessingState) # Progress signals - progress_updated = Signal(int, int, int) # (current, total, percent) + progress_updated = Signal(int, int, int, str) # (current, total, percent, eta_string) thumbnail_updated = Signal(int, object, object) # (cam_id, NDArray frame, PointPacket | None) + coverage_updated = Signal(object) # (NDArray[np.int64] matrix, list[int] cam_ids) # Completion signals processing_complete = Signal(object, object, object) # (ImagePoints, ExtrinsicCoverageReport, Tracker) @@ -89,6 +90,7 @@ class MultiCameraProcessingPresenter(QObject): # Thumbnail throttle interval (seconds) THUMBNAIL_INTERVAL = 0.1 # ~10 FPS + COVERAGE_INTERVAL = 5.0 # seconds between coverage heatmap updates def __init__( self, @@ -121,6 +123,17 @@ def __init__( self._thumbnails: dict[int, NDArray[np.uint8]] = {} self._last_thumbnail_time: float = 0.0 + # ETA and coverage state + self._processing_start_time: float = 0.0 + self._last_coverage_time: float = 0.0 + self._sync_frames_done: int = 0 + self._frames_total: int = 0 + + # Incremental coverage matrix — updated per sync index, never rebuilt + self._coverage_matrix: np.ndarray | None = None + self._coverage_cam_ids: list[int] = [] + self._coverage_cam_id_to_index: dict[int, int] = {} + # ------------------------------------------------------------------------- # Public Properties # ------------------------------------------------------------------------- @@ -297,6 +310,18 @@ def start_processing(self, subsample: int = 1) -> None: # Clear previous results only after successful timestamp construction self._reset_results() + self._processing_start_time = time.time() + self._last_coverage_time = 0.0 + self._sync_frames_done = 0 + self._frames_total = 0 + + # Initialize incremental coverage matrix + cam_ids = sorted(cameras.keys()) + self._coverage_cam_ids = cam_ids + self._coverage_cam_id_to_index = {cid: idx for idx, cid in enumerate(cam_ids)} + n = len(cam_ids) + self._coverage_matrix = np.zeros((n, n), dtype=np.int64) + def worker(token: CancellationToken, handle: TaskHandle) -> ImagePoints: return process_synchronized_recording( recording_dir=recording_dir, @@ -383,23 +408,71 @@ def cleanup(self) -> None: # ------------------------------------------------------------------------- def _on_progress(self, current: int, total: int) -> None: - """Progress callback from process_synchronized_recording.""" + """Progress callback from process_synchronized_recording. + + Called from worker thread. Thread-safe because progress_updated + is a Qt signal (cross-thread emission handled by Qt event loop). + """ + self._frames_total = total percent = int(100 * current / total) if total > 0 else 0 - self.progress_updated.emit(current, total, percent) + + # ETA computation (wait 3 seconds for rate to stabilize) + elapsed = time.time() - self._processing_start_time + eta_str = "" + if elapsed > 3.0 and current > 0: + rate = current / elapsed + remaining = max(0.0, (total - current) / rate) + minutes, seconds = divmod(int(remaining), 60) + eta_str = f" — ~{minutes}:{seconds:02d} remaining" + + self.progress_updated.emit(current, total, percent, eta_str) def _on_frame_data(self, sync_index: int, frame_data: dict[int, FrameData]) -> None: """Frame data callback from process_synchronized_recording. - Throttled to ~10 FPS to avoid overwhelming the UI. + Called from worker thread. Accumulator state is not protected by + locks because this callback is guaranteed single-threaded by the + processing function's contract (callbacks are invoked from the + main worker thread, never from pool threads). + + Throttled thumbnail emission at ~10 FPS. + Incremental coverage matrix update on every call. + Coverage signal emission throttled to COVERAGE_INTERVAL. """ now = time.time() + self._sync_frames_done += 1 + + # --- Incremental coverage matrix update (cheap, every call) --- + if self._coverage_matrix is not None: + # Collect which cameras saw each point_id at this sync_index + point_cameras: dict[int, list[int]] = {} + for cam_id, data in frame_data.items(): + if data.points is not None and len(data.points.point_id) > 0: + for pid in data.points.point_id: + point_cameras.setdefault(int(pid), []).append(cam_id) + + # Increment pairwise counts + for pid, cam_list in point_cameras.items(): + cam_list_sorted = sorted(cam_list) + for i, cam_id_i in enumerate(cam_list_sorted): + for cam_id_j in cam_list_sorted[i:]: + if cam_id_i in self._coverage_cam_id_to_index and cam_id_j in self._coverage_cam_id_to_index: + idx_i = self._coverage_cam_id_to_index[cam_id_i] + idx_j = self._coverage_cam_id_to_index[cam_id_j] + self._coverage_matrix[idx_i, idx_j] += 1 + if idx_i != idx_j: + self._coverage_matrix[idx_j, idx_i] += 1 + + # --- Emit coverage snapshot (throttled) --- + if now - self._last_coverage_time >= self.COVERAGE_INTERVAL: + self._last_coverage_time = now + if self._coverage_matrix is not None: + self.coverage_updated.emit((self._coverage_matrix.copy(), list(self._coverage_cam_ids))) + + # --- Thumbnails (throttled, ~10 FPS) --- if now - self._last_thumbnail_time < self.THUMBNAIL_INTERVAL: return - self._last_thumbnail_time = now - - # Update thumbnails for all cameras in this sync packet - # Points passed to View for overlay rendering (MVP: View renders) for cam_id, data in frame_data.items(): self._thumbnails[cam_id] = data.frame self.thumbnail_updated.emit(cam_id, data.frame, data.points) @@ -438,6 +511,11 @@ def _reset_results(self) -> None: self._result = None self._coverage_report = None self._task_handle = None + self._sync_frames_done = 0 + self._frames_total = 0 + self._coverage_matrix = None + self._coverage_cam_ids = [] + self._coverage_cam_id_to_index = {} def _emit_state_changed(self) -> None: """Emit state_changed signal with current computed state.""" diff --git a/src/caliscope/gui/views/multi_camera_processing_widget.py b/src/caliscope/gui/views/multi_camera_processing_widget.py index 1bd8c429..cfbfe789 100644 --- a/src/caliscope/gui/views/multi_camera_processing_widget.py +++ b/src/caliscope/gui/views/multi_camera_processing_widget.py @@ -288,6 +288,10 @@ def _connect_signals(self) -> None: self._presenter.thumbnail_updated.connect(self._on_thumbnail_updated) self._presenter.processing_complete.connect(self._on_processing_complete) self._presenter.processing_failed.connect(self._on_processing_failed) + self._presenter.coverage_updated.connect( + self._on_coverage_updated, + Qt.ConnectionType.QueuedConnection, + ) def _update_ui_for_state(self, state: "MultiCameraProcessingState") -> None: """Update UI elements based on presenter state.""" @@ -325,9 +329,9 @@ def _update_ui_for_state(self, state: "MultiCameraProcessingState") -> None: self._progress_label.setText("Starting...") self._set_rotation_enabled(False) self._subsample_spin.setEnabled(False) - # Keep showing placeholder during processing - self._coverage_placeholder.show() - self._coverage_content.hide() + # Show coverage content for live heatmap updates + self._coverage_placeholder.hide() + self._coverage_content.show() elif state == MultiCameraProcessingState.COMPLETE: self._action_btn.setText("Reset") @@ -349,10 +353,16 @@ def _set_rotation_enabled(self, enabled: bool) -> None: # Slots for Presenter Signals # ------------------------------------------------------------------------- - def _on_progress_updated(self, current: int, total: int, percent: int) -> None: + def _on_progress_updated(self, current: int, total: int, percent: int, eta_str: str) -> None: """Handle progress update from presenter.""" self._progress_bar.setValue(percent) - self._progress_label.setText(f"Processing: {current}/{total} frames ({percent}%)") + self._progress_label.setText(f"Processing: {current}/{total} frames ({percent}%){eta_str}") + + def _on_coverage_updated(self, data: tuple) -> None: + """Update the coverage heatmap during processing.""" + matrix, cam_ids = data + labels = [f"C{cid}" for cid in cam_ids] + self._coverage_heatmap.set_data(matrix, killed_linkages=set(), labels=labels) def _on_thumbnail_updated(self, cam_id: int, frame: NDArray, points: "PointPacket | None") -> None: """Handle thumbnail update from presenter. diff --git a/src/caliscope/recording/frame_source.py b/src/caliscope/recording/frame_source.py index fb5b38ba..92c6e993 100644 --- a/src/caliscope/recording/frame_source.py +++ b/src/caliscope/recording/frame_source.py @@ -137,6 +137,10 @@ def _open(self, video_path: Path, cam_id: int) -> None: f"found {len(self._keyframe_pts)} keyframes" ) + # Sequential read state for read_frame_at() + self._sequential_position: int = -1 # frame index of last decoded frame + self._sequential_iterator: Iterator[VideoFrame] | None = None + # Mark as successfully initialized - must be last line of _open # If _open fails, _closed won't exist and __del__ won't warn self._closed = False @@ -247,6 +251,104 @@ def get_frame(self, frame_index: int) -> np.ndarray | None: return None + def read_frame_at(self, frame_index: int) -> np.ndarray | None: + """Read frame at index, optimizing for sequential access patterns. + + Maintains internal position tracking. When the requested frame is + at or just ahead of the current position, decodes forward + sequentially (O(1) per frame). When it is far ahead or behind, + falls back to seek-based access. + + Not protected by self._lock — designed for single-owner access + in the parallel processing pipeline (one FrameSource per camera + per thread). + + Args: + frame_index: Target frame index. + + Returns: + BGR numpy array, or None if out of bounds or decode fails. + """ + if frame_index < 0 or frame_index > self.last_frame_index: + return None + + if self._container is None: + return None + + # Compute average GOP and decode-forward threshold + if self._keyframe_indices: + avg_gop = max(1, self._actual_last_frame_index // max(1, len(self._keyframe_indices))) + else: + avg_gop = 30 # default assumption + threshold = avg_gop // 2 + + gap = frame_index - self._sequential_position + + # Fast path: next frame in sequence + if gap == 1 and self._sequential_iterator is not None: + try: + frame = next(self._sequential_iterator) + if frame.pts is not None: + decoded_idx = round(frame.pts * self._time_base * self.fps) + if decoded_idx == frame_index: + self._sequential_position = frame_index + return frame.to_ndarray(format="bgr24") + else: + logger.warning( + f"PTS mismatch on fast path: expected {frame_index}, got {decoded_idx}. " + "Falling back to seek." + ) + # Fall through to seek path + except StopIteration: + pass # Fall through to seek path + + # Small-gap path: decode forward through the gap + elif 1 < gap <= threshold and self._sequential_iterator is not None: + try: + result_frame: VideoFrame | None = None + frames_decoded = 0 + # We need to advance (gap) frames from current position + for frame in self._sequential_iterator: + if frame.pts is None: + continue + decoded_idx = round(frame.pts * self._time_base * self.fps) + frames_decoded += 1 + if decoded_idx == frame_index: + result_frame = frame + break + elif decoded_idx > frame_index: + # Overshot — PTS mismatch, fall through to seek + logger.warning( + f"Overshot target {frame_index} (got {decoded_idx}) on small-gap path. " + "Falling back to seek." + ) + result_frame = None + break + # else: decoded_idx < frame_index, keep advancing + + if result_frame is not None: + self._sequential_position = frame_index + return result_frame.to_ndarray(format="bgr24") + # Fall through to seek path if we didn't find the target + except StopIteration: + pass # Fall through to seek path + + # Seek path: large jump, backward, cold start, or fallback from above paths + target_pts = int(frame_index / self.fps / self._time_base) + self._container.seek(target_pts, stream=self._video_stream) + + new_iterator = self._container.decode(self._video_stream) + for frame in new_iterator: + if frame.pts is None: + continue + decoded_idx = round(frame.pts * self._time_base * self.fps) + if decoded_idx >= frame_index: + self._sequential_position = decoded_idx + self._sequential_iterator = new_iterator + return frame.to_ndarray(format="bgr24") + + return None + def get_nearest_keyframe(self, frame_index: int) -> tuple[np.ndarray | None, int]: """Seek to nearest keyframe at or before target. @@ -322,6 +424,8 @@ def close(self) -> None: if self._container is not None: self._container.close() self._container = None # type: ignore[assignment] + self._sequential_position = -1 + self._sequential_iterator = None def __enter__(self) -> Self: """Context manager entry.""" diff --git a/tests/test_frame_source_sequential.py b/tests/test_frame_source_sequential.py new file mode 100644 index 00000000..8c3046eb --- /dev/null +++ b/tests/test_frame_source_sequential.py @@ -0,0 +1,109 @@ +"""Tests for FrameSource.read_frame_at() sequential read optimization. + +Validates that read_frame_at() returns identical frames to get_frame() +across sequential, skip, and large-skip access patterns. +""" + +from pathlib import Path +import numpy as np +import pytest +from caliscope.recording.frame_source import FrameSource + +TEST_SESSION = Path("tests/sessions/4_cam_recording") +RECORDING_DIR = TEST_SESSION / "calibration" / "extrinsic" + + +@pytest.fixture +def frame_source(): + """Create FrameSource for cam_0.""" + fs = FrameSource(RECORDING_DIR, cam_id=0) + yield fs + fs.close() + + +@pytest.fixture +def reference_source(): + """Separate FrameSource for get_frame() reference.""" + fs = FrameSource(RECORDING_DIR, cam_id=0) + yield fs + fs.close() + + +class TestReadFrameAt: + """Tests for sequential read optimization.""" + + def test_sequential_matches_get_frame(self, frame_source, reference_source): + """Sequential read_frame_at() returns same frames as get_frame().""" + for i in range(10): + seq_frame = frame_source.read_frame_at(i) + ref_frame = reference_source.get_frame(i) + assert seq_frame is not None + assert ref_frame is not None + np.testing.assert_array_equal(seq_frame, ref_frame) + + def test_small_skip_matches(self, frame_source, reference_source): + """Small forward skip returns correct frame.""" + # Read frame 0 to initialize + frame_source.read_frame_at(0) + # Skip to frame 5 + seq_frame = frame_source.read_frame_at(5) + ref_frame = reference_source.get_frame(5) + assert seq_frame is not None + assert ref_frame is not None + np.testing.assert_array_equal(seq_frame, ref_frame) + + def test_large_skip_matches(self, frame_source, reference_source): + """Large forward skip (> GOP) returns correct frame.""" + # Read frame 0 to initialize + frame_source.read_frame_at(0) + # Skip to a frame well beyond GOP size (typically ~30) + target = min(100, frame_source.last_frame_index) + seq_frame = frame_source.read_frame_at(target) + ref_frame = reference_source.get_frame(target) + assert seq_frame is not None + assert ref_frame is not None + np.testing.assert_array_equal(seq_frame, ref_frame) + + def test_backward_seek(self, frame_source, reference_source): + """Backward seek returns correct frame.""" + frame_source.read_frame_at(50) + seq_frame = frame_source.read_frame_at(10) + ref_frame = reference_source.get_frame(10) + assert seq_frame is not None + assert ref_frame is not None + np.testing.assert_array_equal(seq_frame, ref_frame) + + def test_out_of_bounds(self, frame_source): + """Out-of-bounds indices return None.""" + assert frame_source.read_frame_at(-1) is None + assert frame_source.read_frame_at(frame_source.last_frame_index + 1) is None + + def test_cold_start(self, frame_source, reference_source): + """First call without any prior reads works correctly.""" + target = 20 + seq_frame = frame_source.read_frame_at(target) + ref_frame = reference_source.get_frame(target) + assert seq_frame is not None + assert ref_frame is not None + np.testing.assert_array_equal(seq_frame, ref_frame) + + +if __name__ == "__main__": + debug_dir = Path(__file__).parent / "tmp" + debug_dir.mkdir(parents=True, exist_ok=True) + + fs = FrameSource(RECORDING_DIR, cam_id=0) + ref = FrameSource(RECORDING_DIR, cam_id=0) + + print(f"Frame count: {fs.frame_count}, last index: {fs.last_frame_index}") + print(f"Keyframes: {len(fs._keyframe_indices)}") + + # Test sequential reads + for i in range(5): + seq = fs.read_frame_at(i) + ref_f = ref.get_frame(i) + match = np.array_equal(seq, ref_f) + print(f"Frame {i}: match={match}") + + fs.close() + ref.close()