Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct AggregationMetrics {

MetricStats duration;
MetricStats size;
MetricStats offset;

std::uint64_t ts = std::numeric_limits<std::uint64_t>::max();
std::uint64_t te = 0;
Expand All @@ -94,12 +95,14 @@ struct AggregationMetrics {
/// Constructs the aggregate, forwarding `relative_accuracy` to each of the
/// three MetricStats members (duration, size, offset) and storing the same
/// value in `sketch_accuracy`.
///
/// @param relative_accuracy Value handed to every MetricStats constructor;
///        defaults to 0.01.
///        NOTE(review): presumably the relative error bound of the
///        percentile sketch inside MetricStats — confirm against MetricStats.
explicit AggregationMetrics(double relative_accuracy = 0.01)
: duration(relative_accuracy),
size(relative_accuracy),
offset(relative_accuracy),
sketch_accuracy(relative_accuracy) {}

AggregationMetrics(const AggregationMetrics& other)
: count(other.count),
duration(other.duration),
size(other.size),
offset(other.offset),
ts(other.ts),
te(other.te),
boundary_associations(
Expand All @@ -120,6 +123,7 @@ struct AggregationMetrics {
count = other.count;
duration = other.duration;
size = other.size;
offset = other.offset;
ts = other.ts;
te = other.te;
boundary_associations =
Expand All @@ -143,6 +147,7 @@ struct AggregationMetrics {

void update_duration(std::uint64_t dur, bool compute_percentiles = false);
void update_size(std::uint64_t sz, bool compute_percentiles = false);
void update_offset(std::uint64_t off, bool compute_percentiles = false);
void update_timestamp(std::uint64_t event_ts, std::uint64_t dur);
void update_timestamp_clamped(std::uint64_t event_ts, std::uint64_t dur,
std::uint64_t bucket_start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ struct AggMetricsView {
std::uint64_t size_total;
std::uint64_t size_min;
std::uint64_t size_max;
std::uint64_t offset_total;
std::uint64_t offset_min;
std::uint64_t offset_max;
std::uint64_t ts;
std::uint64_t te;
};
Expand All @@ -202,6 +205,11 @@ struct AggMetricsFullView {
std::uint64_t size_max;
double size_mean;
double size_m2;
std::uint64_t offset_total;
std::uint64_t offset_min;
std::uint64_t offset_max;
double offset_mean;
double offset_m2;
std::uint64_t ts;
std::uint64_t te;

Expand All @@ -212,6 +220,10 @@ struct AggMetricsFullView {
return count > 1 ? std::sqrt(size_m2 / static_cast<double>(count))
: 0.0;
}
/// Standard deviation of the offset metric, computed as
/// sqrt(offset_m2 / count). Yields 0.0 when `count` is not greater than 1.
/// NOTE(review): offset_m2 is presumably the running sum of squared
/// deviations from the mean — confirm against the writer side.
double offset_stddev() const {
  if (!(count > 1)) {
    return 0.0;
  }
  const double variance = offset_m2 / static_cast<double>(count);
  return std::sqrt(variance);
}
};

/// Fast value parser for Arrow export - skips mean/m2/m3/m4/sketch.
Expand Down Expand Up @@ -258,6 +270,7 @@ inline bool parse_agg_value_view(std::string_view data, AggMetricsView& out) {
out.count = read_varint();
read_metric_stats_partial(out.dur_total, out.dur_min, out.dur_max);
read_metric_stats_partial(out.size_total, out.size_min, out.size_max);
read_metric_stats_partial(out.offset_total, out.offset_min, out.offset_max);
out.ts = read_varint();
out.te = read_varint();

Expand Down Expand Up @@ -323,6 +336,8 @@ inline bool parse_agg_value_full_view(std::string_view data,
out.dur_mean, out.dur_m2);
read_metric_stats_full(out.size_total, out.size_min, out.size_max,
out.size_mean, out.size_m2);
read_metric_stats_full(out.offset_total, out.offset_min, out.offset_max,
out.offset_mean, out.offset_m2);
out.ts = read_varint();
out.te = read_varint();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ class EventAggregator {
std::uint16_t shard_end,
RawScanCallbackFn fn, void* ctx) const;

/// Sequential scan of the SYSTEM_METRICS CF (no shard prefix in keys).
std::size_t scan_system_metrics_raw_fn(RawScanCallbackFn fn,
void* ctx) const;

/// Template wrapper around scan_system_metrics_raw_fn(): adapts any callable
/// invocable as `(std::string_view, std::string_view) -> bool` to the
/// function-pointer + `void*` context interface.
///
/// The adapter is a captureless `+[]` lambda, so it decays to a plain
/// function pointer; the callable itself is passed by address through `ctx`
/// and must therefore only be used for the duration of this call.
///
/// @param callback Callable invoked with the two string views supplied by
///        scan_system_metrics_raw_fn(); its bool result is returned to the
///        scanner unchanged. Const callables are accepted.
/// @return Whatever scan_system_metrics_raw_fn() returns.
template <typename F>
std::size_t scan_system_metrics_raw(F&& callback) const {
  // Keep the callable's cv-qualification: with std::decay_t a const lvalue
  // argument lost its const here, and casting its address straight to void*
  // was ill-formed (static_cast cannot cast away constness).
  using Callable = std::remove_reference_t<F>;
  auto adapter =
      +[](void* ctx, std::string_view k, std::string_view v) -> bool {
    return (*static_cast<Callable*>(ctx))(k, v);
  };
  // Erase the type via const void* first so const callables compile; the
  // adapter restores the exact (possibly const) type before invoking.
  return scan_system_metrics_raw_fn(
      adapter, const_cast<void*>(static_cast<const void*>(&callback)));
}

/// Template wrapper: forwards any callable `(sv, sv) -> bool` into the
/// raw scan with zero heap allocations. The adapter lambda is a captureless
/// `+[]` so it decays to a plain function pointer.
Expand Down Expand Up @@ -89,6 +103,12 @@ class EventAggregator {
};
TimeBoundsResult query_time_bounds() const;

/// Persist the in-memory min/max time bucket to the AGGREGATION CF so a
/// later read-only reopen can recover the trace origin. finalize() does
/// this too; the SST build path needs it called explicitly after
/// merge_chunk().
void persist_time_bounds();

/// Returns the value of the `rocksdb_mode_` member.
bool is_rocksdb_mode() const { return rocksdb_mode_; }
/// Returns a shared_ptr to the database object held in `db_`; the returned
/// pointer is a by-value copy, not a reference to the member.
std::shared_ptr<rocksdb::RocksDatabase> db() const { return db_; }
/// Returns the stored `config_hash_` value.
std::uint32_t config_hash() const { return config_hash_; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@

namespace dftracer::utils::utilities::composites::dft::aggregators {

// System metrics key: [hhash:var][time_bucket:varint]
// Simpler key than regular aggregation since system metrics are host-level

// Per-event-name keying so cpu/memory/etc keep separate buckets and the
// dfanalyzer side can pivot them into named columns (sys_cpu_idle_pct, ...).
struct SystemMetricKey {
// Host component of the key.
// NOTE(review): presumably a hash identifying the host — confirm.
std::string hhash;
// Event/metric name component of the key.
std::string name;
// Time bucket component of the key; defaults to 0.
std::uint64_t time_bucket = 0;
};

void serialize_system_key_into(std::string& out, std::string_view hhash,
std::string_view name,
std::uint64_t time_bucket);
std::string serialize_system_key(std::string_view hhash,
std::string serialize_system_key(std::string_view hhash, std::string_view name,
std::uint64_t time_bucket);

struct DeserializedSystemKey {
Expand Down
24 changes: 12 additions & 12 deletions python/dftracer/utils/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ def _build_sst_task(
file_ids: List[int],
file_slices: Optional[List[Any]],
local_staging: str,
lustre_staging: str,
shared_staging: str,
batch_id: str,
index_dir: str,
checkpoint_size: int,
Expand Down Expand Up @@ -782,9 +782,9 @@ def _build_sst_task(
t_build = _time.monotonic()

n_moved = 0
if lustre_staging and lustre_staging != local_staging:
if shared_staging and shared_staging != local_staging:
# Keep per-sink subdir to avoid aggregation.sst collisions.
base = os.path.join(lustre_staging, batch_id)
base = os.path.join(shared_staging, batch_id)
relocated: List[Dict[str, Optional[str]]] = []
for i, d in enumerate(artifact_dicts):
relocated.append(move_artifacts(d, os.path.join(base, f"sub_{i}")))
Expand Down Expand Up @@ -820,7 +820,7 @@ def distributed_index(
files: Optional[List[str]] = None,
index_path: str = "",
local_staging: str = "",
lustre_staging: str = "",
shared_staging: str = "",
client: Optional["Client"] = None,
checkpoint_size: int = 32 * 1024 * 1024,
bloom_dimensions: Optional[List[str]] = None,
Expand All @@ -841,17 +841,17 @@ def distributed_index(
file_ids and writes DEFAULT-CF entries once).
4. Submit one Dask task per non-empty worker that runs the existing
indexer pipeline with an SST sink, writing SSTs to `local_staging`
and (if different) moving them to `lustre_staging`.
and (if different) moving them to `shared_staging`.
5. Collect artifact dicts into an SstArtifactRegistry; coordinator
calls bulk_ingest + rebuild_root_summaries.

Args:
directory: Directory containing trace files.
files: Explicit file list (alternative to directory).
index_path: Target .dftindex path (coordinator-writable).
local_staging: Per-worker SST build dir. If equal to lustre_staging,
local_staging: Per-worker SST build dir. If equal to shared_staging,
no post-build move.
lustre_staging: Shared FS dir the coordinator reads SSTs from during
shared_staging: Shared FS dir the coordinator reads SSTs from during
ingest. Must be on the same filesystem as index_path for the
cheapest ingest.
client: Dask distributed Client. None -> run tasks inline.
Expand All @@ -872,8 +872,8 @@ def distributed_index(
raise ValueError("index_path is required")
if not local_staging:
raise ValueError("local_staging is required")
if not lustre_staging:
lustre_staging = local_staging
if not shared_staging:
shared_staging = local_staging

import logging as _logging
import time as _time
Expand Down Expand Up @@ -987,7 +987,7 @@ def distributed_index(
# parallel lists. A file split across workers appears once per slice.
index_dir = os.path.dirname(index_path.rstrip("/"))
os.makedirs(local_staging, exist_ok=True)
os.makedirs(lustre_staging, exist_ok=True)
os.makedirs(shared_staging, exist_ok=True)

worker_file_lists: List[List[str]] = []
worker_file_ids: List[List[int]] = []
Expand Down Expand Up @@ -1038,7 +1038,7 @@ def distributed_index(
ids_w,
slices_w,
local_staging,
lustre_staging,
shared_staging,
f"worker_{w}",
index_dir,
checkpoint_size,
Expand Down Expand Up @@ -1068,7 +1068,7 @@ def distributed_index(
ids_w,
slices_w,
local_staging,
lustre_staging,
shared_staging,
f"worker_{w}",
index_dir,
checkpoint_size,
Expand Down
Loading
Loading