feat(aggregator): offset metrics, per-event-name system metrics, and time-bucket persistence#68
Merged
hariharan-devarajan merged 3 commits intoMay 20, 2026
Conversation
There was a problem hiding this comment.
Pull request overview
This PR extends the DFT aggregator + dfanalyzer export path to support (1) offset metric aggregation, (2) per-event-name keying and scanning for system metrics stored in the SYSTEM_METRICS RocksDB column family, and (3) persistence of time-bucket bounds so read-only reopens can recover the trace’s time origin.
Changes:
- Add
offsetmetric tracking throughout aggregation, merge, and (de)serialization, and exposeoffset_min/offset_maxin the dfanalyzer Arrow schema/export. - Change system-metrics key serialization to include an event
name, add raw scanning APIs for theSYSTEM_METRICSCF, and export those metrics via a two-pass Arrow builder in the Python binding. - Persist min/max time-bucket bounds during SST index build and align counter/profile bucket timestamps to intended bucket boundaries; rename
lustre_staging→shared_stagingin the Dask distributed index path.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/utilities/composites/dft/aggregators/test_system_metrics.cpp | Updates key round-trip test for the new system-metrics key format including name. |
| tests/python/test_distributed_manifest.py | Updates tests for lustre_staging → shared_staging rename. |
| src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp | Calls persist_time_bounds() after SST merge/build so read-only reopen can recover origin. |
| src/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.cpp | Extends system-metrics key serialization/deserialization to [hhash][name][time_bucket]. |
| src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp | Adds scan_system_metrics_raw_fn and persist_time_bounds() implementations. |
| src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp | Adds reserved offset args, aligns profile bucket timestamps, ingests offset args, and keys system metrics by event name. |
| src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.cpp | Includes offset metric stats in aggregation value serialization/deserialization. |
| src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.cpp | Implements AggregationMetrics::update_offset and merges offset stats. |
| src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp | Adjusts preaggregated metric application and adds offset ingestion + reserved-arg filtering. |
| src/dftracer/utils/python/batch_indexer.cpp | Extends dfanalyzer schema/export with offset columns, time alignment for profile rows, unresolved-hash behavior, and a two-pass export for SYSTEM_METRICS. |
| python/dftracer/utils/dask.py | Renames lustre_staging → shared_staging and updates movement logic/docstrings. |
| include/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h | Updates SystemMetricKey and key serialization API to include name. |
| include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h | Declares new system-metrics scan and persist_time_bounds() APIs. |
| include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h | Extends AggMetrics view structs/parsers with offset fields and stddev helper. |
| include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.h | Adds MetricStats offset and update_offset() to AggregationMetrics. |
Comments suppressed due to low confidence (1)
src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp:30
apply_preaggregated_metricnow updatesstats.counteven when the pre-aggregated*_sum/plain value is missing and only*_min/*_maxare present. In that casetotalis forced to 0, which will skewmean(and any later derived stats) by increasing the denominator without increasing the numerator. Consider only incrementingstats.count/stats.total(and recomputingmean) whensum_val.exists(), while still allowingmin/maxto update independently when present.
if (!sum_val.exists() && !min_val.exists() && !max_val.exists()) return;
const auto total = sum_val.exists() ? sum_val.get<std::uint64_t>() : 0;
stats.count += ev_count;
stats.total += total;
if (min_val.exists()) {
stats.min = std::min(stats.min, min_val.get<std::uint64_t>());
}
if (max_val.exists()) {
stats.max = std::max(stats.max, max_val.get<std::uint64_t>());
}
if (stats.count > 0) {
stats.mean =
static_cast<double>(stats.total) / static_cast<double>(stats.count);
stats.m2 = 0.0;
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
hariharan-devarajan
approved these changes
May 20, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Extends the aggregator and dfanalyzer integration path with three independent
improvements:
SYSTEM_METRICScolumn family so distinctcounters (cpu, memory, etc.) keep separate buckets and can be pivoted into
named columns on the dfanalyzer side.
SST-built index can recover the trace origin.
Changes
Offset metric
MetricStats offsetfield onAggregationMetrics, wired through thecopy/assign/merge paths and
update_offset().AggMetricsView,AggMetricsFullView, the fast andfull parsers, and
serialize_agg_value_into/deserialize_agg_value) nowcarry
offset_total/min/maxplusmean/m2and anoffset_stddev()helper.offset/offset_sum/offset_min/offset_maxevent args. Offset has no meaningful sum, soingestion triggers on any of the offset args being present. The reserved-arg
filters in both
aggregation_logic.cppandaggregation_visitor.cppexcludethese keys from custom-metric tracking.
apply_preaggregated_metricno longer requiressumto exist; it now alsofires when only
min/maxare present.offset_min/offset_maxcolumns. They are emittedas null when no offset was ever recorded (
MetricStatsdefaultmin=UINT64_MAX, max=0), since0is itself a valid offset.Per-event-name system metrics
SystemMetricKeygains anamefield; key serialization becomes[hhash][name][time_bucket].handle_system_eventkeys buffers byev.name.EventAggregator::scan_system_metrics_raw[_fn]provides a sequentialscan of the SYSTEM_METRICS CF (its keys carry no shard prefix).
scan_system_metrics_buffer()in the Python binding does a two-passscan: pass 1 discovers the dynamic metric column names, pass 2 emits rows.
The schema must be declared up front for
RecordBatchBuilder. Results areappended to
Indexer_iter_arrow_dfanalyzer_all.Time-bucket persistence and bucket alignment
EventAggregator::persist_time_bounds()writes the in-memory min/maxtime bucket to the AGGREGATION CF. The SST build path now calls it
explicitly after
merge_chunk()inresolve_and_build_index, so a laterread-only reopen recovers the trace origin instead of emitting
time_rangeas an absolute bucket index.
ph="C") events report stats for the period ending atev.ts, soa boundary-aligned timestamp is assigned to the bucket it summarizes (the
one before it). Plain events keep their own timestamp.
time_start/time_endto the bucket grid; plain events keep precise min/max event timestamps.
Hash resolution
the hash itself. The dfanalyzer side treats empty
file_name/host_nameas missing (NA).
Rename:
lustre_staging->shared_stagingdistributed_indexand_build_sst_taskrename thelustre_stagingparameter to
shared_staging(it need not be Lustre, only a shared FS).Docstrings and tests updated accordingly.
Tests
test_system_metrics.cpp: key round-trip test updated for the newnamefield.
test_distributed_manifest.py: updated for theshared_stagingrename.