Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions .github/workflows/bench-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,8 @@ jobs:
run: |
set -Eeu -o pipefail -x

base_commit_sha=$(\
curl -L \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
https://api.github.com/repos/vortex-data/vortex/actions/workflows/bench.yml/runs\?branch\=develop\&status\=success\&per_page\=1 \
| jq -r '.workflow_runs[].head_sha' \
)

python3 scripts/s3-download.py s3://vortex-ci-benchmark-results/data.json.gz data.json.gz --no-sign-request
gzip -d -c data.json.gz | grep $base_commit_sha > base.json
gzip -d -c data.json.gz > base.json

echo '# Benchmarks: ${{ matrix.benchmark.name }}' > comment.md
echo '' >> comment.md
Expand Down
10 changes: 1 addition & 9 deletions .github/workflows/sql-benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -443,16 +443,8 @@ jobs:
run: |
set -Eeu -o pipefail -x

base_commit_sha=$(\
curl -L \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
https://api.github.com/repos/vortex-data/vortex/actions/workflows/bench.yml/runs\?branch\=develop\&status\=success\&per_page\=1 \
| jq -r '.workflow_runs[].head_sha' \
)

python3 scripts/s3-download.py s3://vortex-ci-benchmark-results/data.json.gz data.json.gz --no-sign-request
gzip -d -c data.json.gz | grep $base_commit_sha > base.json
gzip -d -c data.json.gz > base.json

echo '# Benchmarks: ${{ matrix.name }}' > comment.md
echo '' >> comment.md
Expand Down
128 changes: 124 additions & 4 deletions scripts/compare-benchmark-jsons.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# "numpy",
# "pandas",
# "tabulate",
# "orjson"
# ]
# ///

Expand All @@ -18,6 +19,7 @@
from typing import Any

import numpy as np
import orjson
import pandas as pd

# Analysis overview:
Expand Down Expand Up @@ -59,9 +61,7 @@ def extract_dataset_key(df: pd.DataFrame) -> pd.DataFrame:
if "dataset" not in df.columns:
df["dataset_key"] = pd.NA
else:
df["dataset_key"] = df["dataset"].apply(
lambda x: str(sorted(x.items())) if pd.notna(x) and isinstance(x, dict) else pd.NA
)
df["dataset_key"] = df["dataset"].apply(dataset_key)
return df


