diff --git a/cosmos_framework/scripts/curator_to_sft_jsonl.py b/cosmos_framework/scripts/curator_to_sft_jsonl.py new file mode 100644 index 0000000..e70d046 --- /dev/null +++ b/cosmos_framework/scripts/curator_to_sft_jsonl.py @@ -0,0 +1,350 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: OpenMDW-1.1 +"""Convert cosmos-curator splitting-pipeline outputs into the SFT training JSONL format. + +The SFT dataset loader (``sft_dataset.py``) expects each JSONL line to have:: + + uuid, duration, width, height, vision_path, t2w_windows + +where ``t2w_windows`` is a list of ``{start_frame, end_frame, temporal_interval, caption}``. + +Curator writes a richer schema per clip at +``/metas_jsonl/v0/*.jsonl``. This script renames and trims those +rows into the loader's format, applies the same hard filters the loader applies +silently at train time (so dataset counts match), and writes a sidecar +``.summary.json`` with per-reason drop counts. + +Usage +----- + python -m cosmos_framework.scripts.curator_to_sft_jsonl \\ + --curator-output outputs/curator_split/ \\ + -o outputs/curator_split/cosmos3_sft.jsonl + + # With explicit caption-model preference (e.g. when curator ran with + # both qwen captioning and qwen_lm enhancement): + python -m cosmos_framework.scripts.curator_to_sft_jsonl \\ + --curator-output outputs/curator_split/ \\ + -o outputs/curator_split/cosmos3_sft.jsonl \\ + --caption-model qwen \\ + --enhanced-caption-model qwen_lm + +Curator must have been run with ``--upload-clip-info-in-chunks`` so that +``metas_jsonl/v0/`` is populated; otherwise the converter has no input. +""" + +import json +import os +import sys +from collections import Counter +from collections.abc import Iterator +from pathlib import Path +from typing import Annotated, Any + +import tyro + +# Hard filters mirror sft_dataset.py defaults so the converter drops the same +# rows the loader would silently drop at train time. +MAX_VIDEO_DURATION_S: float = 61.0 +MIN_WINDOW_FRAMES: int = 61 +DEFAULT_TEMPORAL_INTERVAL: int = 1 + +# Suffixes curator writes for captions / enhanced captions in metas_jsonl rows. +_CAPTION_SUFFIX = "_caption" +_ENHANCED_SUFFIX = "_enhanced_caption" + + +def _relativize_vision_path(vision_path: str, output_jsonl: Path) -> str: + """Rewrite ``vision_path`` relative to the output JSONL's directory. + + The SFT loader resolves relative paths against the JSONL's directory + (``sft_dataset.py:548-550``). Curator's ``clip_location`` is typically an + absolute filesystem path, which the loader also accepts but which doesn't + survive moving the dataset to a different mount or container. + + Behavior: + + - URIs containing ``://`` (e.g. ``s3://bucket/key``) pass through unchanged. + - Absolute or relative filesystem paths are rewritten relative to + ``output_jsonl.parent``. ``os.path.relpath`` will emit ``../`` segments if + the clip lives outside the JSONL's parent tree; that still satisfies the + loader, which simply joins the two. + """ + if "://" in vision_path: + return vision_path + return os.path.relpath(vision_path, start=output_jsonl.parent) + + +def _iter_metas_jsonl_files(curator_output: Path) -> Iterator[Path]: + """Find curator metas_jsonl files under the given curator output root. + + Accepts either the splitting-pipeline output root (recommended) or the + ``metas_jsonl/v0/`` directory itself. + """ + nested = curator_output / "metas_jsonl" / "v0" + if nested.is_dir(): + yield from sorted(nested.glob("*.jsonl")) + return + if curator_output.is_dir(): + direct = sorted(curator_output.glob("*.jsonl")) + if direct: + yield from direct + return + # Nothing matched. Caller treats an empty stream as a fatal error. + + +def _resolve_window_caption( + window: dict[str, Any], + *, + caption_model: str | None, + enhanced_caption_model: str | None, +) -> str | None: + """Pick a single caption text for a window using a deterministic chain. + + Resolution order: + + 1. ``{enhanced_caption_model}_enhanced_caption`` when configured and non-empty. + 2. ``{caption_model}_caption`` when configured and non-empty. + 3. First non-empty ``*_enhanced_caption`` value (alphabetical key). + 4. First non-empty ``*_caption`` value (alphabetical key). + 5. ``None`` — caller drops the window. + """ + if enhanced_caption_model: + candidate = (window.get(f"{enhanced_caption_model}{_ENHANCED_SUFFIX}") or "").strip() + if candidate: + return candidate + + if caption_model: + candidate = (window.get(f"{caption_model}{_CAPTION_SUFFIX}") or "").strip() + if candidate: + return candidate + + for key in sorted(window.keys()): + if key.endswith(_ENHANCED_SUFFIX): + candidate = (window.get(key) or "").strip() + if candidate: + return candidate + + for key in sorted(window.keys()): + if key.endswith(_CAPTION_SUFFIX) and not key.endswith(_ENHANCED_SUFFIX): + candidate = (window.get(key) or "").strip() + if candidate: + return candidate + + return None + + +def _compute_duration(record: dict[str, Any]) -> float | None: + """Derive clip duration in seconds from a curator row. + + Prefer ``num_frames / framerate`` for accuracy; fall back to + ``duration_span[1] - duration_span[0]`` if the post-transcode metadata is + missing. + """ + num_frames = record.get("num_frames") + framerate = record.get("framerate") + if isinstance(num_frames, int) and isinstance(framerate, int | float) and framerate > 0: + return float(num_frames) / float(framerate) + span = record.get("duration_span") + if isinstance(span, list | tuple) and len(span) == 2: + try: + return float(span[1]) - float(span[0]) + except (TypeError, ValueError): + return None + return None + + +def _build_sft_row( + record: dict[str, Any], + *, + caption_model: str | None, + enhanced_caption_model: str | None, + min_short_edge: int, + min_window_frames: int, + max_duration_s: float, + temporal_interval: int, +) -> tuple[dict[str, Any] | None, str | None]: + """Translate a curator metas_jsonl row into an SFT row, or report a drop reason. + + Returns ``(row, None)`` for kept records and ``(None, reason)`` for drops. + """ + clip_uuid = record.get("span_uuid") + clip_location = record.get("clip_location") + width = record.get("width") + height = record.get("height") + num_frames = record.get("num_frames") + framerate = record.get("framerate") + duration_s = _compute_duration(record) + + if not clip_uuid or not clip_location: + return None, "missing_identity" + if width is None or height is None or num_frames is None or framerate is None or duration_s is None: + return None, "missing_clip_metadata" + if duration_s > max_duration_s: + return None, "duration_too_long" + if min_short_edge > 0 and min(int(width), int(height)) < min_short_edge: + return None, "short_edge_too_small" + + windows = record.get("windows") or [] + t2w_windows: list[dict[str, Any]] = [] + for window in windows: + start_frame = window.get("start_frame") + end_frame = window.get("end_frame") + if not isinstance(start_frame, int) or not isinstance(end_frame, int): + continue + frames_in_window = end_frame - start_frame + 1 + if frames_in_window < min_window_frames: + continue + caption_text = _resolve_window_caption( + window, + caption_model=caption_model, + enhanced_caption_model=enhanced_caption_model, + ) + if caption_text is None: + continue + t2w_windows.append( + { + "start_frame": start_frame, + "end_frame": end_frame, + "temporal_interval": temporal_interval, + "caption": caption_text, + } + ) + + if not t2w_windows: + return None, "no_valid_window" + + row: dict[str, Any] = { + "uuid": str(clip_uuid), + "duration": float(duration_s), + "width": int(width), + "height": int(height), + "nb_frames": int(num_frames), + "framerate": float(framerate), + "vision_path": str(clip_location), + "t2w_windows": t2w_windows, + } + return row, None + + +def main( # noqa: PLR0913 + curator_output: Annotated[ + Path, + tyro.conf.arg(help="Curator splitting-pipeline output root (the dir containing metas_jsonl/v0/)."), + ], + output: Annotated[Path, tyro.conf.arg(aliases=("-o",), help="Output JSONL path.")], + caption_model: Annotated[ + str | None, + tyro.conf.arg( + help="Curator caption-model name (e.g. 'qwen'); used to pick {model}_caption fields. " + "Defaults to the first *_caption key encountered when unset.", + ), + ] = None, + enhanced_caption_model: Annotated[ + str | None, + tyro.conf.arg( + help="Curator enhancement-model name (e.g. 'qwen_lm'); used to pick " + "{model}_enhanced_caption fields. Preferred over caption_model when both are present.", + ), + ] = None, + min_short_edge: Annotated[ + int, + tyro.conf.arg(help="Drop clips whose shortest spatial edge is below this value. 0 disables."), + ] = 0, + min_window_frames: Annotated[ + int, + tyro.conf.arg( + help=f"Drop windows shorter than this. Default {MIN_WINDOW_FRAMES} matches sft_dataset.py.", + ), + ] = MIN_WINDOW_FRAMES, + max_duration_s: Annotated[ + float, + tyro.conf.arg( + help=f"Drop clips longer than this. Default {MAX_VIDEO_DURATION_S} matches sft_dataset.py.", + ), + ] = MAX_VIDEO_DURATION_S, + temporal_interval: Annotated[ + int, + tyro.conf.arg(help="temporal_interval to record on every t2w_window."), + ] = DEFAULT_TEMPORAL_INTERVAL, +) -> None: + """Build an SFT JSONL from a curator splitting-pipeline output directory.""" + jsonl_files = list(_iter_metas_jsonl_files(curator_output)) + if not jsonl_files: + print( + f"ERROR: No metas_jsonl files found under {curator_output}. " + "Re-run the curator splitting pipeline with --upload-clip-info-in-chunks.", + file=sys.stderr, + ) + sys.exit(1) + + kept_rows: list[dict[str, Any]] = [] + drops: Counter[str] = Counter() + seen_records = 0 + + for jsonl_path in jsonl_files: + with jsonl_path.open("r") as src: + for line in src: + line = line.strip() + if not line: + continue + seen_records += 1 + try: + record = json.loads(line) + except json.JSONDecodeError as exc: + print(f" SKIP malformed JSON in {jsonl_path}: {exc}") + drops["malformed_json"] += 1 + continue + row, reason = _build_sft_row( + record, + caption_model=caption_model, + enhanced_caption_model=enhanced_caption_model, + min_short_edge=min_short_edge, + min_window_frames=min_window_frames, + max_duration_s=max_duration_s, + temporal_interval=temporal_interval, + ) + if row is None: + drops[reason or "unknown"] += 1 + continue + row["vision_path"] = _relativize_vision_path(row["vision_path"], output) + kept_rows.append(row) + + output.parent.mkdir(parents=True, exist_ok=True) + with output.open("w") as dst: + for row in kept_rows: + dst.write(json.dumps(row) + "\n") + + summary = { + "curator_output": str(curator_output), + "output_jsonl": str(output), + "shards_read": len(jsonl_files), + "records_seen": seen_records, + "records_kept": len(kept_rows), + "records_dropped": sum(drops.values()), + "drops_by_reason": dict(drops), + "filters": { + "max_duration_s": max_duration_s, + "min_window_frames": min_window_frames, + "min_short_edge": min_short_edge, + }, + "caption_model": caption_model, + "enhanced_caption_model": enhanced_caption_model, + "temporal_interval": temporal_interval, + } + summary_path = output.with_suffix(output.suffix + ".summary.json") + summary_path.write_text(json.dumps(summary, indent=2) + "\n") + + print(f"Read {seen_records} records from {len(jsonl_files)} shard(s) under {curator_output}") + print(f"Wrote {len(kept_rows)} records → {output}") + if drops: + print("Drops by reason:") + for reason, count in sorted(drops.items(), key=lambda kv: (-kv[1], kv[0])): + print(f" {reason}: {count}") + print(f"Summary: {summary_path}") + if not kept_rows: + print("ERROR: No valid records written.", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + tyro.cli(main) diff --git a/cosmos_framework/scripts/curator_to_sft_jsonl_test.py b/cosmos_framework/scripts/curator_to_sft_jsonl_test.py new file mode 100644 index 0000000..16845e0 --- /dev/null +++ b/cosmos_framework/scripts/curator_to_sft_jsonl_test.py @@ -0,0 +1,583 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: OpenMDW-1.1 +"""Unit tests for the curator -> SFT JSONL converter.""" + +import json +from pathlib import Path +from typing import Any + +import pytest + +from cosmos_framework.scripts.curator_to_sft_jsonl import ( + DEFAULT_TEMPORAL_INTERVAL, + MAX_VIDEO_DURATION_S, + MIN_WINDOW_FRAMES, + _build_sft_row, + _relativize_vision_path, + _resolve_window_caption, + main, +) + +# Pick window-frame bounds well above the loader's 61-frame floor so test +# fixtures can pass / fail the filter for the reason a test actually targets. +_PASSING_WINDOW = (0, MIN_WINDOW_FRAMES + 10) +_SHORT_WINDOW = (0, MIN_WINDOW_FRAMES - 10) + + +def _make_record( + *, + span_uuid: str = "clip-uuid-0", + clip_location: str = "/out/clips/clip-uuid-0.mp4", + width: int = 256, + height: int = 256, + num_frames: int = 120, + framerate: float = 24.0, + windows: list[dict[str, Any]] | None = None, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build a minimal curator metas_jsonl row used by the test fixtures.""" + record: dict[str, Any] = { + "span_uuid": span_uuid, + "source_video": "/in/video.mp4", + "duration_span": [0.0, 5.0], + "clip_location": clip_location, + "width": width, + "height": height, + "num_frames": num_frames, + "framerate": framerate, + "windows": windows + if windows is not None + else [ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + "qwen_caption": "A robotic arm grasping a cube.", + } + ], + } + if extra: + record.update(extra) + return record + + +def _default_kwargs() -> dict[str, Any]: + return { + "caption_model": None, + "enhanced_caption_model": None, + "min_short_edge": 0, + "min_window_frames": MIN_WINDOW_FRAMES, + "max_duration_s": MAX_VIDEO_DURATION_S, + "temporal_interval": DEFAULT_TEMPORAL_INTERVAL, + } + + +@pytest.mark.L0 +def test_resolve_window_caption_prefers_enhanced_then_base() -> None: + window = { + "qwen_caption": "base text", + "qwen_lm_enhanced_caption": "enhanced text", + } + assert _resolve_window_caption(window, caption_model="qwen", enhanced_caption_model="qwen_lm") == "enhanced text" + + +@pytest.mark.L0 +def test_resolve_window_caption_falls_back_to_base_when_enhanced_missing() -> None: + window = {"qwen_caption": "base text"} + assert _resolve_window_caption(window, caption_model="qwen", enhanced_caption_model="qwen_lm") == "base text" + + +@pytest.mark.L0 +def test_resolve_window_caption_alphabetical_fallback_when_unspecified() -> None: + window = { + "nemotron_caption": "nemo text", + "qwen_caption": "qwen text", + } + assert _resolve_window_caption(window, caption_model=None, enhanced_caption_model=None) == "nemo text" + + +@pytest.mark.L0 +def test_resolve_window_caption_returns_none_when_empty() -> None: + window = {"qwen_caption": " "} + assert _resolve_window_caption(window, caption_model="qwen", enhanced_caption_model=None) is None + + +@pytest.mark.L0 +def test_build_sft_row_happy_path_emits_loader_schema() -> None: + record = _make_record() + row, reason = _build_sft_row(record, **_default_kwargs()) + assert reason is None + assert row == { + "uuid": "clip-uuid-0", + "duration": pytest.approx(120 / 24.0), + "width": 256, + "height": 256, + "nb_frames": 120, + "framerate": 24.0, + "vision_path": "/out/clips/clip-uuid-0.mp4", + "t2w_windows": [ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + "temporal_interval": DEFAULT_TEMPORAL_INTERVAL, + "caption": "A robotic arm grasping a cube.", + } + ], + } + + +@pytest.mark.L0 +def test_build_sft_row_drops_clip_longer_than_max_duration() -> None: + # 120 frames at 1 fps = 120 s > 61 s. + record = _make_record(num_frames=120, framerate=1.0) + row, reason = _build_sft_row(record, **_default_kwargs()) + assert row is None + assert reason == "duration_too_long" + + +@pytest.mark.L0 +def test_build_sft_row_keeps_clip_at_exactly_max_duration() -> None: + # 61 frames at 1 fps = 61.0 s, must pass because loader uses strict >. + record = _make_record( + num_frames=61, + framerate=1.0, + windows=[ + { + "start_frame": 0, + "end_frame": MIN_WINDOW_FRAMES - 1, + "qwen_caption": "boundary", + } + ], + ) + row, reason = _build_sft_row(record, **_default_kwargs()) + assert reason is None + assert row is not None + assert row["duration"] == pytest.approx(MAX_VIDEO_DURATION_S) + + +@pytest.mark.L0 +def test_build_sft_row_drops_when_short_edge_below_threshold() -> None: + record = _make_record(width=128, height=512) + kwargs = _default_kwargs() | {"min_short_edge": 256} + row, reason = _build_sft_row(record, **kwargs) + assert row is None + assert reason == "short_edge_too_small" + + +@pytest.mark.L0 +def test_build_sft_row_drops_when_all_windows_too_short() -> None: + record = _make_record( + windows=[ + { + "start_frame": _SHORT_WINDOW[0], + "end_frame": _SHORT_WINDOW[1], + "qwen_caption": "short window", + } + ], + ) + row, reason = _build_sft_row(record, **_default_kwargs()) + assert row is None + assert reason == "no_valid_window" + + +@pytest.mark.L0 +def test_build_sft_row_drops_when_no_window_has_caption() -> None: + record = _make_record( + windows=[ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + # no caption keys at all + } + ], + ) + row, reason = _build_sft_row(record, **_default_kwargs()) + assert row is None + assert reason == "no_valid_window" + + +@pytest.mark.L0 +def test_build_sft_row_keeps_only_windows_with_captions() -> None: + record = _make_record( + windows=[ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + "qwen_caption": "kept", + }, + { + "start_frame": _PASSING_WINDOW[1] + 1, + "end_frame": _PASSING_WINDOW[1] + 1 + (_PASSING_WINDOW[1] - _PASSING_WINDOW[0]), + # no caption -> dropped window, but row should survive. + }, + ], + ) + row, reason = _build_sft_row(record, **_default_kwargs()) + assert reason is None + assert row is not None + assert len(row["t2w_windows"]) == 1 + assert row["t2w_windows"][0]["caption"] == "kept" + + +@pytest.mark.L0 +def test_build_sft_row_duration_falls_back_to_span_when_clip_metadata_missing() -> None: + record = _make_record(num_frames=None, framerate=None, extra={"duration_span": [0.0, 4.5]}) + # Without num_frames we can't emit a valid SFT row (loader expects nb_frames), + # so the row should be dropped for missing_clip_metadata, not duration math. + row, reason = _build_sft_row(record, **_default_kwargs()) + assert row is None + assert reason == "missing_clip_metadata" + + +@pytest.mark.L0 +def test_relativize_vision_path_rewrites_filesystem_path_relative_to_jsonl_dir(tmp_path: Path) -> None: + """Filesystem paths get rewritten relative to the output JSONL's parent dir.""" + output_jsonl = tmp_path / "out" / "cosmos3_sft.jsonl" + output_jsonl.parent.mkdir(parents=True) + clip = str(tmp_path / "out" / "clips" / "abc.mp4") + assert _relativize_vision_path(clip, output_jsonl) == "clips/abc.mp4" + + +@pytest.mark.L0 +def test_relativize_vision_path_emits_parent_segments_when_clip_outside_jsonl_tree(tmp_path: Path) -> None: + """Clip in a sibling tree is still expressible as a relative path.""" + output_jsonl = tmp_path / "outputs" / "cosmos3_sft.jsonl" + output_jsonl.parent.mkdir(parents=True) + clip = str(tmp_path / "curator_out" / "clips" / "abc.mp4") + assert _relativize_vision_path(clip, output_jsonl) == "../curator_out/clips/abc.mp4" + + +@pytest.mark.L0 +def test_relativize_vision_path_passes_through_uris_unchanged() -> None: + """s3://, gs://, https:// — anything with a scheme — must not be rewritten.""" + output_jsonl = Path("/anywhere/out.jsonl") + assert _relativize_vision_path("s3://bucket/clips/abc.mp4", output_jsonl) == "s3://bucket/clips/abc.mp4" + assert _relativize_vision_path("gs://bucket/clips/abc.mp4", output_jsonl) == "gs://bucket/clips/abc.mp4" + assert ( + _relativize_vision_path("https://cdn.example.com/clips/abc.mp4", output_jsonl) + == "https://cdn.example.com/clips/abc.mp4" + ) + + +@pytest.mark.L0 +def test_main_emits_relative_vision_paths(tmp_path: Path) -> None: + """End-to-end: written JSONL rows must carry relative paths the loader can resolve. + + The cosmos team requested relative paths so the loader's + relative-to-JSONL branch (sft_dataset.py:548-550) fires and datasets stay + portable across mount points. + """ + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + record = _make_record(clip_location=str(curator_output / "clips" / "abc.mp4")) + (metas_dir / "shard_0.jsonl").write_text(json.dumps(record) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + rows = [json.loads(line) for line in output.read_text().splitlines() if line] + assert len(rows) == 1 + # JSONL lives at curator_out/cosmos3_sft.jsonl; clip lives at curator_out/clips/abc.mp4. + # Relative form must be just "clips/abc.mp4" — not absolute, not starting with "./". + assert rows[0]["vision_path"] == "clips/abc.mp4" + + +@pytest.mark.L0 +def test_main_end_to_end_writes_jsonl_and_summary(tmp_path: Path) -> None: + """End-to-end: drop a curator-style metas_jsonl fixture, run main(), assert outputs.""" + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + + kept = _make_record(span_uuid="kept-1") + too_long = _make_record(span_uuid="too-long", num_frames=120, framerate=1.0) + no_caption = _make_record( + span_uuid="no-caption", + windows=[ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + } + ], + ) + shard_path = metas_dir / "video-uuid_0.jsonl" + shard_path.write_text( + "\n".join(json.dumps(r) for r in [kept, too_long, no_caption]) + "\n", + ) + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + assert output.exists() + rows = [json.loads(line) for line in output.read_text().splitlines() if line] + assert len(rows) == 1 + assert rows[0]["uuid"] == "kept-1" + assert rows[0]["t2w_windows"][0]["caption"] == "A robotic arm grasping a cube." + + summary_path = output.with_suffix(output.suffix + ".summary.json") + assert summary_path.exists() + summary = json.loads(summary_path.read_text()) + assert summary["records_seen"] == 3 + assert summary["records_kept"] == 1 + assert summary["drops_by_reason"]["duration_too_long"] == 1 + assert summary["drops_by_reason"]["no_valid_window"] == 1 + + +@pytest.mark.L0 +def test_main_partial_split_keeps_exactly_the_passing_rows(tmp_path: Path) -> None: + """A duration filter must split the batch — not drop everything, not pass everything. + + Guards against accidental drop-all / pass-all regressions in the filter logic. + Mirrors the bridge-sample dry-run shape: 5 rows, 3 short enough to pass at + --max-duration-s 5.0 and 2 too long; verify the exact uuids in each bucket. + """ + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + + # Build a mix where exactly 3 of 5 pass --max-duration-s 5.0. + # All 5 must have windows long enough to clear the min_window_frames filter + # so the only reason for drops is duration. + long_window = [ + { + "start_frame": 0, + "end_frame": MIN_WINDOW_FRAMES + 5, + "qwen_caption": "kept", + } + ] + rows = [ + _make_record(span_uuid="short-1", num_frames=80, framerate=24.0, windows=long_window), # 3.3s — keep + _make_record(span_uuid="long-1", num_frames=240, framerate=24.0, windows=long_window), # 10.0s — drop + _make_record(span_uuid="short-2", num_frames=96, framerate=24.0, windows=long_window), # 4.0s — keep + _make_record(span_uuid="long-2", num_frames=300, framerate=24.0, windows=long_window), # 12.5s — drop + _make_record(span_uuid="short-3", num_frames=72, framerate=24.0, windows=long_window), # 3.0s — keep + ] + (metas_dir / "shard_0.jsonl").write_text("\n".join(json.dumps(r) for r in rows) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=5.0, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + written = [json.loads(line) for line in output.read_text().splitlines() if line] + kept_uuids = {r["uuid"] for r in written} + assert kept_uuids == {"short-1", "short-2", "short-3"}, ( + f"Expected exactly the short uuids to pass; got {kept_uuids}" + ) + + summary = json.loads(output.with_suffix(output.suffix + ".summary.json").read_text()) + assert summary["records_seen"] == 5 + assert summary["records_kept"] == 3 + assert summary["records_dropped"] == 2 + assert summary["drops_by_reason"] == {"duration_too_long": 2} + + +@pytest.mark.L0 +def test_main_partial_split_on_min_window_frames(tmp_path: Path) -> None: + """min_window_frames must split the batch like duration does — guard against drop-all bugs. + + Build 4 rows where 2 have windows long enough to clear --min-window-frames 80 and + 2 don't. The kept set must match exactly by uuid. + """ + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + threshold = 80 + + def _row_with_window_len(uuid: str, length: int) -> dict[str, Any]: + return _make_record( + span_uuid=uuid, + windows=[ + { + "start_frame": 0, + "end_frame": length - 1, + "qwen_caption": "ok", + } + ], + ) + + rows = [ + _row_with_window_len("long-1", threshold + 5), # 85 frames — keep + _row_with_window_len("short-1", threshold - 5), # 75 frames — drop (no_valid_window) + _row_with_window_len("long-2", threshold + 20), # 100 frames — keep + _row_with_window_len("short-2", threshold - 1), # 79 frames — drop + ] + (metas_dir / "shard_0.jsonl").write_text("\n".join(json.dumps(r) for r in rows) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=threshold, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + written = [json.loads(line) for line in output.read_text().splitlines() if line] + assert {r["uuid"] for r in written} == {"long-1", "long-2"} + summary = json.loads(output.with_suffix(output.suffix + ".summary.json").read_text()) + assert summary["records_kept"] == 2 + assert summary["drops_by_reason"] == {"no_valid_window": 2} + + +@pytest.mark.L0 +def test_main_glob_reads_all_shards_in_metas_jsonl_dir(tmp_path: Path) -> None: + """Curator emits one .jsonl per (video_uuid, chunk_index); the converter must read them all.""" + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + + # Two shards, two rows each, distinct uuids so we can confirm every row arrives. + shard_a = [_make_record(span_uuid="a-0"), _make_record(span_uuid="a-1")] + shard_b = [_make_record(span_uuid="b-0"), _make_record(span_uuid="b-1")] + (metas_dir / "video-a_0.jsonl").write_text("\n".join(json.dumps(r) for r in shard_a) + "\n") + (metas_dir / "video-b_0.jsonl").write_text("\n".join(json.dumps(r) for r in shard_b) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + written = [json.loads(line) for line in output.read_text().splitlines() if line] + assert {r["uuid"] for r in written} == {"a-0", "a-1", "b-0", "b-1"} + summary = json.loads(output.with_suffix(output.suffix + ".summary.json").read_text()) + assert summary["shards_read"] == 2 + assert summary["records_seen"] == 4 + assert summary["records_kept"] == 4 + + +@pytest.mark.L0 +def test_main_caption_resolution_end_to_end_prefers_configured_enhanced_model(tmp_path: Path) -> None: + """End-to-end: confirm --enhanced-caption-model picks the right field even when other captions are present.""" + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + + record = _make_record( + windows=[ + { + "start_frame": _PASSING_WINDOW[0], + "end_frame": _PASSING_WINDOW[1], + "qwen_caption": "wrong-1: base qwen caption", + "nemotron_caption": "wrong-2: alphabetically-first base caption", + "gpt_oss_20b_enhanced_caption": "wrong-3: non-configured enhanced caption", + "qwen_lm_enhanced_caption": "correct: configured enhanced caption", + } + ], + ) + (metas_dir / "shard_0.jsonl").write_text(json.dumps(record) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + main( + curator_output=curator_output, + output=output, + caption_model="qwen", + enhanced_caption_model="qwen_lm", + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + + rows = [json.loads(line) for line in output.read_text().splitlines() if line] + assert len(rows) == 1 + assert rows[0]["t2w_windows"][0]["caption"] == "correct: configured enhanced caption" + + +@pytest.mark.L0 +def test_main_accepts_metas_jsonl_dir_directly(tmp_path: Path) -> None: + """User may pass the metas_jsonl/v0 dir itself instead of the curator root.""" + metas_dir = tmp_path / "metas_jsonl_v0" + metas_dir.mkdir() + shard_path = metas_dir / "video-uuid_0.jsonl" + shard_path.write_text(json.dumps(_make_record()) + "\n") + + output = tmp_path / "cosmos3_sft.jsonl" + main( + curator_output=metas_dir, + output=output, + caption_model=None, + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + assert output.exists() + assert len(output.read_text().splitlines()) == 1 + + +@pytest.mark.L0 +def test_main_exits_when_no_records_kept(tmp_path: Path) -> None: + curator_output = tmp_path / "curator_out" + metas_dir = curator_output / "metas_jsonl" / "v0" + metas_dir.mkdir(parents=True) + # Only emit a row that will fail filters. + bad = _make_record(span_uuid="bad", num_frames=120, framerate=1.0) + (metas_dir / "video-uuid_0.jsonl").write_text(json.dumps(bad) + "\n") + + output = curator_output / "cosmos3_sft.jsonl" + with pytest.raises(SystemExit) as exc_info: + main( + curator_output=curator_output, + output=output, + caption_model=None, + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + assert exc_info.value.code == 1 + + +@pytest.mark.L0 +def test_main_exits_when_no_metas_jsonl_found(tmp_path: Path) -> None: + output = tmp_path / "cosmos3_sft.jsonl" + with pytest.raises(SystemExit) as exc_info: + main( + curator_output=tmp_path, + output=output, + caption_model=None, + enhanced_caption_model=None, + min_short_edge=0, + min_window_frames=MIN_WINDOW_FRAMES, + max_duration_s=MAX_VIDEO_DURATION_S, + temporal_interval=DEFAULT_TEMPORAL_INTERVAL, + ) + assert exc_info.value.code == 1 diff --git a/docs/dataset_jsonl.md b/docs/dataset_jsonl.md index 1ee3f09..1cb3b79 100644 --- a/docs/dataset_jsonl.md +++ b/docs/dataset_jsonl.md @@ -190,3 +190,23 @@ python -m cosmos_framework.scripts.inference_prompts_to_json \ ``` This reads `val/captions//caption.json` and replaces the (dense) `prompt` with the serialized structured JSON, preserving `resolution`, `aspect_ratio`, `num_frames`, `fps`, and `vision_path`. Pass `--dry-run` to preview. + +### Create Dataset from a Cosmos-Curator output directory + +If your training videos came from the [Cosmos-Curator](https://github.com/nvidia/cosmos-curator) splitting pipeline, you can build the SFT JSONL directly from curator's per-clip metadata — no separate captioning step, no `ffprobe` re-read, and multi-window captions are preserved. + +**Prerequisite.** Curator must be invoked with `--upload-clip-info-in-chunks` so that `/metas_jsonl/v0/*.jsonl` is written. Without this flag the converter has no input. + +```shell +python -m cosmos_framework.scripts.curator_to_sft_jsonl \ + --curator-output outputs/curator_split/ \ + -o outputs/curator_split/cosmos3_sft.jsonl +``` + +By default the converter resolves each window's caption to the first non-empty `*_enhanced_caption` value, falling back to `*_caption`. Use `--caption-model` / `--enhanced-caption-model` to pin a specific captioner (e.g. `--caption-model qwen --enhanced-caption-model qwen_lm`). Pass `--min-short-edge N` to drop low-resolution clips, or `--min-window-frames N` / `--max-duration-s S` to tune the loader-matching filters. + +The converter mirrors `sft_dataset.py`'s silent filters (duration > 61.0 s, per-window frames < 61) so dataset counts match what training will actually consume. A sibling `cosmos3_sft.jsonl.summary.json` records the kept count and per-reason drop counts. + +Emitted `vision_path` values are rewritten relative to the output JSONL's directory (so the loader's relative-path branch resolves them). URIs like `s3://...` pass through unchanged. + +> This path emits only the dense `caption` string per window (not the structured `caption_json`). The loader trains on it via its dense-caption fallback (see [Format](#format)). To train on structured-JSON captions instead, use the captioning workflow above.