From 3c9c481a2fec2219ff603e7df889b118695461e8 Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 19 May 2026 20:11:03 -0500 Subject: [PATCH 1/3] feat(aggregator): add system metrics scan and serialization with per-event-name keying --- .../dft/aggregators/event_aggregator.h | 14 +++ .../system_metrics_serialization.h | 9 +- python/dftracer/utils/dask.py | 24 ++--- src/dftracer/utils/python/batch_indexer.cpp | 96 +++++++++++++++++++ .../dft/aggregators/aggregation_visitor.cpp | 2 +- .../dft/aggregators/event_aggregator.cpp | 17 ++++ .../system_metrics_serialization.cpp | 11 ++- tests/python/test_distributed_manifest.py | 10 +- .../dft/aggregators/test_system_metrics.cpp | 4 +- 9 files changed, 160 insertions(+), 27 deletions(-) diff --git a/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h b/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h index 077de51a..8d69216a 100644 --- a/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h +++ b/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h @@ -44,6 +44,20 @@ class EventAggregator { std::uint16_t shard_end, RawScanCallbackFn fn, void* ctx) const; + /// Sequential scan of the SYSTEM_METRICS CF (no shard prefix in keys). + std::size_t scan_system_metrics_raw_fn(RawScanCallbackFn fn, + void* ctx) const; + + template + std::size_t scan_system_metrics_raw(F&& callback) const { + auto adapter = + +[](void* ctx, std::string_view k, std::string_view v) -> bool { + return (*static_cast*>(ctx))(k, v); + }; + return scan_system_metrics_raw_fn(adapter, + static_cast(&callback)); + } + /// Template wrapper: forwards any callable `(sv, sv) -> bool` into the /// raw scan with zero heap allocations. The adapter lambda is a captureless /// `+[]` so it decays to a plain function pointer. diff --git a/include/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h b/include/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h index db5a16f8..5305ac9c 100644 --- a/include/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h +++ b/include/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h @@ -9,17 +9,18 @@ namespace dftracer::utils::utilities::composites::dft::aggregators { -// System metrics key: [hhash:var][time_bucket:varint] -// Simpler key than regular aggregation since system metrics are host-level - +// Per-event-name keying so cpu/memory/etc keep separate buckets and the +// dfanalyzer side can pivot them into named columns (sys_cpu_idle_pct, ...). struct SystemMetricKey { std::string hhash; + std::string name; std::uint64_t time_bucket = 0; }; void serialize_system_key_into(std::string& out, std::string_view hhash, + std::string_view name, std::uint64_t time_bucket); -std::string serialize_system_key(std::string_view hhash, +std::string serialize_system_key(std::string_view hhash, std::string_view name, std::uint64_t time_bucket); struct DeserializedSystemKey { diff --git a/python/dftracer/utils/dask.py b/python/dftracer/utils/dask.py index f48ad108..75c06311 100644 --- a/python/dftracer/utils/dask.py +++ b/python/dftracer/utils/dask.py @@ -733,7 +733,7 @@ def _build_sst_task( file_ids: List[int], file_slices: Optional[List[Any]], local_staging: str, - lustre_staging: str, + shared_staging: str, batch_id: str, index_dir: str, checkpoint_size: int, @@ -782,9 +782,9 @@ def _build_sst_task( t_build = _time.monotonic() n_moved = 0 - if lustre_staging and lustre_staging != local_staging: + if shared_staging and shared_staging != local_staging: # Keep per-sink subdir to avoid aggregation.sst collisions. - base = os.path.join(lustre_staging, batch_id) + base = os.path.join(shared_staging, batch_id) relocated: List[Dict[str, Optional[str]]] = [] for i, d in enumerate(artifact_dicts): relocated.append(move_artifacts(d, os.path.join(base, f"sub_{i}"))) @@ -820,7 +820,7 @@ def distributed_index( files: Optional[List[str]] = None, index_path: str = "", local_staging: str = "", - lustre_staging: str = "", + shared_staging: str = "", client: Optional["Client"] = None, checkpoint_size: int = 32 * 1024 * 1024, bloom_dimensions: Optional[List[str]] = None, @@ -841,7 +841,7 @@ def distributed_index( file_ids and writes DEFAULT-CF entries once). 4. Submit one Dask task per non-empty worker that runs the existing indexer pipeline with an SST sink, writing SSTs to `local_staging` - and (if different) moving them to `lustre_staging`. + and (if different) moving them to `shared_staging`. 5. Collect artifact dicts into an SstArtifactRegistry; coordinator calls bulk_ingest + rebuild_root_summaries. @@ -849,9 +849,9 @@ def distributed_index( directory: Directory containing trace files. files: Explicit file list (alternative to directory). index_path: Target .dftindex path (coordinator-writable). - local_staging: Per-worker SST build dir. If equal to lustre_staging, + local_staging: Per-worker SST build dir. If equal to shared_staging, no post-build move. - lustre_staging: Shared FS dir the coordinator reads SSTs from during + shared_staging: Shared FS dir the coordinator reads SSTs from during ingest. Must be on the same filesystem as index_path for the cheapest ingest. client: Dask distributed Client. None -> run tasks inline. @@ -872,8 +872,8 @@ def distributed_index( raise ValueError("index_path is required") if not local_staging: raise ValueError("local_staging is required") - if not lustre_staging: - lustre_staging = local_staging + if not shared_staging: + shared_staging = local_staging import logging as _logging import time as _time @@ -987,7 +987,7 @@ def distributed_index( # parallel lists. A file split across workers appears once per slice. index_dir = os.path.dirname(index_path.rstrip("/")) os.makedirs(local_staging, exist_ok=True) - os.makedirs(lustre_staging, exist_ok=True) + os.makedirs(shared_staging, exist_ok=True) worker_file_lists: List[List[str]] = [] worker_file_ids: List[List[int]] = [] @@ -1038,7 +1038,7 @@ def distributed_index( ids_w, slices_w, local_staging, - lustre_staging, + shared_staging, f"worker_{w}", index_dir, checkpoint_size, @@ -1068,7 +1068,7 @@ def distributed_index( ids_w, slices_w, local_staging, - lustre_staging, + shared_staging, f"worker_{w}", index_dir, checkpoint_size, diff --git a/src/dftracer/utils/python/batch_indexer.cpp b/src/dftracer/utils/python/batch_indexer.cpp index ef7bb8be..1ae82fc0 100644 --- a/src/dftracer/utils/python/batch_indexer.cpp +++ b/src/dftracer/utils/python/batch_indexer.cpp @@ -1690,6 +1690,98 @@ DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) { return output; } +// Two-pass scan over SYSTEM_METRICS CF: pass 1 discovers metric column names +// (dynamic per workload), pass 2 emits rows. Needed because RecordBatchBuilder +// requires the schema up front. +std::vector scan_system_metrics_buffer( + const EventAggregator* agg, const DfanalyzerContext* ctx, + Py_ssize_t batch_size) { + std::vector results; + if (!agg) return results; + + std::vector metric_names_ordered; + std::unordered_map metric_name_index; + agg->scan_system_metrics_raw([&](std::string_view, + std::string_view val_bytes) -> bool { + auto m = deserialize_system_value(val_bytes); + if (m.metrics) { + for (const auto& [name, _] : *m.metrics) { + if (metric_name_index.find(name) == metric_name_index.end()) { + metric_name_index.emplace(name, + metric_names_ordered.size()); + metric_names_ordered.push_back(name); + } + } + } + return true; + }); + + if (metric_names_ordered.empty()) return results; + + std::vector schema; + schema.reserve(5 + metric_names_ordered.size()); + schema.push_back({"host_hash", ColumnType::DICT_STRING}); + schema.push_back({"name", ColumnType::DICT_STRING}); + schema.push_back({"ts", ColumnType::INT64}); + schema.push_back({"te", ColumnType::INT64}); + schema.push_back({"count", ColumnType::INT64}); + for (const auto& mn : metric_names_ordered) { + schema.push_back({mn, ColumnType::DOUBLE}); + } + + RecordBatchBuilder builder; + builder.declare_schema(schema); + builder.reserve(static_cast(batch_size)); + + auto flush = [&](std::size_t& row_count) { + if (row_count == 0) return; + auto arrow = builder.finish(); + if (arrow.valid()) results.push_back(std::move(arrow)); + builder.reset(true); + builder.reserve(static_cast(batch_size)); + row_count = 0; + }; + + std::size_t row_count = 0; + const std::size_t n_metric_cols = metric_names_ordered.size(); + + agg->scan_system_metrics_raw( + [&](std::string_view key_bytes, std::string_view val_bytes) -> bool { + auto k = deserialize_system_key(key_bytes); + auto m = deserialize_system_value(val_bytes); + + std::size_t ci = 0; + builder.append_dict_string(ci++, k.key.hhash); + builder.append_dict_string(ci++, k.key.name); + builder.append_int64(ci++, static_cast(m.ts)); + builder.append_int64(ci++, static_cast(m.te)); + builder.append_int64(ci++, static_cast(m.count)); + + for (std::size_t i = 0; i < n_metric_cols; ++i) { + const auto& mn = metric_names_ordered[i]; + bool present = false; + if (m.metrics) { + auto it = m.metrics->find(mn); + if (it != m.metrics->end()) { + builder.append_double(ci++, it->second.mean); + present = true; + } + } + if (!present) builder.append_null(ci++); + } + builder.end_row(); + row_count++; + if (static_cast(row_count) >= batch_size) { + flush(row_count); + } + return true; + }); + flush(row_count); + + (void)ctx; + return results; +} + } // namespace static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args, @@ -2006,6 +2098,10 @@ static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self, for (auto& r : out.system) system_results.push_back(std::move(r)); } + + auto sys_buf = + scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size); + for (auto& r : sys_buf) system_results.push_back(std::move(r)); } } catch (const std::exception& e) { error_msg = e.what(); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp index 5db29fcb..37569e78 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp @@ -311,7 +311,7 @@ void AggregationVisitor::handle_system_event(const EventRecord& record) { if (time_bucket < min_time_bucket_) min_time_bucket_ = time_bucket; if (time_bucket > max_time_bucket_) max_time_bucket_ = time_bucket; - serialize_system_key_into(system_key_buf_, hhash, time_bucket); + serialize_system_key_into(system_key_buf_, hhash, ev.name, time_bucket); auto [it, inserted] = system_buffer_.try_emplace(system_key_buf_, config_.sketch_accuracy); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp index a840a2a0..2d9f9ace 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp @@ -194,6 +194,23 @@ std::size_t EventAggregator::scan_shard_range_raw_fn(std::uint16_t shard_begin, return count; } +std::size_t EventAggregator::scan_system_metrics_raw_fn(RawScanCallbackFn fn, + void* ctx) const { + if (!rocksdb_mode_ || !db_) return 0; + + auto it = db_->new_iterator(rcf::SYSTEM_METRICS); + std::size_t count = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + auto key_slice = it->key(); + auto val_slice = it->value(); + count++; + if (!fn(ctx, std::string_view(key_slice.data(), key_slice.size()), + std::string_view(val_slice.data(), val_slice.size()))) + break; + } + return count; +} + std::size_t EventAggregator::scan_shard_range(std::uint16_t shard_begin, std::uint16_t shard_end, ScanCallback callback) const { diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.cpp index d2db20c7..d41d5cd3 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.cpp @@ -56,25 +56,28 @@ FloatMetricStats deserialize_float_metric_stats(BinaryReader& r, } // namespace void serialize_system_key_into(std::string& out, std::string_view hhash, + std::string_view name, std::uint64_t time_bucket) { out.clear(); - out.reserve(2 + hhash.size() + 10); + out.reserve(4 + hhash.size() + name.size() + 10); put_str(out, hhash); + put_str(out, name); put_varint(out, time_bucket); } -std::string serialize_system_key(std::string_view hhash, +std::string serialize_system_key(std::string_view hhash, std::string_view name, std::uint64_t time_bucket) { std::string out; - serialize_system_key_into(out, hhash, time_bucket); + serialize_system_key_into(out, hhash, name, time_bucket); return out; } DeserializedSystemKey deserialize_system_key(std::string_view data) { BinaryReader r(data); auto hhash = r.str(); + auto name = r.str(); auto time_bucket = r.varint(); - return {{std::string(hhash), time_bucket}}; + return {{std::string(hhash), std::string(name), time_bucket}}; } void serialize_system_value_into(std::string& out, diff --git a/tests/python/test_distributed_manifest.py b/tests/python/test_distributed_manifest.py index c24d183d..9dd13aed 100644 --- a/tests/python/test_distributed_manifest.py +++ b/tests/python/test_distributed_manifest.py @@ -34,7 +34,7 @@ def _build_distributed(env, pids, num_events=100, rebuild_root=True): files=files, index_path=index_path, local_staging=staging, - lustre_staging=staging, + shared_staging=staging, client=None, aggregation_config=AGG_CFG, rebuild_root_summaries=rebuild_root, @@ -108,9 +108,9 @@ def test_move_artifacts_preserves_per_file_agg_ssts(self): for p in [1, 2, 3, 4] ] local_staging = os.path.join(env.temp_dir, "local_stage") - lustre_staging = os.path.join(env.temp_dir, "lustre_stage") + shared_staging = os.path.join(env.temp_dir, "lustre_stage") os.makedirs(local_staging, exist_ok=True) - os.makedirs(lustre_staging, exist_ok=True) + os.makedirs(shared_staging, exist_ok=True) index_dir = os.path.join(env.temp_dir, "idx") os.makedirs(index_dir, exist_ok=True) index_path = os.path.join(index_dir, ".dftindex") @@ -119,7 +119,7 @@ def test_move_artifacts_preserves_per_file_agg_ssts(self): files=files, index_path=index_path, local_staging=local_staging, - lustre_staging=lustre_staging, + shared_staging=shared_staging, client=None, aggregation_config=AGG_CFG, ) @@ -195,7 +195,7 @@ def test_multi_worker_with_local_cluster(self): files=files, index_path=index_path, local_staging=staging, - lustre_staging=staging, + shared_staging=staging, client=client, aggregation_config=AGG_CFG, ) diff --git a/tests/utilities/composites/dft/aggregators/test_system_metrics.cpp b/tests/utilities/composites/dft/aggregators/test_system_metrics.cpp index a30880ad..2c086187 100644 --- a/tests/utilities/composites/dft/aggregators/test_system_metrics.cpp +++ b/tests/utilities/composites/dft/aggregators/test_system_metrics.cpp @@ -235,12 +235,14 @@ TEST_SUITE("SystemAggregationMetrics") { TEST_SUITE("SystemMetricsSerialization") { TEST_CASE("key serialization round-trip") { std::string hhash = "host123"; + std::string name = "cpu"; std::uint64_t time_bucket = 42; - std::string serialized = serialize_system_key(hhash, time_bucket); + std::string serialized = serialize_system_key(hhash, name, time_bucket); auto deserialized = deserialize_system_key(serialized); CHECK(deserialized.key.hhash == hhash); + CHECK(deserialized.key.name == name); CHECK(deserialized.key.time_bucket == time_bucket); } From a4eaed4d45a175e1c9a4b4142305a0180a8f4435 Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Wed, 20 May 2026 08:18:32 -0500 Subject: [PATCH 2/3] feat(aggregator): add offset metric tracking and time bucket persistence --- .../dft/aggregators/aggregation_metrics.h | 5 ++ .../aggregators/aggregation_serialization.h | 15 ++++++ .../dft/aggregators/event_aggregator.h | 6 +++ src/dftracer/utils/python/batch_indexer.cpp | 50 +++++++++++++++---- .../dft/aggregators/aggregation_logic.cpp | 21 ++++++-- .../dft/aggregators/aggregation_metrics.cpp | 6 +++ .../aggregators/aggregation_serialization.cpp | 8 ++- .../dft/aggregators/aggregation_visitor.cpp | 41 ++++++++++++++- .../dft/aggregators/event_aggregator.cpp | 11 ++++ .../dft/indexing/resolve_and_build.cpp | 5 ++ 10 files changed, 151 insertions(+), 17 deletions(-) diff --git a/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.h b/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.h index e3fb9576..bc340617 100644 --- a/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.h +++ b/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.h @@ -79,6 +79,7 @@ struct AggregationMetrics { MetricStats duration; MetricStats size; + MetricStats offset; std::uint64_t ts = std::numeric_limits::max(); std::uint64_t te = 0; @@ -94,12 +95,14 @@ struct AggregationMetrics { explicit AggregationMetrics(double relative_accuracy = 0.01) : duration(relative_accuracy), size(relative_accuracy), + offset(relative_accuracy), sketch_accuracy(relative_accuracy) {} AggregationMetrics(const AggregationMetrics& other) : count(other.count), duration(other.duration), size(other.size), + offset(other.offset), ts(other.ts), te(other.te), boundary_associations( @@ -120,6 +123,7 @@ struct AggregationMetrics { count = other.count; duration = other.duration; size = other.size; + offset = other.offset; ts = other.ts; te = other.te; boundary_associations = @@ -143,6 +147,7 @@ struct AggregationMetrics { void update_duration(std::uint64_t dur, bool compute_percentiles = false); void update_size(std::uint64_t sz, bool compute_percentiles = false); + void update_offset(std::uint64_t off, bool compute_percentiles = false); void update_timestamp(std::uint64_t event_ts, std::uint64_t dur); void update_timestamp_clamped(std::uint64_t event_ts, std::uint64_t dur, std::uint64_t bucket_start, diff --git a/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h b/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h index 08d596a3..ac116bec 100644 --- a/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h +++ b/include/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h @@ -184,6 +184,9 @@ struct AggMetricsView { std::uint64_t size_total; std::uint64_t size_min; std::uint64_t size_max; + std::uint64_t offset_total; + std::uint64_t offset_min; + std::uint64_t offset_max; std::uint64_t ts; std::uint64_t te; }; @@ -202,6 +205,11 @@ struct AggMetricsFullView { std::uint64_t size_max; double size_mean; double size_m2; + std::uint64_t offset_total; + std::uint64_t offset_min; + std::uint64_t offset_max; + double offset_mean; + double offset_m2; std::uint64_t ts; std::uint64_t te; @@ -212,6 +220,10 @@ struct AggMetricsFullView { return count > 1 ? std::sqrt(size_m2 / static_cast(count)) : 0.0; } + double offset_stddev() const { + return count > 1 ? std::sqrt(offset_m2 / static_cast(count)) + : 0.0; + } }; /// Fast value parser for Arrow export - skips mean/m2/m3/m4/sketch. @@ -258,6 +270,7 @@ inline bool parse_agg_value_view(std::string_view data, AggMetricsView& out) { out.count = read_varint(); read_metric_stats_partial(out.dur_total, out.dur_min, out.dur_max); read_metric_stats_partial(out.size_total, out.size_min, out.size_max); + read_metric_stats_partial(out.offset_total, out.offset_min, out.offset_max); out.ts = read_varint(); out.te = read_varint(); @@ -323,6 +336,8 @@ inline bool parse_agg_value_full_view(std::string_view data, out.dur_mean, out.dur_m2); read_metric_stats_full(out.size_total, out.size_min, out.size_max, out.size_mean, out.size_m2); + read_metric_stats_full(out.offset_total, out.offset_min, out.offset_max, + out.offset_mean, out.offset_m2); out.ts = read_varint(); out.te = read_varint(); diff --git a/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h b/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h index 8d69216a..722dcae0 100644 --- a/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h +++ b/include/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h @@ -103,6 +103,12 @@ class EventAggregator { }; TimeBoundsResult query_time_bounds() const; + /// Persist the in-memory min/max time bucket to the AGGREGATION CF so a + /// later read-only reopen can recover the trace origin. finalize() does + /// this too; the SST build path needs it called explicitly after + /// merge_chunk(). + void persist_time_bounds(); + bool is_rocksdb_mode() const { return rocksdb_mode_; } std::shared_ptr db() const { return db_; } std::uint32_t config_hash() const { return config_hash_; } diff --git a/src/dftracer/utils/python/batch_indexer.cpp b/src/dftracer/utils/python/batch_indexer.cpp index 1ae82fc0..7052ecd6 100644 --- a/src/dftracer/utils/python/batch_indexer.cpp +++ b/src/dftracer/utils/python/batch_indexer.cpp @@ -1026,18 +1026,18 @@ class HashResolver { } } + // Unresolved hashes resolve to empty (not the hash itself): the + // dfanalyzer side treats empty file_name/host_name as missing (NA). std::string_view resolve_file(std::string_view hash) { if (hash.empty()) return hash; - auto interned = intern_.intern(hash); - auto it = file_map_.find(interned); - return it != file_map_.end() ? it->second : interned; + auto it = file_map_.find(intern_.intern(hash)); + return it != file_map_.end() ? it->second : std::string_view{}; } std::string_view resolve_host(std::string_view hash) { if (hash.empty()) return hash; - auto interned = intern_.intern(hash); - auto it = host_map_.find(interned); - return it != host_map_.end() ? it->second : interned; + auto it = host_map_.find(intern_.intern(hash)); + return it != host_map_.end() ? it->second : std::string_view{}; } std::string_view intern(std::string_view sv) { return intern_.intern(sv); } @@ -1086,6 +1086,8 @@ static const std::vector DFANALYZER_SCHEMA = { {"time_max", ColumnType::DOUBLE}, {"size_min", ColumnType::INT64}, {"size_max", ColumnType::INT64}, + {"offset_min", ColumnType::INT64}, + {"offset_max", ColumnType::INT64}, {"time_range", ColumnType::INT64}, {"time_start", ColumnType::INT64}, {"time_end", ColumnType::INT64}, @@ -1354,16 +1356,39 @@ DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) { builder.append_null(ci++); } + // offset_min > offset_max only when no offset was ever recorded + // (MetricStats default min=UINT64_MAX, max=0); 0 is a valid offset. + if (mv.offset_min <= mv.offset_max) { + builder.append_int64(ci++, + static_cast(mv.offset_min)); + builder.append_int64(ci++, + static_cast(mv.offset_max)); + } else { + builder.append_null(ci++); + builder.append_null(ci++); + } + auto time_range = bucket_width_us > 0 ? static_cast( (kv.time_bucket - input.ctx->time_origin) / bucket_width_us) : 0; builder.append_int64(ci++, time_range); - builder.append_int64( - ci++, static_cast(mv.ts - input.ctx->time_origin)); - builder.append_int64( - ci++, static_cast(mv.te - input.ctx->time_origin)); + // Counter (profile) rows align to the bucket grid: time_start is the + // bucket start, time_end one bucket later. Plain events keep the + // precise min/max event timestamps. + if (kv.map_type == AggMapType::PROFILE) { + auto bucket_start = static_cast( + kv.time_bucket - input.ctx->time_origin); + builder.append_int64(ci++, bucket_start); + builder.append_int64(ci++, bucket_start + static_cast( + bucket_width_us)); + } else { + builder.append_int64(ci++, static_cast( + mv.ts - input.ctx->time_origin)); + builder.append_int64(ci++, static_cast( + mv.te - input.ctx->time_origin)); + } builder.end_row(); count++; @@ -1719,9 +1744,10 @@ std::vector scan_system_metrics_buffer( if (metric_names_ordered.empty()) return results; std::vector schema; - schema.reserve(5 + metric_names_ordered.size()); + schema.reserve(6 + metric_names_ordered.size()); schema.push_back({"host_hash", ColumnType::DICT_STRING}); schema.push_back({"name", ColumnType::DICT_STRING}); + schema.push_back({"time_bucket", ColumnType::INT64}); schema.push_back({"ts", ColumnType::INT64}); schema.push_back({"te", ColumnType::INT64}); schema.push_back({"count", ColumnType::INT64}); @@ -1753,6 +1779,8 @@ std::vector scan_system_metrics_buffer( std::size_t ci = 0; builder.append_dict_string(ci++, k.key.hhash); builder.append_dict_string(ci++, k.key.name); + builder.append_int64(ci++, + static_cast(k.key.time_bucket)); builder.append_int64(ci++, static_cast(m.ts)); builder.append_int64(ci++, static_cast(m.te)); builder.append_int64(ci++, static_cast(m.count)); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp index 66642760..ea58ab3d 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp @@ -11,9 +11,9 @@ void apply_preaggregated_metric(MetricStats& stats, std::uint64_t ev_count, const ArgsValueProxy& sum_val, const ArgsValueProxy& min_val, const ArgsValueProxy& max_val) { - if (!sum_val.exists()) return; + if (!sum_val.exists() && !min_val.exists() && !max_val.exists()) return; - const auto total = sum_val.get(); + const auto total = sum_val.exists() ? sum_val.get() : 0; stats.count += ev_count; stats.total += total; if (min_val.exists()) { @@ -118,6 +118,15 @@ void update_aggregation_entry(const DFTracerEvent& ev, apply_preaggregated_metric(metrics.size, ev_count, a_size_sum, a_size_min, a_size_max); + auto a_off_sum = ev.args["offset_sum"]; + if (!a_off_sum.exists()) a_off_sum = ev.args["offset"]; + auto a_off_min = ev.args["offset_min"]; + if (!a_off_min.exists()) a_off_min = ev.args["offset"]; + auto a_off_max = ev.args["offset_max"]; + if (!a_off_max.exists()) a_off_max = ev.args["offset"]; + apply_preaggregated_metric(metrics.offset, ev_count, a_off_sum, + a_off_min, a_off_max); + metrics.update_timestamp(ev.ts, config.time_interval_us); } else { metrics.update_duration(ev.dur, config.compute_percentiles); @@ -129,6 +138,11 @@ void update_aggregation_entry(const DFTracerEvent& ev, std::uint64_t size = ret.get(); metrics.update_size(size, config.compute_percentiles); } + auto off = ev.args["offset"]; + if (off.exists()) { + metrics.update_offset(off.get(), + config.compute_percentiles); + } } auto track_metric_field = [&](std::string_view field) { @@ -176,7 +190,8 @@ void update_aggregation_entry(const DFTracerEvent& ev, return k == "hhash" || k == "fhash" || k == "dft_cnt" || k == "dur" || k == "dur_sum" || k == "dur_min" || k == "dur_max" || k == "ret" || k == "ret_sum" || - k == "ret_min" || k == "ret_max"; + k == "ret_min" || k == "ret_max" || k == "offset" || + k == "offset_sum" || k == "offset_min" || k == "offset_max"; }; auto is_preagg_suffix = [](std::string_view k) { diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.cpp index 76759b18..0fa260f4 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_metrics.cpp @@ -117,6 +117,11 @@ void AggregationMetrics::update_size(std::uint64_t sz, size.update(sz, compute_percentiles); } +void AggregationMetrics::update_offset(std::uint64_t off, + bool compute_percentiles) { + offset.update(off, compute_percentiles); +} + void AggregationMetrics::update_timestamp(std::uint64_t event_ts, std::uint64_t dur) { if (event_ts < ts) ts = event_ts; @@ -158,6 +163,7 @@ void AggregationMetrics::merge_from(const AggregationMetrics& other) { duration.merge_from(other.duration); size.merge_from(other.size); + offset.merge_from(other.offset); ts = std::min(ts, other.ts); te = std::max(te, other.te); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.cpp index faa8b849..75abd932 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.cpp @@ -263,7 +263,7 @@ DeserializedAggKey deserialize_agg_key(std::string_view data) { void serialize_agg_value_into(std::string& out, const AggregationMetrics& m) { // Fast path: no sketches anywhere. Pre-size to a conservative upper // bound and write directly via pointer, then shrink. - bool has_sketch = m.duration.sketch || m.size.sketch; + bool has_sketch = m.duration.sketch || m.size.sketch || m.offset.sketch; if (!has_sketch && m.custom_metrics) { for (const auto& [_, ms] : *m.custom_metrics) { if (ms.sketch) { @@ -283,7 +283,8 @@ void serialize_agg_value_into(std::string& out, const AggregationMetrics& m) { } const std::size_t max_total = 10 /*count*/ + METRIC_STATS_MAX_BYTES_NO_SKETCH /*dur*/ + - METRIC_STATS_MAX_BYTES_NO_SKETCH /*size*/ + 10 + 10 + + METRIC_STATS_MAX_BYTES_NO_SKETCH /*size*/ + + METRIC_STATS_MAX_BYTES_NO_SKETCH /*offset*/ + 10 + 10 + 10 /*ts/te/parent*/ + 10 /*num_custom*/ + custom_bytes; out.resize(max_total); char* begin = out.data(); @@ -291,6 +292,7 @@ void serialize_agg_value_into(std::string& out, const AggregationMetrics& m) { p = write_varint(p, m.count); p = write_metric_stats(p, m.duration); p = write_metric_stats(p, m.size); + p = write_metric_stats(p, m.offset); p = write_varint(p, m.ts); p = write_varint(p, m.te); p = write_varint(p, m.parent_pid); @@ -313,6 +315,7 @@ void serialize_agg_value_into(std::string& out, const AggregationMetrics& m) { put_varint(out, m.count); serialize_metric_stats(out, m.duration); serialize_metric_stats(out, m.size); + serialize_metric_stats(out, m.offset); put_varint(out, m.ts); put_varint(out, m.te); put_varint(out, m.parent_pid); @@ -342,6 +345,7 @@ AggregationMetrics deserialize_agg_value(std::string_view data) { m.count = r.varint(); m.duration = deserialize_metric_stats(r, m.sketch_accuracy); m.size = deserialize_metric_stats(r, m.sketch_accuracy); + m.offset = deserialize_metric_stats(r, m.sketch_accuracy); m.ts = r.varint(); m.te = r.varint(); m.parent_pid = r.varint(); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp index 37569e78..b142d82d 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp @@ -28,6 +28,9 @@ inline bool is_reserved_arg(std::string_view k) { case 'r': return k == "ret" || k == "ret_sum" || k == "ret_min" || k == "ret_max"; + case 'o': + return k == "offset" || k == "offset_sum" || k == "offset_min" || + k == "offset_max"; } return false; } @@ -140,7 +143,12 @@ void AggregationVisitor::on_event(const EventRecord& record) { auto hhash = ev.args["hhash"].get(); auto fhash = ev.args["fhash"].get(); - auto time_bucket = compute_time_bucket(ev.ts, ev.dur, config_); + // Counter (ph="C") events report stats for the period ending at ev.ts, so + // a boundary-aligned timestamp belongs to the bucket it summarizes (the + // one before it). Plain events keep their own timestamp. + auto bucket_ts = + (map_type == AggMapType::PROFILE && ev.ts > 0) ? ev.ts - 1 : ev.ts; + auto time_bucket = compute_time_bucket(bucket_ts, ev.dur, config_); if (time_bucket < min_time_bucket_) min_time_bucket_ = time_bucket; if (time_bucket > max_time_bucket_) max_time_bucket_ = time_bucket; @@ -221,6 +229,33 @@ void AggregationVisitor::on_event(const EventRecord& record) { entry.size.merge_from(tmp); } + // offset has no meaningful "sum"; a counter event may carry only + // offset_min/offset_max, so trigger on any of the offset args. + auto a_off_sum = ev.args["offset_sum"]; + auto a_off_plain = ev.args["offset"]; + auto a_off_min = ev.args["offset_min"]; + auto a_off_max = ev.args["offset_max"]; + if (a_off_sum.exists() || a_off_plain.exists() || a_off_min.exists() || + a_off_max.exists()) { + MetricStats tmp(config_.sketch_accuracy); + tmp.count = ev_count; + tmp.total = a_off_sum.exists() ? a_off_sum.get() + : a_off_plain.exists() + ? a_off_plain.get() + : 0; + tmp.min = a_off_min.exists() ? a_off_min.get() + : a_off_plain.exists() ? a_off_plain.get() + : tmp.total; + tmp.max = a_off_max.exists() ? a_off_max.get() + : a_off_plain.exists() ? a_off_plain.get() + : tmp.total; + if (tmp.count > 0) { + tmp.mean = static_cast(tmp.total) / + static_cast(tmp.count); + } + entry.offset.merge_from(tmp); + } + entry.update_timestamp(ev.ts, config_.time_interval_us); } else { entry.update_duration(ev.dur, compute_percentiles); @@ -230,6 +265,10 @@ void AggregationVisitor::on_event(const EventRecord& record) { if (ret.exists() && internal::is_data_transfer_op(ev.cat, ev.name)) { entry.update_size(ret.get(), compute_percentiles); } + auto off = ev.args["offset"]; + if (off.exists()) { + entry.update_offset(off.get(), compute_percentiles); + } } if (config_.track_default_args) { diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp index 2d9f9ace..f3bd3081 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp @@ -449,6 +449,17 @@ void EventAggregator::update_time_bounds(std::uint64_t time_bucket) { } } +void EventAggregator::persist_time_bounds() { + if (!rocksdb_mode_ || !db_) return; + auto min_tb = min_time_bucket_.load(std::memory_order_relaxed); + auto max_tb = max_time_bucket_.load(std::memory_order_relaxed); + if (min_tb != UINT64_MAX && max_tb != 0 && min_tb <= max_tb) { + std::string time_bounds_val = rocks::KeyCodec::encode_be64(min_tb); + time_bounds_val += rocks::KeyCodec::encode_be64(max_tb); + db_->put(TIME_BOUNDS_DB_KEY, time_bounds_val, rcf::AGGREGATION); + } +} + std::uint64_t EventAggregator::min_time_bucket() const { return min_time_bucket_.load(std::memory_order_relaxed); } diff --git a/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp b/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp index fb8e65a3..09f0bf9e 100644 --- a/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp +++ b/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp @@ -132,6 +132,11 @@ coro::CoroTask resolve_and_build_index( file_visitors.clear(); } + // Persist accumulated min/max time bucket so a later read-only + // reopen recovers the trace origin (otherwise time_range is + // emitted as an absolute bucket index). + merger->persist_time_bounds(); + // Write global config and per-file markers if (!processed_files.empty()) { namespace rcf = dftracer::utils::rocksdb::cf; From 359e8363c1321ce12e30b90cd336d90515a01e7b Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Wed, 20 May 2026 09:11:47 -0500 Subject: [PATCH 3/3] feat(aggregator): improve system metrics scanning and persistence error handling --- src/dftracer/utils/python/batch_indexer.cpp | 30 +++++++++++-------- .../dft/aggregators/event_aggregator.cpp | 16 ++++++++-- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/dftracer/utils/python/batch_indexer.cpp b/src/dftracer/utils/python/batch_indexer.cpp index 7052ecd6..21ca318b 100644 --- a/src/dftracer/utils/python/batch_indexer.cpp +++ b/src/dftracer/utils/python/batch_indexer.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include using dftracer::utils::CoroScope; @@ -1725,24 +1726,27 @@ std::vector scan_system_metrics_buffer( if (!agg) return results; std::vector metric_names_ordered; - std::unordered_map metric_name_index; - agg->scan_system_metrics_raw([&](std::string_view, - std::string_view val_bytes) -> bool { - auto m = deserialize_system_value(val_bytes); - if (m.metrics) { - for (const auto& [name, _] : *m.metrics) { - if (metric_name_index.find(name) == metric_name_index.end()) { - metric_name_index.emplace(name, - metric_names_ordered.size()); - metric_names_ordered.push_back(name); + std::unordered_set metric_name_seen; + agg->scan_system_metrics_raw( + [&](std::string_view, std::string_view val_bytes) -> bool { + auto m = deserialize_system_value(val_bytes); + if (m.metrics) { + for (const auto& [name, _] : *m.metrics) { + if (metric_name_seen.insert(name).second) { + metric_names_ordered.push_back(name); + } } } - } - return true; - }); + return true; + }); if (metric_names_ordered.empty()) return results; + // SystemAggregationMetrics::metrics is an unordered_map; sort the + // discovered column names so the emitted Arrow schema is deterministic + // across runs and builds. + std::sort(metric_names_ordered.begin(), metric_names_ordered.end()); + std::vector schema; schema.reserve(6 + metric_names_ordered.size()); schema.push_back({"host_hash", ColumnType::DICT_STRING}); diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp index f3bd3081..d4a0c960 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.cpp @@ -208,6 +208,10 @@ std::size_t EventAggregator::scan_system_metrics_raw_fn(RawScanCallbackFn fn, std::string_view(val_slice.data(), val_slice.size()))) break; } + if (!it->status().ok()) { + DFTRACER_UTILS_LOG_ERROR("SYSTEM_METRICS scan iterator error: %s", + it->status().ToString().c_str()); + } return count; } @@ -453,10 +457,18 @@ void EventAggregator::persist_time_bounds() { if (!rocksdb_mode_ || !db_) return; auto min_tb = min_time_bucket_.load(std::memory_order_relaxed); auto max_tb = max_time_bucket_.load(std::memory_order_relaxed); - if (min_tb != UINT64_MAX && max_tb != 0 && min_tb <= max_tb) { + // min_tb == UINT64_MAX is the only "no events seen" sentinel; a real + // bucket range can legitimately be [0, 0] (relative time, first bucket). + if (min_tb != UINT64_MAX && min_tb <= max_tb) { std::string time_bounds_val = rocks::KeyCodec::encode_be64(min_tb); time_bounds_val += rocks::KeyCodec::encode_be64(max_tb); - db_->put(TIME_BOUNDS_DB_KEY, time_bounds_val, rcf::AGGREGATION); + auto status = + db_->put(TIME_BOUNDS_DB_KEY, time_bounds_val, rcf::AGGREGATION); + if (!status.ok()) { + DFTRACER_UTILS_LOG_ERROR( + "Failed to persist aggregation time bounds: %s", + status.ToString().c_str()); + } } }