diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml index 0216a38b8d0..cfab4f7fd3b 100644 --- a/.github/workflows/bench-pr.yml +++ b/.github/workflows/bench-pr.yml @@ -98,16 +98,8 @@ jobs: run: | set -Eeu -o pipefail -x - base_commit_sha=$(\ - curl -L \ - -H "Accept: application/vnd.github+json" \ - -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ - https://api.github.com/repos/vortex-data/vortex/actions/workflows/bench.yml/runs\?branch\=develop\&status\=success\&per_page\=1 \ - | jq -r '.workflow_runs[].head_sha' \ - ) - python3 scripts/s3-download.py s3://vortex-ci-benchmark-results/data.json.gz data.json.gz --no-sign-request - gzip -d -c data.json.gz | grep $base_commit_sha > base.json + gzip -d -c data.json.gz > base.json echo '# Benchmarks: ${{ matrix.benchmark.name }}' > comment.md echo '' >> comment.md diff --git a/.github/workflows/sql-benchmarks.yml b/.github/workflows/sql-benchmarks.yml index 03b09aee6d8..b46fe455a34 100644 --- a/.github/workflows/sql-benchmarks.yml +++ b/.github/workflows/sql-benchmarks.yml @@ -443,16 +443,8 @@ jobs: run: | set -Eeu -o pipefail -x - base_commit_sha=$(\ - curl -L \ - -H "Accept: application/vnd.github+json" \ - -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ - https://api.github.com/repos/vortex-data/vortex/actions/workflows/bench.yml/runs\?branch\=develop\&status\=success\&per_page\=1 \ - | jq -r '.workflow_runs[].head_sha' \ - ) - python3 scripts/s3-download.py s3://vortex-ci-benchmark-results/data.json.gz data.json.gz --no-sign-request - gzip -d -c data.json.gz | grep $base_commit_sha > base.json + gzip -d -c data.json.gz > base.json echo '# Benchmarks: ${{ matrix.name }}' > comment.md echo '' >> comment.md diff --git a/scripts/compare-benchmark-jsons.py b/scripts/compare-benchmark-jsons.py index e864209d011..7d560c144a3 100644 --- a/scripts/compare-benchmark-jsons.py +++ b/scripts/compare-benchmark-jsons.py @@ -4,6 +4,7 @@ # "numpy", # "pandas", # "tabulate", +# "orjson" # ] # /// @@ -18,6 +19,7 @@ from typing import Any import numpy as np +import orjson import pandas as pd # Analysis overview: @@ -59,9 +61,7 @@ def extract_dataset_key(df: pd.DataFrame) -> pd.DataFrame: if "dataset" not in df.columns: df["dataset_key"] = pd.NA else: - df["dataset_key"] = df["dataset"].apply( - lambda x: str(sorted(x.items())) if pd.notna(x) and isinstance(x, dict) else pd.NA - ) + df["dataset_key"] = df["dataset"].apply(dataset_key) return df @@ -77,6 +77,126 @@ def split_file_size_rows(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]: return df[mask].copy(), df[~mask].copy() +def identity_value(value: Any) -> Any: + """Normalize missing values so benchmark identities compare reliably.""" + + return None if pd.isna(value) else value + + +def dataset_key(value: Any) -> str | None: + """Normalize dataset metadata into the join-key representation.""" + + if isinstance(value, dict): + return str(sorted(value.items())) + return None + + +def benchmark_identity(row: Any) -> tuple[Any, Any, Any] | None: + """Return the timing-row identity used to find a matching baseline.""" + + if row.get("metric") == FILE_SIZE_METRIC or row.get("file_size") is not None: + return None + + name = row.get("name") + if name is None: + return None + + return ( + identity_value(name), + identity_value(row.get("storage")), + dataset_key(row.get("dataset")), + ) + + +def benchmark_identity_rows(df: pd.DataFrame) -> pd.DataFrame: + """Return timing rows with the identity used to match a PR benchmark.""" + + _file_size_rows, timing_rows = split_file_size_rows(df) + if timing_rows.empty or "name" not in timing_rows.columns: + return pd.DataFrame(columns=["commit_id", "benchmark_identity"]) + + timing_rows = timing_rows.copy() + if "storage" not in timing_rows.columns: + timing_rows["storage"] = pd.NA + if "commit_id" not in timing_rows.columns: + timing_rows["commit_id"] = pd.NA + + timing_rows = extract_dataset_key(timing_rows) + timing_rows["benchmark_identity"] = [ + tuple(identity_value(row[column]) for column in ("name", "storage", "dataset_key")) + for _, row in timing_rows.iterrows() + ] + + return timing_rows[["commit_id", "benchmark_identity"]] + + +def read_jsonl_rows_for_commit(path: str, commit_id: str) -> pd.DataFrame: + """Read only rows matching a commit from a JSONL benchmark history.""" + + rows = [] + with open(path, encoding="utf-8") as lines: + for line in lines: + if '"commit_id"' not in line or f'"{commit_id}"' not in line: + continue + record = orjson.loads(line) + if record.get("commit_id") == commit_id: + rows.append(record) + return pd.DataFrame(rows) + + +def read_latest_baseline_rows(path: str, pr: pd.DataFrame) -> pd.DataFrame: + """Read rows from the latest history commit matching the PR benchmark.""" + + pr_identities = set(benchmark_identity_rows(pr)["benchmark_identity"]) + if not pr_identities: + return pd.read_json(path, lines=True) + + baseline_commit_id = None + with open(path, encoding="utf-8") as lines: + for line in lines: + if '"name"' not in line or '"commit_id"' not in line: + continue + record = orjson.loads(line) + if benchmark_identity(record) in pr_identities: + commit_id = record.get("commit_id") + if commit_id is not None: + baseline_commit_id = commit_id + + if baseline_commit_id is None: + raise ValueError("No baseline rows found for the benchmark under test") + + return read_jsonl_rows_for_commit(path, baseline_commit_id) + + +def select_latest_baseline_rows(base: pd.DataFrame, pr: pd.DataFrame) -> pd.DataFrame: + """Select rows from the latest baseline commit containing this benchmark. + + The persisted benchmark history is append-only. A row only appears after + that benchmark job uploaded results, so the newest commit with matching row + identities is the latest successful baseline for the benchmark under test. + """ + + if base.empty or "commit_id" not in base.columns: + return base + + commit_ids = base["commit_id"].dropna().unique() + if len(commit_ids) <= 1: + return base + + pr_identities = set(benchmark_identity_rows(pr)["benchmark_identity"]) + if not pr_identities: + return base + + base_identities = benchmark_identity_rows(base) + matches = base_identities[base_identities["benchmark_identity"].isin(pr_identities)] + matches = matches[matches["commit_id"].notna()] + if matches.empty: + raise ValueError("No baseline rows found for the benchmark under test") + + baseline_commit_id = matches["commit_id"].iloc[-1] + return base[base["commit_id"] == baseline_commit_id].copy() + + def extract_target_fields(name: str) -> pd.Series: """Parse query, engine, and format from the benchmark name.""" @@ -702,8 +822,8 @@ def main() -> None: benchmark_name = sys.argv[3] if len(sys.argv) > 3 else "" - base = pd.read_json(sys.argv[1], lines=True) pr = pd.read_json(sys.argv[2], lines=True) + base = read_latest_baseline_rows(sys.argv[1], pr) base_commit_id = set(base["commit_id"].unique()) pr_commit_id = set(pr["commit_id"].unique()) diff --git a/scripts/tests/test_benchmark_reporting.py b/scripts/tests/test_benchmark_reporting.py index ac356eff130..7925161e91e 100644 --- a/scripts/tests/test_benchmark_reporting.py +++ b/scripts/tests/test_benchmark_reporting.py @@ -33,6 +33,112 @@ def timing_row(name: str, base: int, pr: int) -> dict[str, object]: } +def stored_timing_row( + commit: str, + name: str, + value: int, + storage: str | None = None, + dataset: dict[str, object] | None = None, +) -> dict[str, object]: + row: dict[str, object] = { + "name": name, + "unit": "ns", + "value": value, + "all_runtimes": [value, value, value], + "commit_id": commit, + } + if storage is not None: + row["storage"] = storage + if dataset is not None: + row["dataset"] = dataset + return row + + +def test_select_latest_baseline_rows_uses_latest_matching_benchmark_commit() -> None: + compare = load_compare_module() + history = pd.DataFrame( + [ + stored_timing_row( + "base-old", + "tpch_q01/datafusion:parquet", + 100, + "nvme", + {"scale_factor": "1.0"}, + ), + file_size_record_for("base-old", 100, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"), + stored_timing_row( + "base-current", + "tpch_q01/datafusion:parquet", + 110, + "nvme", + {"scale_factor": "1.0"}, + ), + file_size_record_for("base-current", 120, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"), + stored_timing_row("base-other", "clickbench_q01/datafusion:parquet", 200, "nvme"), + ] + ) + pr = pd.DataFrame( + [ + stored_timing_row( + "pr-sha", + "tpch_q01/datafusion:parquet", + 115, + "nvme", + {"scale_factor": "1.0"}, + ), + ] + ) + + selected = compare.select_latest_baseline_rows(history, pr) + + assert set(selected["commit_id"]) == {"base-current"} + assert len(selected) == 2 + + +def test_read_latest_baseline_rows_streams_latest_matching_benchmark_commit(tmp_path: Path) -> None: + compare = load_compare_module() + history_path = tmp_path / "history.jsonl" + history_rows = [ + stored_timing_row( + "base-old", + "tpch_q01/datafusion:parquet", + 100, + "nvme", + {"scale_factor": "1.0"}, + ), + file_size_record_for("base-old", 100, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"), + stored_timing_row( + "base-current", + "tpch_q01/datafusion:parquet", + 110, + "nvme", + {"scale_factor": "1.0"}, + ), + file_size_record_for("base-current", 120, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"), + stored_timing_row("base-other", "clickbench_q01/datafusion:parquet", 200, "nvme"), + ] + history_path.write_text( + "".join(f"{json.dumps(row)}\n" for row in history_rows), + encoding="utf-8", + ) + pr = pd.DataFrame( + [ + stored_timing_row( + "pr-sha", + "tpch_q01/datafusion:parquet", + 115, + "nvme", + {"scale_factor": "1.0"}, + ), + ] + ) + + selected = compare.read_latest_baseline_rows(history_path, pr) + + assert set(selected["commit_id"]) == {"base-current"} + assert len(selected) == 2 + + def test_within_engine_analysis_uses_each_engines_own_parquet_control() -> None: compare = load_compare_module() rows = [