Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions perf-changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3171,3 +3171,9 @@
description:
- "Validates measured-power aggregation pipeline (PR #1558) on both NVIDIA (H200) and AMD (MI355X) hardware — different SMI tools (nvidia-smi vs amd-smi), different CSV schemas (power.draw [W] vs socket_power), same aggregator. No config change. Entry intentionally kept past merge so run-sweep produces canonical agg JSONs with avg_power_w + joules_per_output_token on main for both vendors, seeding the dashboard's day-zero data."
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558

- config-keys:
- dsv4-fp4-gb300-dynamo-sglang
description:
- "Smoke run validating multinode measured-power aggregation (PR #1574). No config change; entry exists to trigger a sweep that produces the first multinode agg JSON with avg_power_w + joules_per_*_token populated from per-node srt-slurm perfmon CSVs. Validates per-source GPU-id namespacing in aggregate_power.py (without it, 14 nodes × 4 GPUs would report num_gpus=4 instead of 56) and the GPU_METRICS_CSV_GLOB env var bridge in process_result.py. Only the gb300-cw runner has the perfmon launcher changes; any gb300-nv runs in the sweep will succeed normally without power fields, which the dashboard handles gracefully (chart gates on field presence)."
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1574
41 changes: 39 additions & 2 deletions runners/launch_gb300-cw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ if [[ $MODEL_PREFIX == "dsv4" && $PRECISION == "fp4" ]]; then
export MODEL_PATH="/mnt/vast/models/dsv4"

if [[ $FRAMEWORK == "dynamo-sglang" ]]; then
SRT_SLURM_RECIPES_REPO="https://github.com/NVIDIA/srt-slurm.git"
SRT_SLURM_RECIPES_REF="main"
# Pinned to our SemiAnalysisAI fork of NVIDIA/srt-slurm to pick up
# PR #35 (per-node nvidia-smi monitoring during the benchmark sweep)
# ahead of its upstream merge. The branch tracks PR #35's head SHA; to
# bump it, re-fetch refs/pull/35/head from NVIDIA/srt-slurm and
# force-push to SemiAnalysisAI/srt-slurm:feat/inferencex-perfmon.
SRT_SLURM_RECIPES_REPO="https://github.com/SemiAnalysisAI/srt-slurm.git"
SRT_SLURM_RECIPES_REF="feat/inferencex-perfmon"
SRT_RECIPE_SRC="$GITHUB_WORKSPACE/benchmarks/multi_node/srt-slurm-recipes/sglang/deepseek-v4"
SRT_RECIPE_DST="recipes/sglang/deepseek-v4"
elif [[ $FRAMEWORK == "dynamo-vllm" ]]; then
Expand Down Expand Up @@ -106,6 +111,19 @@ git checkout "$SRT_SLURM_RECIPES_REF"
mkdir -p "$SRT_RECIPE_DST"
cp -rT "$SRT_RECIPE_SRC" "$SRT_RECIPE_DST"

# Switch on srt-slurm's per-node GPU perfmon (PR #35) for every recipe that
# was just overlaid. `monitoring` is a top-level SrtConfig field defaulting to
# None; when it is absent the orchestrator's _start_perf_monitor
# short-circuits, no perf_samples_*.csv files are written, and multinode
# measured-power aggregation is silently skipped.
# Safe to re-run: a recipe that already has a top-level `monitoring:` key is
# left untouched.
for recipe in "$SRT_RECIPE_DST"/*.yaml; do
    # An unmatched glob stays as the literal pattern; ignore non-files.
    if [ ! -f "$recipe" ]; then
        continue
    fi
    # Recipe already declares its own monitoring section — do not append.
    if grep -q '^monitoring:' "$recipe"; then
        continue
    fi
    printf '\nmonitoring:\n enabled: true\n sample_interval: 1.0\n' >> "$recipe"
    echo "[perfmon] enabled monitoring in recipe: $recipe"
done

echo "Installing srtctl..."
# CRITICAL — uv install location.
# Runner pod is x86 but compute nodes are aarch64, and /mnt/home is
Expand Down Expand Up @@ -279,6 +297,25 @@ else
echo "Warning: Logs directory not found at $LOGS_DIR"
fi

# Hand the per-node perfmon CSVs off to the downstream "Process result" step
# in benchmark-multinode-tmpl.yml. srt-slurm's perfmon (PR #35) writes
# perf_samples_{node}.csv straight into $LOGS_DIR on the host. process_result.py
# already invokes aggregate_power.run() inline; teaching it to read
# GPU_METRICS_CSV_GLOB lets utils/aggregate_power.py do the multi-CSV
# aggregation (each agg JSON gets avg_power_w / joules_per_*_token patched in
# place). The exported glob must be absolute because process_result.py runs
# from $GITHUB_WORKSPACE, not from this srt-slurm checkout.
if [ -d "$LOGS_DIR" ]; then
    # Only prefix the current directory when LOGS_DIR is relative; an
    # already-absolute LOGS_DIR would otherwise become "$(pwd)//abs/path".
    case "$LOGS_DIR" in
        /*) perf_glob_dir="$LOGS_DIR" ;;
        *)  perf_glob_dir="$(pwd)/$LOGS_DIR" ;;
    esac

    # Count matches with the shell glob instead of parsing `ls` output. Only
    # regular files are counted, matching the is_file() filter applied by
    # aggregate_power.py; an unmatched glob stays literal and fails the -f test.
    perf_csv_count=0
    for perf_csv in "$perf_glob_dir"/perf_samples_*.csv; do
        if [ -f "$perf_csv" ]; then
            perf_csv_count=$((perf_csv_count + 1))
        fi
    done

    if [ "$perf_csv_count" -gt 0 ]; then
        echo "[perfmon] Found $perf_csv_count per-node perf_samples_*.csv under $perf_glob_dir/"
        # The glob is written unexpanded on purpose; the consumer expands it.
        echo "GPU_METRICS_CSV_GLOB=$perf_glob_dir/perf_samples_*.csv" >> "$GITHUB_ENV"
    else
        echo "[perfmon] WARNING: monitoring enabled but no perf_samples_*.csv found in $perf_glob_dir — measured power aggregation will be skipped"
    fi
fi

if [[ "${EVAL_ONLY:-false}" != "true" ]]; then
if [ ! -d "$LOGS_DIR" ]; then
exit 1
Expand Down
225 changes: 154 additions & 71 deletions utils/aggregate_power.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
"""Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON.

Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi),
filters samples to the benchmark load window using start/end Unix timestamps
written by benchmark_serving.py, and patches two keys into the aggregated
result JSON consumed by InferenceX-app's ETL:
Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi)
or by srt-slurm's per-node perfmon (multinode), filters samples to the benchmark
load window using start/end Unix timestamps written by benchmark_serving.py, and
patches three keys into the aggregated result JSON consumed by InferenceX-app's
ETL:

- avg_power_w: mean per-GPU power draw (W) during the load window
- joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens
- joules_per_total_token: same, divided by (input + output) tokens

Multinode: accepts multiple CSV paths (one per worker node). GPU indices are
namespaced by source CSV stem to avoid the same-index collision across nodes —
e.g. 8 nodes each reporting indices 0..3 would otherwise be miscounted as 4
total GPUs instead of 32.

The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric
field in the agg JSON into the `metrics` JSONB column, so no schema migration
is required.

Vendor schema detection is regex-based: any timestamp-like column + any column
whose name contains "power" (excluding "limit"/"cap"/"max") is picked up.
NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are
handled.
NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version; srt-slurm's
perfmon emits "power_w". All are handled.

This script is best-effort. Missing or malformed CSV exits 0 without patching
so a monitoring hiccup never breaks the benchmark upload.
Expand All @@ -25,9 +32,11 @@

import argparse
import csv
import glob as glob_module
import json
import re
import sys
from collections.abc import Iterable
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean
Expand Down Expand Up @@ -109,74 +118,84 @@ def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | No


def aggregate_power(
csv_path: Path,
csv_path: Path | Iterable[Path],
start_unix: float,
end_unix: float,
) -> tuple[float, int] | None:
"""Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end].

Returns None if the CSV is missing, empty, has no detectable power column,
or no rows fall in the window.
Accepts either a single Path (single-node case) or an iterable of Paths
(multinode case: one CSV per worker node, all written by srt-slurm's
perfmon). For multi-path inputs, GPU indices are namespaced by source
CSV stem so the distinct-id count reflects the true total — each node
independently reports indices 0..N, and without namespacing the union
would collapse to a single node's worth.

Returns None if no CSVs are usable, none have a detectable power column,
or no rows fall in the window across all paths.
"""
if not csv_path.is_file() or csv_path.stat().st_size == 0:
return None
if end_unix <= start_unix:
paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path)
if not paths or end_unix <= start_unix:
return None

try:
with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f:
reader = csv.DictReader(f, skipinitialspace=True)
header = [c.strip() for c in (reader.fieldnames or [])]
reader.fieldnames = header
timestamp_col, power_col, gpu_col = _detect_columns(header)
if not timestamp_col or not power_col:
return None

# Group power readings by sample timestamp so per-sample total power
# (sum across GPUs) is computed correctly even if rows are interleaved.
#
# per_sample_row_count is the structural divisor: it's incremented for
# every contributing row regardless of whether a GPU-index column was
# detected. per_sample_gpus / gpu_keys are only populated when gpu_col
# is present and provide the canonical num_gpus via distinct-id count.
# When gpu_col is absent (vendor schema variant whose header doesn't
# match _GPU_INDEX_COL_RE), we fall back to inferring num_gpus from
# the modal row count per timestamp — assuming one row per GPU per
# sample, which is what every SMI tool we've seen actually emits.
per_sample_total: dict[float, float] = {}
per_sample_row_count: dict[float, int] = {}
per_sample_gpus: dict[float, set[str]] = {}
gpu_keys: set[str] = set()

for row in reader:
ts_raw = (row.get(timestamp_col) or "").strip()
pw_raw = (row.get(power_col) or "").strip()
ts = _parse_timestamp(ts_raw)
pw = _parse_power(pw_raw)
if ts is None or pw is None:
continue
if ts < start_unix or ts > end_unix:
# Only namespace when there are multiple sources — keeps single-node
# gpu_keys identical to the pre-multinode behavior so existing callers
# see the same num_gpus values.
namespace = len(paths) > 1

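# Per-sample state accumulates across ALL paths. Samples are bucketed by
# timestamp rounded to the millisecond, so rows from different nodes share a
# bucket only when their rounded timestamps coincide.
# NOTE(review): this assumes every node samples on `time.sleep(interval)` in
# phase against the same NTP-synced cluster clock; nodes whose samples are
# offset by more than ~1 ms land in separate buckets — confirm.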
per_sample_total: dict[float, float] = {}
per_sample_row_count: dict[float, int] = {}
per_sample_gpus: dict[float, set[str]] = {}
gpu_keys: set[str] = set()
saw_gpu_col = False

for path in paths:
if not path.is_file() or path.stat().st_size == 0:
continue
try:
with path.open("r", newline="", encoding="utf-8", errors="replace") as f:
reader = csv.DictReader(f, skipinitialspace=True)
header = [c.strip() for c in (reader.fieldnames or [])]
reader.fieldnames = header
timestamp_col, power_col, gpu_col = _detect_columns(header)
if not timestamp_col or not power_col:
continue
# Bucket by sample timestamp (rounded to ms to absorb sub-ms drift).
bucket = round(ts, 3)
per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw
per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1
if gpu_col:
gpu_id = (row.get(gpu_col) or "").strip()
if gpu_id:
per_sample_gpus.setdefault(bucket, set()).add(gpu_id)
gpu_keys.add(gpu_id)
except (OSError, csv.Error):
return None
saw_gpu_col = True

for row in reader:
ts_raw = (row.get(timestamp_col) or "").strip()
pw_raw = (row.get(power_col) or "").strip()
ts = _parse_timestamp(ts_raw)
pw = _parse_power(pw_raw)
if ts is None or pw is None:
continue
if ts < start_unix or ts > end_unix:
continue
bucket = round(ts, 3)
per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw
per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1
if gpu_col:
gpu_id = (row.get(gpu_col) or "").strip()
if gpu_id:
ns_id = f"{path.stem}:{gpu_id}" if namespace else gpu_id
per_sample_gpus.setdefault(bucket, set()).add(ns_id)
gpu_keys.add(ns_id)
except (OSError, csv.Error):
continue

if not per_sample_total:
return None

# Per-sample divisor and overall num_gpus.
# - If a GPU column was detected, trust distinct GPU IDs (correct for any
# sampling pattern, including hot-swap or partial visibility).
# - Otherwise, infer from row count (one row per GPU per sample).
if gpu_col and gpu_keys:
# - If any path exposed a GPU column, trust distinct (namespaced) GPU IDs.
# - Otherwise, infer from row count (one row per GPU per sample, summed
# across all paths' rows that fell into the same timestamp bucket).
if saw_gpu_col and gpu_keys:
num_gpus = len(gpu_keys)
per_sample_mean_per_gpu = [
total / max(len(per_sample_gpus.get(ts, ())), 1)
Expand All @@ -194,7 +213,16 @@ def _load_bench_window(
bench_result_path: Path,
) -> tuple[float, float, float, int, int] | None:
"""Read (start_unix, end_unix, duration_s, total_output_tokens, total_input_tokens)
from the raw bench JSON. Returns None if any required field is missing.
from the raw bench JSON. Returns None if a window cannot be resolved.

Window resolution order, tried in turn:
1. benchmark_start_time_unix + benchmark_end_time_unix (our benchmark_serving.py
writes both — single-node, brackets the actual load window exactly).
2. date + duration (srt-slurm sa-bench writes "YYYYMMDD-HHMMSS" UTC as the
result write time — multinode; treat as bench end and subtract duration
for start. Overshoots by post-bench JSON serialization, typically <5s).
3. file mtime + duration (last resort if `date` is absent or unparseable —
same end-of-bench proxy as #2 via the result file's mtime).

total_input_tokens defaults to 0 if absent (older bench JSONs may not have it);
this only degrades joules_per_total_token to equal joules_per_output_token in
Expand All @@ -204,18 +232,52 @@ def _load_bench_window(
bench = json.loads(bench_result_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
start = bench.get("benchmark_start_time_unix")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multinode num_gpus wrong without GPU column and clock drift

Low Severity

When multiple CSV paths are provided but none have a recognized GPU column (no match for _GPU_INDEX_COL_RE), num_gpus falls back to max(per_sample_row_count.values()). With multinode clock drift, each timestamp bucket only contains rows from a single node, so max returns one node's GPU count instead of the cluster total. This underestimates num_gpus and produces an incorrect total_system_energy_j, leading to wrong joules_per_*_token values in the agg JSON.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d30341. Configure here.

end = bench.get("benchmark_end_time_unix")
duration = bench.get("duration")
total_output = bench.get("total_output_tokens")
total_input = bench.get("total_input_tokens", 0)
if not all(isinstance(v, (int, float)) for v in (start, end, duration)):
if not isinstance(duration, (int, float)):
return None
if not isinstance(total_output, int) or total_output <= 0:
return None
if not isinstance(total_input, int) or total_input < 0:
total_input = 0
return float(start), float(end), float(duration), int(total_output), int(total_input)

# Tier 1: explicit Unix timestamps (single-node bench_serving.py).
start = bench.get("benchmark_start_time_unix")
end = bench.get("benchmark_end_time_unix")
if isinstance(start, (int, float)) and isinstance(end, (int, float)):
return float(start), float(end), float(duration), int(total_output), int(total_input)

# Tier 2: parse `date` field (srt-slurm sa-bench multinode). On observed
# runs the string matches file mtime to the second, confirming it's the
# JSON write time.
date_str = bench.get("date")
if isinstance(date_str, str):
try:
end_dt = datetime.strptime(date_str, "%Y%m%d-%H%M%S").replace(tzinfo=timezone.utc)
end_unix = end_dt.timestamp()
return (
float(end_unix - duration),
float(end_unix),
float(duration),
int(total_output),
int(total_input),
)
except ValueError:
pass

# Tier 3: file mtime as last-resort bench-end proxy.
try:
end_unix = bench_result_path.stat().st_mtime
except OSError:
return None
return (
float(end_unix - duration),
float(end_unix),
float(duration),
int(total_output),
int(total_input),
)


def patch_agg_result(
Expand All @@ -234,7 +296,7 @@ def patch_agg_result(
tmp_path.replace(agg_path)


def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
def run(csv_path: Path | Iterable[Path], bench_result: Path, agg_result: Path) -> int:
window = _load_bench_window(bench_result)
if window is None:
print(
Expand All @@ -244,10 +306,12 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
return 0
start, end, duration, total_output, total_input = window

result = aggregate_power(csv_path, start, end)
paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path)
result = aggregate_power(paths, start, end)
if result is None:
label = str(paths[0]) if len(paths) == 1 else f"{len(paths)} CSVs"
print(
f"[aggregate_power] No usable power samples in {csv_path} for "
f"[aggregate_power] No usable power samples in {label} for "
f"window [{start}, {end}] — skipping",
file=sys.stderr,
)
Expand Down Expand Up @@ -291,11 +355,20 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:

def main() -> int:
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
parser.add_argument(
source = parser.add_mutually_exclusive_group()
source.add_argument(
"--csv",
type=Path,
default=Path("/workspace/gpu_metrics.csv"),
help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)",
default=None,
help="Single gpu_metrics.csv from start_gpu_monitor (single-node). "
"Falls back to /workspace/gpu_metrics.csv when neither --csv nor --csv-glob is set.",
)
source.add_argument(
"--csv-glob",
type=str,
default=None,
help="Shell glob expanding to per-node perf_samples_*.csv files (multinode, "
"written by srt-slurm's perfmon). GPU indices are namespaced by source CSV stem.",
)
parser.add_argument(
"--bench-result",
Expand All @@ -310,7 +383,17 @@ def main() -> int:
help="Path to the agg_<run>.json output of process_result.py (will be patched in place)",
)
args = parser.parse_args()
return run(args.csv, args.bench_result, args.agg_result)

if args.csv_glob:
paths = sorted(Path(p) for p in glob_module.glob(args.csv_glob))
if not paths:
print(
f"[aggregate_power] No CSVs matched glob {args.csv_glob!r} — skipping",
file=sys.stderr,
)
return 0
return run(paths, args.bench_result, args.agg_result)
return run(args.csv or Path("/workspace/gpu_metrics.csv"), args.bench_result, args.agg_result)


if __name__ == "__main__":
Expand Down
Loading
Loading