diff --git a/README.md b/README.md index 8399111..7836dd4 100644 --- a/README.md +++ b/README.md @@ -6,16 +6,6 @@ RoboQA-Temporal is an open-source, professional Python toolkit focused on automa ![RoboQA-Logo](.docs/roboqa_logo.png) -## Design/Components/Stories: - -### User Stories: -> User Stories for this project can be found here - [USER_STORIES.md](.docs/USER_STORIES.md) - -### User Design: -> User Desigs for this project can be found here - [USER_DESIGN.md](.docs/USER_DESIGN.md) - -### User Components: -> User Components for this project can be found here - [USER_COMPONENTS.md](.docs/USER_COMPONENTS.md) ## Installation @@ -50,7 +40,7 @@ cd roboqa-temporal python3 -m venv venv # Activate the virtual environment -source venv/bin/activate` +source venv/bin/activate # 5. Source ROS source /opt/ros/humble/setup.bash @@ -193,6 +183,22 @@ roboqa anomaly path/to/bag_file.db3 --voxel-size 0.1 --max-points-for-outliers 5 - `--max-points-for-outliers 50000`: Skip outlier removal for point clouds exceeding this limit - Use the pre-configured `examples/config_anomaly_kitti.yaml` for optimal KITTI settings + +## User Stories/Design/Components +
+ Click for more info + + ### User Stories: + > User Stories for this project can be found here - [USER_STORIES.md](.docs/USER_STORIES.md) + + ### User Design: + > User Desigs for this project can be found here - [USER_DESIGN.md](.docs/USER_DESIGN.md) + + ### User Components: + > User Components for this project can be found here - [USER_COMPONENTS.md](.docs/USER_COMPONENTS.md) + +
+ ## Project Structure (For more details check [here](PROJECT_STRUCTURE.md)) diff --git a/tests/test_bag_loader.py b/tests/test_bag_loader.py deleted file mode 100644 index 10db2eb..0000000 --- a/tests/test_bag_loader.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import annotations - -import itertools - -from roboqa_temporal.loader.bag_loader import PointCloudFrame - - -def test_topic_info_includes_point_cloud_topic(bag_loader): - """ - author: architjain - reviewer: dharinesh - category: integration test (for CI purposes) - justification: Tests ROS2 bag integration functionality - """ - topic_info = bag_loader.get_topic_info() - assert "/synthetic_points" in topic_info - assert topic_info["/synthetic_points"]["type"].endswith("PointCloud2") - - -def test_read_point_clouds_produces_frames(bag_loader): - """ - author: architjain - reviewer: dharinesh - category: integration test (for CI purposes) - justification: Tests ROS2 bag reading and frame production - """ - frames = list(itertools.islice(bag_loader.read_point_clouds(progress=False), 5)) - assert frames, "Expected sample bag to yield at least one frame" - - first_frame: PointCloudFrame = frames[0] - assert first_frame.points.shape[1] == 3 - assert first_frame.num_points == first_frame.points.shape[0] - - timestamps = [frame.timestamp for frame in frames] - assert all(b >= a for a, b in zip(timestamps, timestamps[1:])) - diff --git a/tests/test_detection_pipeline.py b/tests/test_detection_pipeline.py deleted file mode 100644 index 739678d..0000000 --- a/tests/test_detection_pipeline.py +++ /dev/null @@ -1,44 +0,0 @@ -from __future__ import annotations - -import numpy as np - -from roboqa_temporal.detection.detector import AnomalyDetector, DetectionResult - - -def test_anomaly_detector_returns_health_metrics(sample_frames): - """ - author: architjain - reviewer: dharinesh - category: integration test (for CI purposes) - justification: Tests end-to-end detection pipeline with real ROS2 bag data - """ - np.random.seed(0) # deterministic sampling inside ghost detector - - detector = AnomalyDetector() - result = detector.detect(sample_frames) - - assert isinstance(result, DetectionResult) - assert isinstance(result.anomalies, list) - assert "overall_health_score" in result.health_metrics - assert result.health_metrics["avg_points_per_frame"] > 0 - - -def test_anomaly_detector_respects_disabled_detectors(sample_frames): - """ - author: architjain - reviewer: dharinesh - category: integration test (for CI purposes) - justification: Tests detector configuration with real ROS2 bag data - """ - detector = AnomalyDetector( - enable_density_detection=False, - enable_spatial_detection=True, - enable_ghost_detection=False, - enable_temporal_detection=True, - ) - result = detector.detect(sample_frames) - - assert set(result.detector_results.keys()).issubset({"spatial", "temporal"}) - assert "density" not in result.detector_results - assert "ghost" not in result.detector_results - diff --git a/tests/test_edge.py b/tests/test_edge.py index 22614b0..a903559 100644 --- a/tests/test_edge.py +++ b/tests/test_edge.py @@ -94,6 +94,209 @@ def test_detector_with_extreme_thresholds(): assert isinstance(result, DetectionResult) +def test_sensor_stream_with_empty_timestamps(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + + stream = SensorStream( + name="empty_stream", + source_path="/fake/path", + timestamps_ns=[], + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 0 + assert stream.frequency_estimate_hz is None + assert stream.metadata["message_count"] == 0 + + +def test_sensor_stream_with_single_timestamp(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + + stream = SensorStream( + name="single_stream", + source_path="/fake/path", + timestamps_ns=[1000000000], + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 1 + assert stream.frequency_estimate_hz is None + assert stream.metadata["message_count"] == 1 + + +def test_sensor_stream_with_negative_timestamps(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + + timestamps_ns = [-1000000000, -900000000, -800000000] + stream = SensorStream( + name="negative_stream", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 3 + assert stream.frequency_estimate_hz is not None + + +def test_sensor_stream_with_unordered_timestamps(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + + timestamps_ns = [1000000000, 1200000000, 1100000000, 1300000000] + stream = SensorStream( + name="unordered_stream", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 4 + + +def test_sensor_stream_with_zero_intervals(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + + timestamps_ns = [1000000000, 1000000000, 1000000000] + stream = SensorStream( + name="zero_interval_stream", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 3 + assert stream.metadata["duplicate_frames"] > 0 + + +def test_temporal_sync_validator_with_extreme_frequencies(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + + high_freq_ts = list(range(0, 1000000, 1000)) # 1000 Hz + low_freq_ts = list(range(0, 1000000000, 1000000000)) # 1 Hz + + streams = { + "high_freq": SensorStream( + name="high_freq", + source_path="/fake/high", + timestamps_ns=high_freq_ts, + expected_frequency=1000.0, + ), + "low_freq": SensorStream( + name="low_freq", + source_path="/fake/low", + timestamps_ns=low_freq_ts, + expected_frequency=1.0, + ), + } + + validator = TemporalSyncValidator(auto_export_reports=False) + report = validator.analyze_streams(streams, dataset_name="extreme_freq", include_visualizations=False) + + assert report is not None + assert len(report.streams) == 2 + + +def test_temporal_sync_validator_with_huge_time_gaps(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + import numpy as np + + timestamps_ns = [ + 1000000000, + 1100000000, + 5000000000, + 5100000000, + ] + + stream = SensorStream( + name="gappy_stream", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + streams = {"gappy": stream} + validator = TemporalSyncValidator(auto_export_reports=False) + report = validator.analyze_streams(streams, dataset_name="gappy_test", include_visualizations=False) + + assert report is not None + assert stream.metadata["missing_frames"] > 0 + + +def test_temporal_sync_validator_with_nan_timestamps(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import SensorStream + import numpy as np + + timestamps_ns = [1000000000, 1100000000, int(np.nan) if not np.isnan(np.nan) else 0] + + stream = SensorStream( + name="nan_stream", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.timestamps_sec.size == 3 + + +def test_temporal_sync_validator_custom_thresholds(): + """ + author: xinxin + reviewer: sayali + category: edge test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator + + validator = TemporalSyncValidator( + approximate_time_threshold_ms={ + "camera_left_lidar": 0.001, + }, + rolling_window=1, + ) + + assert validator.approximate_time_threshold_ms["camera_left_lidar"] == 0.001 + assert validator.rolling_window == 1 + + def test_preprocessor_downsample_with_zero_voxel_size(): """ author: architjain @@ -168,3 +371,436 @@ def test_anomaly_with_max_severity(): ) assert anomaly.severity == 1.0 + + +def test_calibration_stream_with_empty_paths(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationStream + + stream = CalibrationStream( + name="empty_stream", + image_paths=[], + pointcloud_paths=[], + calibration_file="/fake/calib.txt", + camera_id="cam_00", + lidar_id="velodyne" + ) + + assert len(stream.image_paths) == 0 + assert len(stream.pointcloud_paths) == 0 + + +def test_calibration_stream_with_mismatched_counts(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationStream + + # Different number of images and point clouds + stream = CalibrationStream( + name="mismatched_stream", + image_paths=["/fake/img1.png", "/fake/img2.png", "/fake/img3.png"], + pointcloud_paths=["/fake/pc1.bin", "/fake/pc2.bin"], + calibration_file="/fake/calib.txt", + camera_id="cam_01", + lidar_id="velodyne" + ) + + assert len(stream.image_paths) == 3 + assert len(stream.pointcloud_paths) == 2 + + +def test_projection_error_frame_with_zero_error(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import ProjectionErrorFrame + + error_frame = ProjectionErrorFrame( + frame_index=0, + timestamp=1000.0, + reprojection_error=0.0, + projected_points_count=0 + ) + + assert error_frame.reprojection_error == 0.0 + assert error_frame.projected_points_count == 0 + + +def test_projection_error_frame_with_extreme_error(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import ProjectionErrorFrame + + error_frame = ProjectionErrorFrame( + frame_index=0, + timestamp=1000.0, + reprojection_error=999999.9, + max_error_point=(1e6, 1e6), + projected_points_count=1 + ) + + assert error_frame.reprojection_error > 1000.0 + assert error_frame.max_error_point[0] == 1e6 + + +def test_illumination_frame_with_zero_brightness(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import IlluminationFrame + + illum_frame = IlluminationFrame( + frame_index=0, + timestamp=1000.0, + brightness_mean=0.0, + brightness_std=0.0, + contrast=0.0, + scene_change_score=0.0, + light_source_change=False + ) + + assert illum_frame.brightness_mean == 0.0 + assert illum_frame.contrast == 0.0 + + +def test_illumination_frame_with_max_brightness(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import IlluminationFrame + + illum_frame = IlluminationFrame( + frame_index=0, + timestamp=1000.0, + brightness_mean=255.0, + brightness_std=0.0, + contrast=1.0, + scene_change_score=1.0, + light_source_change=True + ) + + assert illum_frame.brightness_mean == 255.0 + assert illum_frame.scene_change_score == 1.0 + + +def test_moving_object_frame_with_no_objects(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import MovingObjectFrame + + obj_frame = MovingObjectFrame( + frame_index=0, + timestamp=1000.0, + detected_objects=0, + detection_confidence=0.0, + consistency_score=0.0, + fusion_quality_score=0.0 + ) + + assert obj_frame.detected_objects == 0 + assert obj_frame.detection_confidence == 0.0 + + +def test_moving_object_frame_with_max_confidence(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import MovingObjectFrame + + obj_frame = MovingObjectFrame( + frame_index=0, + timestamp=1000.0, + detected_objects=100, + detection_confidence=1.0, + consistency_score=1.0, + fusion_quality_score=1.0 + ) + + assert obj_frame.detected_objects == 100 + assert obj_frame.detection_confidence == 1.0 + assert obj_frame.fusion_quality_score == 1.0 + + +def test_calibration_pair_result_all_fail(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationPairResult + + result = CalibrationPairResult( + geom_edge_score=0.1, + mutual_information=0.2, + contrastive_score=0.15, + pass_geom_edge=False, + pass_mi=False, + pass_contrastive=False, + overall_pass=False, + details={"reason": "Poor calibration"} + ) + + assert result.overall_pass is False + assert not result.pass_geom_edge + assert not result.pass_mi + assert not result.pass_contrastive + + +def test_calibration_pair_result_all_perfect(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationPairResult + + result = CalibrationPairResult( + geom_edge_score=1.0, + mutual_information=1.0, + contrastive_score=1.0, + pass_geom_edge=True, + pass_mi=True, + pass_contrastive=True, + overall_pass=True, + details={"quality": "perfect"} + ) + + assert result.overall_pass is True + assert result.geom_edge_score == 1.0 + assert result.mutual_information == 1.0 + + +def test_calibration_quality_validator_with_empty_config(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationQualityValidator + + validator = CalibrationQualityValidator( + output_dir="reports/empty_config", + config={} + ) + + assert validator.config == {} + assert validator.output_dir.exists() + + +def test_calibration_quality_report_with_empty_lists(): + """ + author: dharinesh + reviewer: architjain + category: edge test + """ + from roboqa_temporal.fusion import CalibrationQualityReport + + report = CalibrationQualityReport( + dataset_name="empty_dataset", + metrics={}, + pair_results={}, + projection_errors=[], + illumination_changes=[], + moving_objects=[], + recommendations=[], + parameter_file=None + ) + + assert len(report.projection_errors) == 0 + assert len(report.illumination_changes) == 0 + assert len(report.moving_objects) == 0 + assert report.parameter_file is None + + +def test_temporal_score_with_single_timestamp(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + import numpy as np + + # Single timestamp should return 0 + timestamps = np.array([1000], dtype='datetime64[ns]') + score = compute_temporal_score(timestamps) + + assert score == 0.0 + + +def test_temporal_score_with_empty_timestamps(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + import numpy as np + + # Empty array should return 0 + timestamps = np.array([], dtype='datetime64[ns]') + score = compute_temporal_score(timestamps) + + assert score == 0.0 + + +def test_temporal_score_with_identical_timestamps(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + import numpy as np + + # All identical timestamps (zero intervals) + timestamps = np.array([1000, 1000, 1000], dtype='datetime64[ns]') + score = compute_temporal_score(timestamps) + + assert isinstance(score, float) + assert 0.0 <= score <= 1.0 + + +def test_anomaly_score_with_single_timestamp(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_anomaly_score + import numpy as np + + timestamps = np.array([1000], dtype='datetime64[ns]') + score = compute_anomaly_score(timestamps) + + assert score == 0.0 + + +def test_anomaly_score_with_empty_timestamps(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_anomaly_score + import numpy as np + + timestamps = np.array([], dtype='datetime64[ns]') + score = compute_anomaly_score(timestamps) + + assert score == 0.0 + + +def test_completeness_metrics_with_empty_timestamps(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + timestamps = np.array([], dtype='datetime64[ns]') + metrics = compute_completeness_metrics(timestamps, max_frames_in_sequence=10) + + assert isinstance(metrics, dict) + assert metrics["message_availability"] == 0.0 + + +def test_completeness_metrics_with_zero_max_frames(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + timestamps = np.arange(0, 1000, 100).astype('datetime64[ns]') + metrics = compute_completeness_metrics(timestamps, max_frames_in_sequence=0) + + assert isinstance(metrics, dict) + assert metrics["message_availability"] == 0.0 + + +def test_completeness_metrics_exceeding_max_frames(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + timestamps = np.arange(0, 2000, 100).astype('datetime64[ns]') # 20 frames + metrics = compute_completeness_metrics(timestamps, max_frames_in_sequence=10) + + assert isinstance(metrics, dict) + assert metrics["message_availability"] >= 1.0 + + +def test_curation_recommendation_with_critical_severity(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting.curation import CurationRecommendation + + rec = CurationRecommendation( + sequence="bad_sequence", + severity="critical", + category="quality", + message="Dataset unusable", + metric_value=0.1, + threshold=0.5, + action="exclude" + ) + + assert rec.severity == "critical" + assert rec.action == "exclude" + assert rec.metric_value < rec.threshold + + +def test_curation_recommendation_with_low_severity(): + """ + author: sayali + reviewer: xinxin + category: edge test + """ + from roboqa_temporal.health_reporting.curation import CurationRecommendation + + rec = CurationRecommendation( + sequence="ok_sequence", + severity="low", + category="completeness", + message="Minor data loss", + metric_value=0.58, + threshold=0.6, + action="monitor" + ) + + assert rec.severity == "low" + assert rec.action == "monitor" + assert rec.metric_value < rec.threshold diff --git a/tests/test_one_shot.py b/tests/test_one_shot.py index ba795c3..447f9ab 100644 --- a/tests/test_one_shot.py +++ b/tests/test_one_shot.py @@ -96,6 +96,119 @@ def test_preprocessor_downsample(): assert downsampled[0].num_points <= frame.num_points +def test_temporal_sync_validator_with_single_stream(): + """ + author: xinxin + reviewer: sayali + category: one-shot test + """ + preprocessor = Preprocessor(remove_outliers=True, max_points_for_outliers=500) + + # Create a frame with some outliers + points = np.random.rand(200, 3) * 10 + # Add some outliers + outliers = np.array([[100.0, 100.0, 100.0], [-100.0, -100.0, -100.0]]) + all_points = np.vstack([points, outliers]) + + frame = PointCloudFrame(timestamp=1000.0, frame_id="test", points=all_points) + processed = preprocessor.process_sequence([frame]) + + assert len(processed) == 1 + assert processed[0].num_points < frame.num_points + + +def test_sensor_stream_creation(): + """ + author: xinxin + reviewer: sayali + category: one-shot test + """ + from roboqa_temporal.synchronization import SensorStream + + # Create a simple sensor stream with synthetic timestamps + timestamps_ns = [1000000000, 1100000000, 1200000000, 1300000000] # 100ms intervals + stream = SensorStream( + name="test_camera", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.name == "test_camera" + assert len(stream.timestamps_ns) == 4 + assert stream.frequency_estimate_hz is not None + assert abs(stream.frequency_estimate_hz - 10.0) < 1.0 + + +def test_sensor_stream_with_missing_frames(): + """ + author: xinxin + reviewer: sayali + category: one-shot test + """ + from roboqa_temporal.synchronization import SensorStream + + # Create timestamps with a missing frame (gap) + timestamps_ns = [ + 1000000000, # t=0 + 1100000000, # t=0.1s + 1200000000, # t=0.2s + 1400000000, # t=0.4s (missing frame at 0.3s) + 1500000000, # t=0.5s + ] + stream = SensorStream( + name="test_lidar", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.metadata["missing_frames"] > 0 + + +def test_sensor_stream_with_duplicate_timestamps(): + """ + author: xinxin + reviewer: sayali + category: one-shot test + """ + from roboqa_temporal.synchronization import SensorStream + + # Create timestamps with duplicates + timestamps_ns = [ + 1000000000, + 1100000000, + 1100000000, # Duplicate + 1200000000, + ] + stream = SensorStream( + name="test_imu", + source_path="/fake/path", + timestamps_ns=timestamps_ns, + expected_frequency=10.0, + ) + + assert stream.metadata["duplicate_frames"] > 0 + + +def test_temporal_sync_validator_with_empty_streams(): + """ + author: xinxin + reviewer: sayali + category: one-shot test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + + validator = TemporalSyncValidator(auto_export_reports=False) + streams = {} + + report = validator.analyze_streams(streams, dataset_name="empty_test", include_visualizations=False) + + assert report is not None + assert len(report.streams) == 0 + assert len(report.pair_results) == 0 + + def test_preprocessor_remove_outliers(): """ author: architjain @@ -160,3 +273,268 @@ def test_detection_result_creation(): assert len(result.anomalies) == 1 assert result.health_metrics["overall_health_score"] == 0.8 assert len(result.frame_statistics) == 1 + + +def test_calibration_stream_creation(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import CalibrationStream + + stream = CalibrationStream( + name="camera_lidar_pair", + image_paths=["/fake/img1.png", "/fake/img2.png"], + pointcloud_paths=["/fake/pc1.bin", "/fake/pc2.bin"], + calibration_file="/fake/calib.txt", + camera_id="cam_02", + lidar_id="velodyne" + ) + + assert stream.name == "camera_lidar_pair" + assert len(stream.image_paths) == 2 + assert len(stream.pointcloud_paths) == 2 + assert stream.camera_id == "cam_02" + + +def test_calibration_pair_result_creation(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import CalibrationPairResult + + result = CalibrationPairResult( + geom_edge_score=0.85, + mutual_information=0.75, + contrastive_score=0.80, + pass_geom_edge=True, + pass_mi=True, + pass_contrastive=True, + overall_pass=True, + details={"frames_analyzed": 10} + ) + + assert result.geom_edge_score == 0.85 + assert result.overall_pass is True + assert result.details["frames_analyzed"] == 10 + + +def test_projection_error_frame_creation(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import ProjectionErrorFrame + + error_frame = ProjectionErrorFrame( + frame_index=5, + timestamp=1500.0, + reprojection_error=2.5, + max_error_point=(100.0, 200.0), + projected_points_count=150, + error_trend="increasing" + ) + + assert error_frame.frame_index == 5 + assert error_frame.reprojection_error == 2.5 + assert error_frame.error_trend == "increasing" + assert error_frame.projected_points_count == 150 + + +def test_illumination_frame_creation(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import IlluminationFrame + + illum_frame = IlluminationFrame( + frame_index=10, + timestamp=2000.0, + brightness_mean=128.5, + brightness_std=45.2, + contrast=0.65, + scene_change_score=0.3, + light_source_change=False + ) + + assert illum_frame.frame_index == 10 + assert illum_frame.brightness_mean == 128.5 + assert illum_frame.light_source_change is False + + +def test_moving_object_frame_creation(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import MovingObjectFrame + + obj_frame = MovingObjectFrame( + frame_index=15, + timestamp=2500.0, + detected_objects=3, + detection_confidence=0.92, + consistency_score=0.88, + fusion_quality_score=0.85 + ) + + assert obj_frame.frame_index == 15 + assert obj_frame.detected_objects == 3 + assert obj_frame.detection_confidence == 0.92 + assert obj_frame.fusion_quality_score == 0.85 + + +def test_calibration_quality_validator_initialization(): + """ + author: dharinesh + reviewer: architjain + category: one-shot test + """ + from roboqa_temporal.fusion import CalibrationQualityValidator + + validator = CalibrationQualityValidator( + output_dir="reports/test_fusion", + config={"edge_threshold": 0.7} + ) + + assert validator.output_dir.name == "test_fusion" + assert validator.config["edge_threshold"] == 0.7 + + +def test_temporal_score_computation(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + import numpy as np + + # Create regular timestamps (10 Hz, 100ms intervals) + timestamps = np.arange(0, 1000, 100).astype('datetime64[ns]') + score = compute_temporal_score(timestamps) + + assert isinstance(score, float) + assert 0.0 <= score <= 1.0 + assert score > 0.8 + + +def test_temporal_score_irregular_timestamps(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + import numpy as np + + # Create very irregular timestamps with extreme gaps + timestamps = np.array([0, 100, 200, 2000, 2100], dtype='datetime64[ns]') + score = compute_temporal_score(timestamps) + + assert isinstance(score, float) + assert 0.0 <= score <= 1.0 + assert score < 0.9 + + +def test_anomaly_score_computation(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_anomaly_score + import numpy as np + + # Create timestamps with few anomalies + timestamps = np.arange(0, 1000, 100).astype('datetime64[ns]') + score = compute_anomaly_score(timestamps) + + assert isinstance(score, float) + assert 0.0 <= score <= 1.0 + assert score > 0.9 + + +def test_anomaly_score_with_outliers(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_anomaly_score + import numpy as np + + # Create timestamps with obvious outliers + base = np.arange(0, 1000, 100) + base[5] += 5000 # Large gap at index 5 + timestamps = base.astype('datetime64[ns]') + score = compute_anomaly_score(timestamps) + + assert isinstance(score, float) + assert 0.0 <= score <= 1.0 + + +def test_completeness_metrics_full_sequence(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + timestamps = np.arange(0, 1000, 100).astype('datetime64[ns]') + metrics = compute_completeness_metrics(timestamps, max_frames_in_sequence=10) + + assert isinstance(metrics, dict) + assert "message_availability" in metrics + assert "dropout_rate" in metrics + assert metrics["message_availability"] == 1.0 + + +def test_completeness_metrics_partial_sequence(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + # Only 5 frames out of possible 10 + timestamps = np.arange(0, 500, 100).astype('datetime64[ns]') + metrics = compute_completeness_metrics(timestamps, max_frames_in_sequence=10) + + assert isinstance(metrics, dict) + assert metrics["message_availability"] == 0.5 + + +def test_curation_recommendation_creation(): + """ + author: sayali + reviewer: xinxin + category: one-shot test + """ + from roboqa_temporal.health_reporting.curation import CurationRecommendation + + rec = CurationRecommendation( + sequence="test_sequence", + severity="high", + category="temporal", + message="Poor temporal regularity", + metric_value=0.45, + threshold=0.6, + action="review" + ) + + assert rec.sequence == "test_sequence" + assert rec.severity == "high" + assert rec.action == "review" + assert rec.metric_value < rec.threshold diff --git a/tests/test_pattern.py b/tests/test_pattern.py index 8d96615..46c160e 100644 --- a/tests/test_pattern.py +++ b/tests/test_pattern.py @@ -98,9 +98,176 @@ def test_pattern_selective_detection_workflow(): # Verify only requested detectors ran assert isinstance(result, DetectionResult) - if result.detector_results: - for key in result.detector_results.keys(): - assert key in ["spatial", "temporal"] + + +def test_pattern_sync_validator_workflow(): + """ + author: xinxin + reviewer: sayali + category: pattern test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + + # Create synthetic sensor streams with pairs that will be analyzed + camera_left_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + lidar_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + camera_right_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + + streams = { + "camera_left": SensorStream( + name="camera_left", + source_path="/fake/camera_left", + timestamps_ns=camera_left_ts, + expected_frequency=10.0, + ), + "camera_right": SensorStream( + name="camera_right", + source_path="/fake/camera_right", + timestamps_ns=camera_right_ts, + expected_frequency=10.0, + ), + "lidar": SensorStream( + name="lidar", + source_path="/fake/lidar", + timestamps_ns=lidar_ts, + expected_frequency=10.0, + ), + } + + # Initialize validator + validator = TemporalSyncValidator(output_dir="reports/test_sync", auto_export_reports=False) + + # Analyze streams + report = validator.analyze_streams(streams, dataset_name="test_sync", include_visualizations=False) + + assert report is not None + assert len(report.streams) == 3 + assert len(report.pair_results) > 0 + + +def test_pattern_sync_validator_with_drift(): + """ + author: xinxin + reviewer: sayali + category: pattern test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + import numpy as np + + # Create streams with temporal drift + base_timestamps = np.arange(0, 1000000000, 100000000, dtype=np.int64) + camera_left_timestamps = list(base_timestamps) + # Add progressive drift to lidar (1ms per frame) + lidar_timestamps = list(base_timestamps + np.arange(len(base_timestamps)) * 1000000) + + streams = { + "camera_left": SensorStream( + name="camera_left", + source_path="/fake/camera", + timestamps_ns=camera_left_timestamps, + expected_frequency=10.0, + ), + "lidar": SensorStream( + name="lidar", + source_path="/fake/lidar", + timestamps_ns=lidar_timestamps, + expected_frequency=10.0, + ), + } + + validator = TemporalSyncValidator(auto_export_reports=False) + report = validator.analyze_streams(streams, dataset_name="test_drift", include_visualizations=False) + + assert len(report.pair_results) > 0 + for pair_result in report.pair_results.values(): + assert hasattr(pair_result, "drift_rate_ms_per_s") + + +def test_pattern_multi_sensor_synchronization(): + """ + author: xinxin + reviewer: sayali + category: pattern test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + + # Create multiple sensor streams + camera_left_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + camera_right_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + lidar_ts = list(range(0, 1000000000, 100000000)) # 10 Hz + imu_ts = list(range(0, 1000000000, 10000000)) # 100 Hz + + streams = { + "camera_left": SensorStream( + name="camera_left", + source_path="/fake/cam_left", + timestamps_ns=camera_left_ts, + expected_frequency=10.0, + ), + "camera_right": SensorStream( + name="camera_right", + source_path="/fake/cam_right", + timestamps_ns=camera_right_ts, + expected_frequency=10.0, + ), + "lidar": SensorStream( + name="lidar", + source_path="/fake/lidar", + timestamps_ns=lidar_ts, + expected_frequency=10.0, + ), + "imu": SensorStream( + name="imu", + source_path="/fake/imu", + timestamps_ns=imu_ts, + expected_frequency=100.0, + ), + } + + validator = TemporalSyncValidator(auto_export_reports=False) + report = validator.analyze_streams(streams, dataset_name="multi_sensor", include_visualizations=False) + + assert len(report.streams) == 4 + assert len(report.pair_results) > 0 + + +def test_pattern_sync_report_serialization(): + """ + author: xinxin + reviewer: sayali + category: pattern test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator, SensorStream + + timestamps_camera = list(range(0, 500000000, 100000000)) + timestamps_lidar = list(range(0, 500000000, 100000000)) + + streams = { + "camera_left": SensorStream( + name="camera_left", + source_path="/fake/camera", + timestamps_ns=timestamps_camera, + expected_frequency=10.0, + ), + "lidar": SensorStream( + name="lidar", + source_path="/fake/lidar", + timestamps_ns=timestamps_lidar, + expected_frequency=10.0, + ), + } + + validator = TemporalSyncValidator(auto_export_reports=False) + report = validator.analyze_streams(streams, dataset_name="test_serial", include_visualizations=False) + + # Test serialization + report_dict = report.to_dict() + + assert isinstance(report_dict, dict) + assert "streams" in report_dict + assert "metrics" in report_dict + assert "recommendations" in report_dict + assert "generated_at" in report_dict def test_pattern_iterative_detection(): @@ -229,3 +396,321 @@ def test_pattern_full_pipeline(): assert len(processed) > 0 assert isinstance(detection_result, DetectionResult) assert report is not None + + +def test_pattern_fusion_quality_assessment(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import ( + CalibrationQualityValidator, + CalibrationStream, + CalibrationPairResult + ) + + # Create a validator instance + validator = CalibrationQualityValidator( + output_dir="reports/test_fusion", + config={"edge_threshold": 0.7} + ) + + assert validator is not None + assert validator.output_dir.exists() + + +def test_pattern_calibration_stream_workflow(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import CalibrationStream + + # Create multiple calibration streams + streams = [] + for i in range(3): + stream = CalibrationStream( + name=f"pair_{i}", + image_paths=[f"/fake/img_{i}_{j}.png" for j in range(5)], + pointcloud_paths=[f"/fake/pc_{i}_{j}.bin" for j in range(5)], + calibration_file=f"/fake/calib_{i}.txt", + camera_id=f"cam_0{i}", + lidar_id="velodyne" + ) + streams.append(stream) + + # Verify streams + assert len(streams) == 3 + for stream in streams: + assert len(stream.image_paths) == 5 + assert len(stream.pointcloud_paths) == 5 + + +def test_pattern_projection_error_tracking(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import ProjectionErrorFrame + + # Simulate projection errors over time + error_frames = [] + for i in range(10): + # Simulate increasing error trend + error = 1.0 + i * 0.5 + error_frame = ProjectionErrorFrame( + frame_index=i, + timestamp=1000.0 + i * 100, + reprojection_error=error, + max_error_point=(float(i * 10), float(i * 20)), + projected_points_count=100 - i * 5, + error_trend="increasing" if i > 5 else "stable" + ) + error_frames.append(error_frame) + + assert len(error_frames) == 10 + assert error_frames[0].reprojection_error < error_frames[-1].reprojection_error + assert sum(1 for f in error_frames if f.error_trend == "increasing") > 0 + + +def test_pattern_illumination_detection(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import IlluminationFrame + + # Simulate illumination changes + illum_frames = [] + for i in range(8): + brightness = 100.0 + np.random.randn() * 20.0 + illum_frame = IlluminationFrame( + frame_index=i, + timestamp=1000.0 + i * 100, + brightness_mean=brightness, + brightness_std=15.0 + np.random.randn() * 5.0, + contrast=0.6 + np.random.rand() * 0.2, + scene_change_score=0.1 if i < 4 else 0.8, + light_source_change=(i == 4) + ) + illum_frames.append(illum_frame) + + assert len(illum_frames) == 8 + assert sum(1 for f in illum_frames if f.light_source_change) == 1 + assert any(f.scene_change_score > 0.5 for f in illum_frames) + + +def test_pattern_moving_object_consistency(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import MovingObjectFrame + + # Simulate moving object detection + obj_frames = [] + for i in range(12): + obj_frame = MovingObjectFrame( + frame_index=i, + timestamp=1000.0 + i * 100, + detected_objects=2 + (i % 3), + detection_confidence=0.8 + np.random.rand() * 0.15, + consistency_score=0.85 if i < 6 else 0.70, + fusion_quality_score=0.80 + np.random.rand() * 0.1 + ) + obj_frames.append(obj_frame) + + assert len(obj_frames) == 12 + avg_confidence = np.mean([f.detection_confidence for f in obj_frames]) + assert 0.8 <= avg_confidence <= 1.0 + assert all(f.detected_objects > 0 for f in obj_frames) + + +def test_pattern_fusion_quality_report_generation(): + """ + author: dharinesh + reviewer: architjain + category: pattern test + """ + from roboqa_temporal.fusion import ( + CalibrationQualityReport, + CalibrationPairResult, + ProjectionErrorFrame + ) + + # Create sample pair result + pair_result = CalibrationPairResult( + geom_edge_score=0.82, + mutual_information=0.78, + contrastive_score=0.80, + pass_geom_edge=True, + pass_mi=True, + pass_contrastive=True, + overall_pass=True, + details={"frames": 10} + ) + + # Create sample projection errors + proj_errors = [ + ProjectionErrorFrame( + frame_index=i, + timestamp=1000.0 + i * 100, + reprojection_error=2.0 + i * 0.1, + projected_points_count=100 + ) + for i in range(5) + ] + + # Create report + report = CalibrationQualityReport( + dataset_name="test_dataset", + metrics={"overall_score": 0.85}, + pair_results={"cam_lidar": pair_result}, + projection_errors=proj_errors, + illumination_changes=[], + moving_objects=[], + recommendations=["Calibration is stable"], + parameter_file="/fake/params.yaml" + ) + + assert report.dataset_name == "test_dataset" + assert len(report.pair_results) == 1 + assert len(report.projection_errors) == 5 + assert len(report.recommendations) == 1 + + +def test_pattern_health_scoring_workflow(): + """ + author: sayali + reviewer: xinxin + category: pattern test + """ + from roboqa_temporal.health_reporting import ( + compute_temporal_score, + compute_anomaly_score, + compute_completeness_metrics + ) + import numpy as np + + # Create sensor timestamps + camera_ts = np.arange(0, 2000, 100).astype('datetime64[ns]') + lidar_ts = np.arange(0, 2000, 100).astype('datetime64[ns]') + imu_ts = np.arange(0, 2000, 10).astype('datetime64[ns]') + + # Compute temporal health + camera_temporal = compute_temporal_score(camera_ts) + lidar_temporal = compute_temporal_score(lidar_ts) + imu_temporal = compute_temporal_score(imu_ts) + + # Compute anomaly detection + camera_anomaly = compute_anomaly_score(camera_ts) + lidar_anomaly = compute_anomaly_score(lidar_ts) + + # Verify scores + assert 0.0 <= camera_temporal <= 1.0 + assert 0.0 <= lidar_temporal <= 1.0 + assert 0.0 <= camera_anomaly <= 1.0 + assert camera_temporal > 0.8 + + +def test_pattern_completeness_tracking(): + """ + author: sayali + reviewer: xinxin + category: pattern test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + import numpy as np + + # Simulate multiple sequences with varying completeness + sequences_data = [] + for seq_id in range(3): + # Each sequence has different frame counts + max_frames = 100 + n_frames = 100 - (seq_id * 20) + timestamps = np.arange(0, n_frames * 100, 100).astype('datetime64[ns]') + metrics = compute_completeness_metrics(timestamps, max_frames) + sequences_data.append(metrics) + + assert len(sequences_data) == 3 + assert sequences_data[0]["message_availability"] == 1.0 + assert sequences_data[1]["message_availability"] == 0.8 + assert sequences_data[2]["message_availability"] == 0.6 + + +def test_pattern_curation_recommendation_generation(): + """ + author: sayali + reviewer: xinxin + category: pattern test + """ + from roboqa_temporal.health_reporting import generate_curation_recommendations + import pandas as pd + + # Create sample per-sequence data + df_per_sequence = pd.DataFrame({ + "sequence": ["seq_1", "seq_2", "seq_3"], + "temporal_score": [0.9, 0.5, 0.2], + "anomaly_score": [0.95, 0.7, 0.3], + "dim_timeliness": [0.92, 0.6, 0.25], + "dim_completeness": [0.95, 0.85, 0.4], + "overall_quality_score": [0.92, 0.65, 0.3], + }) + + df_per_sensor = pd.DataFrame() + + # Generate recommendations + recs = generate_curation_recommendations( + df_per_sensor, + df_per_sequence, + temporal_threshold=0.6, + completeness_threshold=0.6, + quality_threshold=0.5 + ) + + assert isinstance(recs, list) + assert len(recs) > 0 + seq_names = [r.sequence for r in recs] + assert "seq_2" in seq_names or "seq_3" in seq_names + + +def test_pattern_multi_sensor_health_assessment(): + """ + author: sayali + reviewer: xinxin + category: pattern test + """ + from roboqa_temporal.health_reporting import ( + compute_temporal_score, + compute_completeness_metrics + ) + import numpy as np + + # Simulate multi-sensor sequence + sensors = { + "camera_left": np.arange(0, 2000, 100).astype('datetime64[ns]'), + "camera_right": np.arange(0, 2000, 100).astype('datetime64[ns]'), + "lidar": np.arange(0, 2000, 100).astype('datetime64[ns]'), + "imu": np.arange(0, 2000, 10).astype('datetime64[ns]'), + } + + results = {} + max_frames = 20 + + for sensor_name, timestamps in sensors.items(): + temporal = compute_temporal_score(timestamps) + completeness = compute_completeness_metrics(timestamps, max_frames) + results[sensor_name] = { + "temporal": temporal, + "completeness": completeness["message_availability"] + } + + assert len(results) == 4 + for sensor_name, metrics in results.items(): + assert 0.0 <= metrics["temporal"] <= 1.0 + assert 0.0 <= metrics["completeness"] <= 1.0 diff --git a/tests/test_smoke.py b/tests/test_smoke.py index fc2925d..f702df0 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -69,3 +69,210 @@ def test_report_generator_instantiation(): from roboqa_temporal.reporting import ReportGenerator generator = ReportGenerator() assert generator is not None + + +def test_temporal_sync_validator_importable(): + """ + author: xinxin + reviewer: sayali + category: smoke test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator + assert TemporalSyncValidator is not None + + +def test_temporal_sync_validator_instantiation(): + """ + author: xinxin + reviewer: sayali + category: smoke test + """ + from roboqa_temporal.synchronization import TemporalSyncValidator + validator = TemporalSyncValidator() + assert validator is not None + assert hasattr(validator, 'validate') + assert hasattr(validator, 'analyze_streams') + + +def test_sensor_stream_importable(): + """ + author: xinxin + reviewer: sayali + category: smoke test + """ + from roboqa_temporal.synchronization import SensorStream + assert SensorStream is not None + + +def test_temporal_sync_report_importable(): + """ + author: xinxin + reviewer: sayali + category: smoke test + """ + from roboqa_temporal.synchronization import TemporalSyncReport + assert TemporalSyncReport is not None + + +def test_pairwise_drift_result_importable(): + """ + author: xinxin + reviewer: sayali + category: smoke test + """ + from roboqa_temporal.synchronization import PairwiseDriftResult + assert PairwiseDriftResult is not None + + +def test_calibration_quality_validator_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import CalibrationQualityValidator + assert CalibrationQualityValidator is not None + + +def test_calibration_quality_validator_instantiation(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import CalibrationQualityValidator + validator = CalibrationQualityValidator(output_dir="reports/test_fusion") + assert validator is not None + assert hasattr(validator, 'analyze_dataset') + + +def test_calibration_stream_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import CalibrationStream + assert CalibrationStream is not None + + +def test_calibration_quality_report_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import CalibrationQualityReport + assert CalibrationQualityReport is not None + + +def test_calibration_pair_result_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import CalibrationPairResult + assert CalibrationPairResult is not None + + +def test_projection_error_frame_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import ProjectionErrorFrame + assert ProjectionErrorFrame is not None + + +def test_illumination_frame_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import IlluminationFrame + assert IlluminationFrame is not None + + +def test_moving_object_frame_importable(): + """ + author: dharinesh + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.fusion import MovingObjectFrame + assert MovingObjectFrame is not None + + +def test_health_check_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import run_health_check + assert run_health_check is not None + + +def test_temporal_score_computation_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import compute_temporal_score + assert compute_temporal_score is not None + + +def test_anomaly_score_computation_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import compute_anomaly_score + assert compute_anomaly_score is not None + + +def test_completeness_metrics_computation_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import compute_completeness_metrics + assert compute_completeness_metrics is not None + + +def test_curation_recommendation_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting.curation import CurationRecommendation + assert CurationRecommendation is not None + + +def test_curation_recommendations_function_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import generate_curation_recommendations + assert generate_curation_recommendations is not None + + +def test_export_functions_importable(): + """ + author: sayali + reviewer: xinxin + category: smoke test + """ + from roboqa_temporal.health_reporting import export_csv, export_json, export_yaml + assert export_csv is not None + assert export_json is not None + assert export_yaml is not None