Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions benchmarks/benchmark_lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mkdir -p "$PYTHONPYCACHEPREFIX" 2>/dev/null || true

GPU_MONITOR_PID=""
GPU_METRICS_CSV="/workspace/gpu_metrics.csv"
export GPU_METRICS_CSV

# Start background GPU monitoring that logs metrics every second to CSV.
# Auto-detects NVIDIA (nvidia-smi) or AMD (amd-smi) GPUs.
Expand All @@ -32,6 +33,7 @@ start_gpu_monitor() {
done

GPU_METRICS_CSV="$output"
export GPU_METRICS_CSV

if command -v nvidia-smi &>/dev/null; then
nvidia-smi --query-gpu=timestamp,index,power.draw,temperature.gpu,clocks.current.sm,clocks.current.memory,utilization.gpu,utilization.memory \
Expand Down
6 changes: 6 additions & 0 deletions perf-changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3083,3 +3083,9 @@
description:
- "Update SGLang image from v0.5.10.post1-cu130 / v0.5.11-cu130 (30d old) to v0.5.12-cu130"
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1451

- config-keys:
- qwen3.5-fp8-h200-sglang
description:
- "Smoke run validating measured-power aggregation pipeline. No config change; entry exists to trigger a sweep that produces the first agg_<run>.json with avg_power_w + joules_per_output_token populated by aggregate_power.py."
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1551

Check warning on line 3091 in perf-changelog.yaml

View check run for this annotation

Claude / Claude Code Review

perf-changelog pr-link points to superseded fork PR #1551 instead of this PR #1558

The new `perf-changelog.yaml` entry's `pr-link` points to https://github.com/SemiAnalysisAI/InferenceX/pull/1551, but the PR description states this PR (#1558) supersedes #1551 and #1551 will be closed without merge. Update the link to `/pull/1558` so the changelog navigates to the PR with the actually-merged commit history.
271 changes: 271 additions & 0 deletions utils/aggregate_power.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
"""Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON.

Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi),
filters samples to the benchmark load window using start/end Unix timestamps
written by benchmark_serving.py, and patches two keys into the aggregated
result JSON consumed by InferenceX-app's ETL:

- avg_power_w: mean per-GPU power draw (W) during the load window
- joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens

The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric
field in the agg JSON into the `metrics` JSONB column, so no schema migration
is required.

Vendor schema detection is regex-based: any timestamp-like column + any column
whose name contains "power" (excluding "limit"/"cap"/"max") is picked up.
NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are
handled.

This script is best-effort. Missing or malformed CSV exits 0 without patching
so a monitoring hiccup never breaks the benchmark upload.
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean


_POWER_COL_RE = re.compile(r"power", re.IGNORECASE)
_POWER_EXCLUDE_RE = re.compile(r"limit|cap|max|min", re.IGNORECASE)
_TIMESTAMP_COL_RE = re.compile(r"time", re.IGNORECASE)
_GPU_INDEX_COL_RE = re.compile(r"^(index|gpu|gpu_id|gpu_index|card|device)$", re.IGNORECASE)
_NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?")


def _parse_timestamp(value: str) -> float | None:
"""Best-effort timestamp parse to Unix epoch seconds (local wall clock).

Handles the formats observed in practice:
- nvidia-smi: "2025/01/15 12:34:56.789" (local time, no TZ)
- amd-smi: ISO 8601 "2025-01-15T12:34:56.789" or epoch seconds
- Plain numeric epoch (int or float, s or ms)
"""
value = value.strip()
if not value:
return None
# Plain epoch number — accept both seconds and milliseconds.
if _NUMBER_RE.fullmatch(value):
n = float(value)
return n / 1000.0 if n > 1e12 else n
# nvidia-smi: "YYYY/MM/DD HH:MM:SS.ffffff"
for fmt in ("%Y/%m/%d %H:%M:%S.%f", "%Y/%m/%d %H:%M:%S"):
try:
return datetime.strptime(value, fmt).timestamp()
except ValueError:
pass
# ISO 8601 (amd-smi variants). fromisoformat tolerates 'T' or space separator
# in Python 3.11+; older versions need 'T'.
iso_value = value.replace(" ", "T", 1) if " " in value and "T" not in value else value
try:
dt = datetime.fromisoformat(iso_value)
except ValueError:
return None
if dt.tzinfo is None:
# Treat naive timestamps as local time (matches nvidia-smi convention).
return dt.timestamp()
return dt.astimezone(timezone.utc).timestamp()


def _parse_power(value: str) -> float | None:
"""Extract the first numeric value from a power cell.

nvidia-smi formats power as "412.34 W"; some configurations report
"[N/A]" when power capping is disabled. AMD reports a bare number.
"""
value = value.strip()
if not value or value.lower() in {"[n/a]", "n/a", "na"}:
return None
m = _NUMBER_RE.search(value)
if not m:
return None
try:
return float(m.group(0))
except ValueError:
return None


def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | None]:
"""Return (timestamp_col, power_col, gpu_index_col) from a CSV header.

Power column: contains "power" and not "limit"/"cap"/"max"/"min".
Timestamp column: contains "time".
GPU index column: optional — used to count distinct GPUs per sample.
"""
timestamp_col = next((c for c in header if _TIMESTAMP_COL_RE.search(c)), None)
power_col = next(
(c for c in header if _POWER_COL_RE.search(c) and not _POWER_EXCLUDE_RE.search(c)),
None,
)
gpu_col = next((c for c in header if _GPU_INDEX_COL_RE.match(c.strip())), None)
return timestamp_col, power_col, gpu_col


def aggregate_power(
csv_path: Path,
start_unix: float,
end_unix: float,
) -> tuple[float, int] | None:
"""Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end].