Expand All @@ -77,6 +77,126 @@ def split_file_size_rows(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
return df[mask].copy(), df[~mask].copy()


def identity_value(value: Any) -> Any:
    """Normalize missing scalars so benchmark identities compare reliably.

    Missing-value markers such as ``None``, ``NaN`` and ``pd.NA`` do not
    compare reliably with each other (``NaN != NaN``; comparisons with
    ``pd.NA`` yield ``pd.NA``), so every missing scalar is collapsed to
    ``None``.

    Args:
        value: One component of a benchmark identity.

    Returns:
        ``None`` when ``value`` is a missing scalar, otherwise ``value``
        unchanged.
    """

    # pd.isna() answers element-wise for lists and arrays. Using that result
    # in a boolean context raises ValueError for multi-element containers and,
    # for a one-element list, tests the element instead of the container.
    # Only scalars can be "missing" here, so containers pass through as-is.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def dataset_key(value: Any) -> str | None:
"""Normalize dataset metadata into the join-key representation."""

if isinstance(value, dict):
return str(sorted(value.items()))
return None


def benchmark_identity(row: Any) -> tuple[Any, Any, Any] | None:
    """Compute the identity used to match a timing row against a baseline.

    Args:
        row: A mapping-like record exposing ``.get`` (for example one parsed
            JSONL line).

    Returns:
        A ``(name, storage, dataset_key)`` tuple, or ``None`` when the record
        is a file-size row or carries no ``name``.
    """

    # File-size records are recognised either by their metric or by the
    # presence of a ``file_size`` value; they never take part in matching.
    if row.get("metric") == FILE_SIZE_METRIC:
        return None
    if row.get("file_size") is not None:
        return None

    benchmark_name = row.get("name")
    if benchmark_name is None:
        return None

    name_part = identity_value(benchmark_name)
    storage_part = identity_value(row.get("storage"))
    dataset_part = dataset_key(row.get("dataset"))
    return name_part, storage_part, dataset_part


def benchmark_identity_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Pair every timing row's commit with its benchmark identity.

    File-size rows are dropped first. The result has exactly two columns,
    ``commit_id`` and ``benchmark_identity``, where the identity is the
    ``(name, storage, dataset_key)`` tuple with missing values normalized.
    An empty frame with those two columns is returned when there are no
    timing rows or no ``name`` column.
    """

    output_columns = ["commit_id", "benchmark_identity"]

    _, timings = split_file_size_rows(df)
    if timings.empty or "name" not in timings.columns:
        return pd.DataFrame(columns=output_columns)

    timings = timings.copy()
    # Columns that may be absent from the input are filled with NA so the
    # identity tuple always has the same shape.
    for optional_column in ("storage", "commit_id"):
        if optional_column not in timings.columns:
            timings[optional_column] = pd.NA

    timings = extract_dataset_key(timings)
    timings["benchmark_identity"] = [
        (identity_value(name), identity_value(storage), identity_value(key))
        for name, storage, key in zip(
            timings["name"], timings["storage"], timings["dataset_key"]
        )
    ]

    return timings[output_columns]


def read_jsonl_rows_for_commit(path: str, commit_id: str) -> pd.DataFrame:
    """Load the records of one commit from a JSONL benchmark history.

    The file is streamed line by line, and only lines whose parsed
    ``commit_id`` equals ``commit_id`` are kept.

    Args:
        path: Location of the JSONL history file.
        commit_id: Commit whose records should be returned.

    Returns:
        A DataFrame built from the matching records (empty when none match).
    """

    key_marker = '"commit_id"'
    value_marker = f'"{commit_id}"'

    matching = []
    with open(path, encoding="utf-8") as history:
        for raw_line in history:
            # Cheap substring screen: lines that cannot match are never
            # handed to the JSON parser.
            if key_marker in raw_line and value_marker in raw_line:
                parsed = orjson.loads(raw_line)
                # The substring hit may come from another field, so confirm
                # on the parsed record.
                if parsed.get("commit_id") == commit_id:
                    matching.append(parsed)
    return pd.DataFrame(matching)


def read_latest_baseline_rows(path: str, pr: pd.DataFrame) -> pd.DataFrame:
    """Load the rows of the last history commit that ran the PR's benchmark.

    The history file is scanned once to find the commit of the last line
    whose benchmark identity also occurs in ``pr``; the rows of that commit
    are then read in a second pass.

    Args:
        path: Location of the JSONL benchmark history.
        pr: Benchmark rows produced by the pull request.

    Returns:
        All history rows belonging to the selected baseline commit. When
        ``pr`` yields no timing identities, the whole history is returned.

    Raises:
        ValueError: If no history line matches any identity from ``pr``.
    """

    wanted = set(benchmark_identity_rows(pr)["benchmark_identity"])
    if len(wanted) == 0:
        return pd.read_json(path, lines=True)

    latest_commit = None
    with open(path, encoding="utf-8") as history:
        for raw_line in history:
            # Skip lines that cannot be identifiable timing rows before
            # paying for a JSON parse.
            if '"name"' in raw_line and '"commit_id"' in raw_line:
                entry = orjson.loads(raw_line)
                if benchmark_identity(entry) not in wanted:
                    continue
                candidate = entry.get("commit_id")
                if candidate is not None:
                    # Later lines overwrite earlier ones, so the last match
                    # in file order wins.
                    latest_commit = candidate

    if latest_commit is None:
        raise ValueError("No baseline rows found for the benchmark under test")

    return read_jsonl_rows_for_commit(path, latest_commit)


def select_latest_baseline_rows(base: pd.DataFrame, pr: pd.DataFrame) -> pd.DataFrame:
    """Keep only the rows of the newest baseline commit that ran this benchmark.

    The stored benchmark history is append-only, and a row is written only
    once its benchmark job has uploaded results. The last commit whose rows
    share an identity with the PR is therefore the most recent successful
    baseline for the benchmark under test.

    ``base`` is returned unchanged when it is empty, has no ``commit_id``
    column, holds at most one distinct commit, or when ``pr`` yields no
    timing identities.

    Raises:
        ValueError: If no baseline row matches any identity from ``pr``.
    """

    if base.empty or "commit_id" not in base.columns:
        return base

    distinct_commits = base["commit_id"].dropna().unique()
    if len(distinct_commits) <= 1:
        return base

    wanted = set(benchmark_identity_rows(pr)["benchmark_identity"])
    if not wanted:
        return base

    candidates = benchmark_identity_rows(base)
    usable = candidates["benchmark_identity"].isin(wanted) & candidates["commit_id"].notna()
    hits = candidates[usable]
    if hits.empty:
        raise ValueError("No baseline rows found for the benchmark under test")

    # Row order follows the append-only history, so the last hit is the newest.
    newest_commit = hits["commit_id"].iloc[-1]
    return base[base["commit_id"] == newest_commit].copy()


def extract_target_fields(name: str) -> pd.Series:
"""Parse query, engine, and format from the benchmark name."""

Expand Down Expand Up @@ -702,8 +822,8 @@ def main() -> None:

benchmark_name = sys.argv[3] if len(sys.argv) > 3 else ""

base = pd.read_json(sys.argv[1], lines=True)
pr = pd.read_json(sys.argv[2], lines=True)
base = read_latest_baseline_rows(sys.argv[1], pr)

base_commit_id = set(base["commit_id"].unique())
pr_commit_id = set(pr["commit_id"].unique())
Expand Down
106 changes: 106 additions & 0 deletions scripts/tests/test_benchmark_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,112 @@ def timing_row(name: str, base: int, pr: int) -> dict[str, object]:
}


def stored_timing_row(
commit: str,
name: str,
value: int,
storage: str | None = None,
dataset: dict[str, object] | None = None,
) -> dict[str, object]:
row: dict[str, object] = {
"name": name,
"unit": "ns",
"value": value,
"all_runtimes": [value, value, value],
"commit_id": commit,
}
if storage is not None:
row["storage"] = storage
if dataset is not None:
row["dataset"] = dataset
return row


def test_select_latest_baseline_rows_uses_latest_matching_benchmark_commit() -> None:
    """The newest commit containing the PR's benchmark is chosen as baseline."""
    compare = load_compare_module()

    def tpch_row(commit: str, value: int) -> dict[str, object]:
        # Every TPC-H row shares the same identity; only commit and value vary.
        return stored_timing_row(
            commit,
            "tpch_q01/datafusion:parquet",
            value,
            "nvme",
            {"scale_factor": "1.0"},
        )

    history_records = [
        tpch_row("base-old", 100),
        file_size_record_for("base-old", 100, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"),
        tpch_row("base-current", 110),
        file_size_record_for("base-current", 120, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"),
        # A later commit that only ran a different benchmark must be ignored.
        stored_timing_row("base-other", "clickbench_q01/datafusion:parquet", 200, "nvme"),
    ]
    history = pd.DataFrame(history_records)
    pr = pd.DataFrame([tpch_row("pr-sha", 115)])

    selected = compare.select_latest_baseline_rows(history, pr)

    assert set(selected["commit_id"]) == {"base-current"}
    assert len(selected) == 2


def test_read_latest_baseline_rows_streams_latest_matching_benchmark_commit(tmp_path: Path) -> None:
    """Reading from a JSONL file selects the same baseline commit."""
    compare = load_compare_module()

    def tpch_row(commit: str, value: int) -> dict[str, object]:
        # Every TPC-H row shares the same identity; only commit and value vary.
        return stored_timing_row(
            commit,
            "tpch_q01/datafusion:parquet",
            value,
            "nvme",
            {"scale_factor": "1.0"},
        )

    history_rows = [
        tpch_row("base-old", 100),
        file_size_record_for("base-old", 100, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"),
        tpch_row("base-current", 110),
        file_size_record_for("base-current", 120, "tpch", "1.0", "vortex-file-compressed", "part-0.vortex"),
        # A later commit that only ran a different benchmark must be ignored.
        stored_timing_row("base-other", "clickbench_q01/datafusion:parquet", 200, "nvme"),
    ]

    history_path = tmp_path / "history.jsonl"
    serialized = "".join(json.dumps(row) + "\n" for row in history_rows)
    history_path.write_text(serialized, encoding="utf-8")

    pr = pd.DataFrame([tpch_row("pr-sha", 115)])

    selected = compare.read_latest_baseline_rows(history_path, pr)

    assert set(selected["commit_id"]) == {"base-current"}
    assert len(selected) == 2


def test_within_engine_analysis_uses_each_engines_own_parquet_control() -> None:
compare = load_compare_module()
rows = [
Expand Down
Loading