Returns None if the CSV is missing, empty, has no detectable power column,
or no rows fall in the window.
"""
if not csv_path.is_file() or csv_path.stat().st_size == 0:
return None
if end_unix <= start_unix:
return None

try:
with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f:
reader = csv.DictReader(f, skipinitialspace=True)
header = [c.strip() for c in (reader.fieldnames or [])]
reader.fieldnames = header
timestamp_col, power_col, gpu_col = _detect_columns(header)
if not timestamp_col or not power_col:
return None

# Group power readings by sample timestamp so per-sample total power
# (sum across GPUs) is computed correctly even if rows are interleaved.
per_sample_total: dict[float, float] = {}
per_sample_gpus: dict[float, set[str]] = {}
gpu_keys: set[str] = set()

for row in reader:
ts_raw = (row.get(timestamp_col) or "").strip()
pw_raw = (row.get(power_col) or "").strip()
ts = _parse_timestamp(ts_raw)
pw = _parse_power(pw_raw)
if ts is None or pw is None:
continue
if ts < start_unix or ts > end_unix:
continue
# Bucket by sample timestamp (rounded to ms to absorb sub-ms drift).
bucket = round(ts, 3)
per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw
gpu_id = (row.get(gpu_col) or "0").strip() if gpu_col else "0"
per_sample_gpus.setdefault(bucket, set()).add(gpu_id)
gpu_keys.add(gpu_id)
except (OSError, csv.Error):
return None

if not per_sample_total:
return None

# Number of distinct GPUs seen across the window.
num_gpus = max(len(gpu_keys), 1)
# Per-sample mean power per GPU = sum across GPUs at that timestamp / GPUs seen at that timestamp.
per_sample_mean_per_gpu = [
total / max(len(per_sample_gpus[ts]), 1) for ts, total in per_sample_total.items()
]
return mean(per_sample_mean_per_gpu), num_gpus

Check warning on line 168 in utils/aggregate_power.py

View check run for this annotation

Claude / Claude Code Review

avg_power_w misreports total system power when CSV lacks a GPU index column

Latent robustness bug in `aggregate_power` (utils/aggregate_power.py:141-168): when `_detect_columns` can't find a GPU index column (strict regex `^(index|gpu|gpu_id|gpu_index|card|device)$`), every row collapses to `gpu_id='0'`, causing `avg_power_w` to report system-total power (N × W) instead of per-GPU mean. Today's nvidia-smi (`index`) and amd-smi (`gpu`) headers match the regex so the in-tree pipeline is safe, but any future amd-smi schema variant (e.g. `device_id`, `GPU ID`) would silentl


def _load_bench_window(bench_result_path: Path) -> tuple[float, float, float, int] | None:
"""Read (start_unix, end_unix, duration_s, total_output_tokens) from the raw bench JSON."""
try:
bench = json.loads(bench_result_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
start = bench.get("benchmark_start_time_unix")
end = bench.get("benchmark_end_time_unix")
duration = bench.get("duration")
total_output = bench.get("total_output_tokens")
if not all(isinstance(v, (int, float)) for v in (start, end, duration)):
return None
if not isinstance(total_output, int) or total_output <= 0:
return None
return float(start), float(end), float(duration), int(total_output)


def patch_agg_result(
agg_path: Path,
avg_power_w: float,
joules_per_output_token: float,
) -> None:
"""Read the agg JSON, add the two power keys, and write it back atomically."""
data = json.loads(agg_path.read_text(encoding="utf-8"))
data["avg_power_w"] = round(avg_power_w, 3)
data["joules_per_output_token"] = round(joules_per_output_token, 6)
tmp_path = agg_path.with_suffix(agg_path.suffix + ".tmp")
tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
tmp_path.replace(agg_path)


def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
window = _load_bench_window(bench_result)
if window is None:
print(
f"[aggregate_power] No bench window in {bench_result} — skipping power aggregation",
file=sys.stderr,
)
return 0
start, end, duration, total_output = window

result = aggregate_power(csv_path, start, end)
if result is None:
print(
f"[aggregate_power] No usable power samples in {csv_path} for "
f"window [{start}, {end}] — skipping",
file=sys.stderr,
)
return 0
avg_power_w, num_gpus = result

# Joules consumed by the system during the bench window / output tokens.
joules_per_output_token = (avg_power_w * num_gpus * duration) / total_output

if not agg_result.is_file():
print(
f"[aggregate_power] Agg result {agg_result} missing — cannot patch",
file=sys.stderr,
)
return 0

try:
patch_agg_result(agg_result, avg_power_w, joules_per_output_token)
except (OSError, json.JSONDecodeError) as exc:
print(f"[aggregate_power] Failed to patch {agg_result}: {exc}", file=sys.stderr)
return 0

print(
f"[aggregate_power] avg_power_w={avg_power_w:.2f} (per GPU, n={num_gpus}) "
f"joules_per_output_token={joules_per_output_token:.4f} "
f"duration={duration:.1f}s output_tokens={total_output} -> {agg_result}"
)
return 0


def main() -> int:
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
parser.add_argument(
"--csv",
type=Path,
default=Path("/workspace/gpu_metrics.csv"),
help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)",
)
parser.add_argument(
"--bench-result",
type=Path,
required=True,
help="Path to the raw benchmark_serving.py result JSON (provides bench window + token counts)",
)
parser.add_argument(
"--agg-result",
type=Path,
required=True,
help="Path to the agg_<run>.json output of process_result.py (will be patched in place)",
)
args = parser.parse_args()
return run(args.csv, args.bench_result, args.agg_result)


if __name__ == "__main__":
sys.exit(main())
4 changes: 4 additions & 0 deletions utils/bench_serving/benchmark_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ async def limited_request_func(request_func_input, pbar):
print("Starting main benchmark run...")

benchmark_start_time = time.perf_counter()
benchmark_start_time_unix = time.time()
tasks: List[asyncio.Task] = []
async for request in get_request(input_requests, request_rate, burstiness):
prompt, prompt_len, output_len, mm_content = request
Expand Down Expand Up @@ -615,6 +616,7 @@ async def limited_request_func(request_func_input, pbar):
pbar.close()

benchmark_duration = time.perf_counter() - benchmark_start_time
benchmark_end_time_unix = time.time()

metrics, actual_output_lens = calculate_metrics(
input_requests=input_requests,
Expand Down Expand Up @@ -645,6 +647,8 @@ async def limited_request_func(request_func_input, pbar):

result = {
"duration": benchmark_duration,
"benchmark_start_time_unix": benchmark_start_time_unix,
"benchmark_end_time_unix": benchmark_end_time_unix,
"completed": metrics.completed,
"total_input_tokens": metrics.total_input,
"total_output_tokens": metrics.total_output,
Expand Down
25 changes: 24 additions & 1 deletion utils/process_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,28 @@ def get_required_env_vars(required_vars):

print(json.dumps(data, indent=2))

with open(f'agg_{result_filename}.json', 'w') as f:
agg_path = Path(f'agg_{result_filename}.json')
with open(agg_path, 'w') as f:
json.dump(data, f, indent=2)

# Best-effort: patch measured power into the agg JSON. Never fails the run.
try:
from aggregate_power import run as _aggregate_power_run

_csv_candidates = [
os.environ.get('GPU_METRICS_CSV'),
'gpu_metrics.csv',
'/workspace/gpu_metrics.csv',
]
_csv_path = next(
(Path(p) for p in _csv_candidates if p and Path(p).is_file()),
None,
)
if _csv_path is not None:
_aggregate_power_run(
csv_path=_csv_path,
bench_result=Path(f'{result_filename}.json'),
agg_result=agg_path,
)
except Exception as exc: # noqa: BLE001 — never block on telemetry
print(f'[process_result] power aggregation skipped: {exc}', file=sys.stderr)
Loading