diff --git a/docs/source/api/dfanalyzer.rst b/docs/source/api/dfanalyzer.rst new file mode 100644 index 00000000..b4df48f0 --- /dev/null +++ b/docs/source/api/dfanalyzer.rst @@ -0,0 +1,78 @@ +DFAnalyzer Module +================= + +The ``dftracer.utils.dfanalyzer`` module bridges the C++ aggregation index to +`dfanalyzer `_. It provides +the index-build, Arrow-IPC marshalling, and distributed high-level-metrics +(HLM) helpers that dfanalyzer drives over a Dask cluster. + +These helpers only depend on the :class:`~dftracer.utils.Indexer` and the Arrow +plumbing, so they live in ``dftracer-utils`` rather than being vendored inside +dfanalyzer. + +Dask is an optional dependency -- the distributed helpers require +``dask.distributed`` to be installed. + +Index Building +-------------- + +.. autofunction:: dftracer.utils.dfanalyzer.resolve_trace_inputs + +.. autofunction:: dftracer.utils.dfanalyzer.index_path_for + +.. autofunction:: dftracer.utils.dfanalyzer.build_index_distributed + +.. autofunction:: dftracer.utils.dfanalyzer.ensure_index + +Arrow IPC Marshalling +--------------------- + +The C extension yields Arrow data as PyCapsules. These helpers convert between +capsules, Arrow IPC byte streams (the wire format moved between Dask workers), +and pandas frames. + +.. autofunction:: dftracer.utils.dfanalyzer.batches_to_ipc + +.. autofunction:: dftracer.utils.dfanalyzer.ipc_to_pandas + +.. autofunction:: dftracer.utils.dfanalyzer.scan_to_ipc + +Distributed High-Level Metrics +------------------------------ + +The HLM pipeline aggregates the per-worker aggregation column family into a +Dask DataFrame. Each worker owns a disjoint PID set, so per-worker partials +have disjoint keys and need no cross-worker merge. + +.. autofunction:: dftracer.utils.dfanalyzer.distributed_hlm + +.. autofunction:: dftracer.utils.dfanalyzer.worker_hlm_partial + +.. 
autofunction:: dftracer.utils.dfanalyzer.make_empty_hlm + +View Groupby Partials +--------------------- + +These helpers implement mergeable per-partition view aggregation: each +partition emits partial aggregates (sum, count, min, max, sum-of-squares) that +are combined and finalized into mean/std without a global shuffle. + +.. autofunction:: dftracer.utils.dfanalyzer.partial_arrow_view_groupby + +.. autofunction:: dftracer.utils.dfanalyzer.finalize_view_partials + +.. autofunction:: dftracer.utils.dfanalyzer.build_partial_meta + +.. autofunction:: dftracer.utils.dfanalyzer.build_final_meta + +Dtype Coercion +-------------- + +Utilities that normalize Arrow-backed dtypes into the pandas-native dtypes +expected by dfanalyzer's downstream ``metrics.py``. + +.. autofunction:: dftracer.utils.dfanalyzer.normalize_arrow_dtypes + +.. autofunction:: dftracer.utils.dfanalyzer.coerce_arrow_numerics_to_pandas_native + +.. autofunction:: dftracer.utils.dfanalyzer.coerce_profile_dtypes diff --git a/docs/source/api/index.rst b/docs/source/api/index.rst index 7762582f..51571c73 100644 --- a/docs/source/api/index.rst +++ b/docs/source/api/index.rst @@ -12,6 +12,7 @@ This section contains the Python API documentation for dftracer utilities. 
runtime reader indexer + dfanalyzer Module Overview --------------- @@ -23,3 +24,4 @@ The dftracer utilities Python package provides the following main modules: - :doc:`runtime` - Coroutine runtime and Dask integration - :doc:`reader` - Lazy JSON object type - :doc:`indexer` - Indexing and searching capabilities +- :doc:`dfanalyzer` - dfanalyzer bridge: index build, Arrow IPC, distributed HLM diff --git a/docs/source/conf.py b/docs/source/conf.py index 687f4718..f89c572a 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -1512,6 +1512,8 @@ def linkcode_resolve(domain: str, info: dict[str, str]) -> str | None: sys.path.insert(0, str(PYTHON_SOURCE_DIR)) _install_rtd_extension_stub() autodoc_mock_imports = [ + "numpy", + "pandas", "pyarrow", "dask", "dask.distributed", diff --git a/docs/source/cpp_api/utilities.rst b/docs/source/cpp_api/utilities.rst index 262c9094..cec723fb 100644 --- a/docs/source/cpp_api/utilities.rst +++ b/docs/source/cpp_api/utilities.rst @@ -96,10 +96,10 @@ Each module below has detailed class documentation in the API Reference: - :doc:`api/utilities/filesystem` * - File I/O - File reading, writing, chunk writing, async line generators - - :doc:`api/utilities/fileio` + - :doc:`api/utilities/fileio/index` * - Compression - Streaming zlib compression (GZIP, ZLIB, DEFLATE) - - :doc:`api/utilities/composites` + - :doc:`api/utilities/composites/index` * - Text - Line splitting, filtering, text processing - :doc:`api/utilities/text` diff --git a/include/dftracer/utils/utilities/composites/dft/internal/utils.h b/include/dftracer/utils/utilities/composites/dft/internal/utils.h index fe287abd..4576d11d 100644 --- a/include/dftracer/utils/utilities/composites/dft/internal/utils.h +++ b/include/dftracer/utils/utilities/composites/dft/internal/utils.h @@ -6,8 +6,13 @@ namespace dftracer::utils::utilities::composites::dft::internal { +// Lowercase `s`; returns a view over `s` when already lowercase (no copy), +// else lowercases into `storage` 
_plugin_registered_schedulers: set = set()


def resolve_local_staging(client) -> str:
    """Return the SST staging directory derived from a worker's scratch dir.

    The result is ``<local_directory>/dftracer-sst-staging`` where
    ``local_directory`` is read from the first worker listed by
    ``client.scheduler_info()``.  ``/tmp`` is the base directory when the
    scheduler lists no workers or that worker reports no ``local_directory``.

    Args:
        client: Object exposing ``scheduler_info()`` (a Dask distributed
            client).

    Returns:
        The staging path as a string.  The same string is handed to every
        worker; the original author notes each worker resolves it against its
        own node-local storage.
    """
    worker_infos = client.scheduler_info().get("workers", {}) or {}
    scratch_root = "/tmp"
    if worker_infos:
        first_worker = next(iter(worker_infos.values()))
        scratch_root = first_worker.get("local_directory") or "/tmp"
    return os.path.join(scratch_root, "dftracer-sst-staging")


def register_auto_thread_plugin() -> None:
    """Register the DFTracer worker plugin on the active distributed client.

    Each worker sets its Runtime thread count to
    ``available_cpus // workers_on_the_same_host`` (at least 1), where the
    per-host worker counts are captured on the caller's side at registration
    time from ``client.nthreads()``.

    The scheduler address is remembered after a successful registration and
    later calls for the same address return immediately.  The original author
    notes that re-registering triggers a teardown+setup round-trip on every
    worker, which can deadlock while the previous Runtime still has in-flight
    coroutines.

    Does nothing when the plugin class is unavailable, and silently returns
    when ``get_client()`` raises ``ValueError`` (no active client) or an
    ``ImportError`` occurs.
    """
    if DFTracerUtilsDaskWorkerPlugin is None:
        return
    try:
        import logging
        import time
        from collections import Counter

        active_client = get_client()  # ty: ignore[call-non-callable]
        scheduler_key = getattr(active_client.scheduler, "address", None) or ""
        if scheduler_key in _plugin_registered_schedulers:
            return

        def _host_of(address: str) -> str:
            # "tcp://host:port" -> "host"
            without_scheme = address.split("://")[-1]
            return without_scheme.rsplit(":", 1)[0]

        # Poll up to 10 times (0.5 s apart) until at least one worker shows up.
        worker_threads = active_client.nthreads()
        polls_left = 10
        while polls_left > 0:
            polls_left -= 1
            latest = active_client.nthreads()
            if len(latest) >= len(worker_threads):
                worker_threads = latest
            if len(worker_threads) > 0:
                break
            time.sleep(0.5)

        workers_per_host = Counter()
        for address in worker_threads.keys():
            workers_per_host[_host_of(address)] += 1

        coordinator_log = logging.getLogger("dftracer.dask_plugin")
        coordinator_log.info(
            "coord register_plugin: host_counts=%s total_workers=%d worker_addr_sample=%s",
            dict(workers_per_host),
            sum(workers_per_host.values()),
            list(worker_threads.keys())[:8],
        )

        class _AutoThreadPlugin(DFTracerUtilsDaskWorkerPlugin):
            def __init__(self, host_worker_counts):
                super().__init__(threads=0)
                self._host_worker_counts = host_worker_counts

            def setup(self, worker):
                # Prefer the CPU affinity mask where the platform exposes it.
                if hasattr(os, "sched_getaffinity"):
                    total_cpus = len(os.sched_getaffinity(0))
                else:
                    total_cpus = os.cpu_count() or 1
                my_host = worker.address.split("://")[-1].rsplit(":", 1)[0]
                # Hosts unknown at registration time count as a single worker.
                n_local = self._host_worker_counts.get(my_host, 1)
                self.threads = max(1, total_cpus // n_local)
                logging.getLogger("distributed.worker").info(
                    "DFTracer Runtime: host=%s cpus=%d workers_on_host=%d cpp_threads=%d dict_keys=%s",
                    my_host,
                    total_cpus,
                    n_local,
                    self.threads,
                    list(self._host_worker_counts.keys()),
                )
                super().setup(worker)

        active_client.register_plugin(_AutoThreadPlugin(dict(workers_per_host)))
        _plugin_registered_schedulers.add(scheduler_key)
        coordinator_log.info(
            "Registered DFTracerUtilsDaskWorkerPlugin host_worker_counts=%s total_workers=%d",
            dict(workers_per_host),
            sum(workers_per_host.values()),
        )
    except (ValueError, ImportError):
        pass
def ipc_to_pandas(ipc_bytes: bytes):
    """Decode an Arrow IPC stream into a pandas DataFrame.

    Dictionary-encoded columns are cast to plain Arrow strings before the
    conversion, so they do not come out as pandas categoricals.

    Args:
        ipc_bytes: A complete Arrow IPC stream (as produced by
            ``batches_to_ipc``).

    Returns:
        The decoded ``pandas.DataFrame``.
    """
    # The context manager releases the reader even if decoding fails midway.
    with pa.ipc.open_stream(pa.BufferReader(ipc_bytes)) as reader:
        table = reader.read_all()
    for position, field in enumerate(table.schema):
        if pa.types.is_dictionary(field.type):
            decoded = table.column(position).cast(pa.string())
            table = table.set_column(position, field.name, decoded)
    return table.to_pandas()


def batches_to_ipc(batches_by_type: Dict[str, Any]) -> Dict[str, Optional[bytes]]:
    """Serialize per-type Arrow capsules into per-type Arrow IPC streams.

    Args:
        batches_by_type: Mapping ``{type: [capsule, ...]}`` from the C
            extension.  Only the ``"events"``, ``"profiles"`` and ``"system"``
            keys are read; a missing key or a ``None`` value is treated as an
            empty list.

    Returns:
        A dict with exactly those three keys.  Each value is the IPC stream
        bytes for that type, or ``None`` when there were no batches.  The
        stream schema is taken from the first batch of each type.
    """
    result: Dict[str, Optional[bytes]] = {}
    for data_type in ("events", "profiles", "system"):
        capsules = batches_by_type.get(data_type) or []
        batches = [pa.record_batch(capsule) for capsule in capsules]
        if not batches:
            result[data_type] = None
            continue
        sink = pa.BufferOutputStream()
        # Closing via `with` finalizes the stream and also runs when a
        # write_batch call raises (e.g. on a schema mismatch).
        with pa.ipc.new_stream(sink, batches[0].schema) as writer:
            for batch in batches:
                writer.write_batch(batch)
        result[data_type] = sink.getvalue().to_pybytes()
    return result
def scan_to_ipc(files, index_path, time_granularity, time_resolution, query):
    """Dask worker task: scan the index for `files` and return IPC bytes.

    Builds an ``Indexer`` over `files` with every ``require_*`` flag and
    ``force_rebuild`` disabled, reads all dfanalyzer Arrow batches through
    ``Indexer.iter_arrow_dfanalyzer_all`` and serializes them with
    ``batches_to_ipc``.

    Args:
        files: Trace files this worker is responsible for.
        index_path: Path of the index store; its parent directory is passed to
            the ``Indexer`` as ``index_dir`` (empty string when falsy).
        time_granularity: Forwarded to ``iter_arrow_dfanalyzer_all``.
        time_resolution: Forwarded to ``iter_arrow_dfanalyzer_all``.
        query: Forwarded to ``iter_arrow_dfanalyzer_all``.

    Returns:
        The ``{type: IPC bytes or None}`` mapping built by ``batches_to_ipc``.
    """
    index_dir = os.path.dirname(index_path) if index_path else ""
    indexer = Indexer(
        files=files,
        index_dir=index_dir,
        require_checkpoint=False,
        require_bloom=False,
        require_manifest=False,
        require_aggregation=False,
        force_rebuild=False,
    )
    batches_by_type = indexer.iter_arrow_dfanalyzer_all(
        time_granularity=time_granularity,
        time_resolution=time_resolution,
        query=query,
    )
    return batches_to_ipc(batches_by_type)


def resolve_trace_inputs(
    trace_path: str,
    trace_groups: Optional[List[str]],
) -> Tuple[str, Optional[List[str]]]:
    """Resolve a trace path into ``(directory, files)`` for the Indexer.

    * `trace_path` is not a directory: it is treated as a glob pattern when it
      contains ``*`` and as a single file otherwise; entries that do not end in
      a trace suffix are dropped.  Returns ``("", files)``.
    * `trace_path` is a directory without ``manifest.json``: returns
      ``(trace_path, None)``.
    * `trace_path` is a directory with ``manifest.json`` (dftracer_organize
      output): the trace files of the selected groups are globbed from the
      group subdirectories named in the manifest.  All groups, in sorted name
      order, are selected when `trace_groups` is empty or ``None``.  Returns
      ``("", files)``.

    File lists are returned in a deterministic order (sorted within each glob).

    Raises:
        FileNotFoundError: `trace_groups` was given but the directory has no
            ``manifest.json``.
        KeyError: A requested group is not listed in the manifest.
    """
    if not os.path.isdir(trace_path):
        # glob.glob returns matches in arbitrary order; sort for reproducibility.
        matched = sorted(glob.glob(trace_path)) if "*" in trace_path else [trace_path]
        return "", [path for path in matched if path.endswith(_TRACE_SUFFIXES)]

    manifest_path = os.path.join(trace_path, "manifest.json")
    if not os.path.isfile(manifest_path):
        if trace_groups:
            raise FileNotFoundError(
                f"trace_groups={trace_groups} requested but no manifest.json at "
                f"{manifest_path}. Run dftracer_organize to produce it, or unset "
                "trace_groups."
            )
        return trace_path, None

    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(manifest_path, "r", encoding="utf-8") as f:
        manifest = json.load(f)
    group_map = manifest.get("groups") or {}

    selected = trace_groups if trace_groups else sorted(group_map.keys())
    missing = [g for g in selected if g not in group_map]
    if missing:
        raise KeyError(
            f"trace_groups {missing} not found in manifest at {manifest_path}; "
            f"available groups: {sorted(group_map.keys())}"
        )

    files: List[str] = []
    for g in selected:
        subdir = os.path.join(trace_path, group_map[g])
        # Compressed traces first, then plain ones, each list sorted.
        files.extend(sorted(glob.glob(os.path.join(subdir, "*.pfw.gz"))))
        files.extend(sorted(glob.glob(os.path.join(subdir, "*.pfw"))))
    return "", files


def make_empty_hlm(hlm_groupby, hlm_agg, bin_cols, int_index_cols, float_metric_cols):
    """Build an empty DataFrame matching the HLM meta schema.

    Args:
        hlm_groupby: Groupby column names; they become the index levels.
        hlm_agg: Mapping whose keys are the aggregated column names.  Keys that
            are also groupby columns or bin columns get no data column.
        bin_cols: Bin column names (excluded from the data columns).
        int_index_cols: Groupby columns typed ``Int64``; all other groupby
            columns are typed ``string``.
        float_metric_cols: Metric columns typed ``Float64``; all other metric
            columns are typed ``Int64``.

    Returns:
        A zero-row DataFrame.  Its index is a ``MultiIndex`` named after
        `hlm_groupby` when that list is non-empty, otherwise the default index.
    """
    excluded_bins = set(bin_cols)
    int_levels = set(int_index_cols)
    float_metrics = set(float_metric_cols)

    metric_series = {}
    for col in hlm_agg:
        if col in hlm_groupby or col in excluded_bins:
            continue
        metric_series[col] = pd.Series(dtype="Float64" if col in float_metrics else "Int64")
    meta = pd.DataFrame(metric_series)

    level_arrays = [
        pd.array([], dtype="Int64" if col in int_levels else "string") for col in hlm_groupby
    ]
    if level_arrays:
        meta.index = pd.MultiIndex.from_arrays(level_arrays, names=list(hlm_groupby))
    return meta
def worker_hlm_partial(
    ipc_result, data_type, hlm_groupby, hlm_agg, bin_cols, int_index_cols, float_metric_cols
):
    """Per-worker partial HLM from already-resident IPC bytes.

    Workers own disjoint PID sets and proc_name is always in hlm_groupby, so
    per-worker partials have disjoint keys and need no cross-worker merge.

    Args:
        ipc_result: ``{type: IPC bytes or None}`` mapping (the output of
            ``batches_to_ipc``); anything that is not a dict is treated as
            "no data".
        data_type: Key of `ipc_result` to aggregate.
        hlm_groupby: Groupby column names; only those present in the decoded
            table are used.
        hlm_agg: Mapping ``column -> aggregation``; only ``"sum"``, ``"min"``
            and ``"max"`` entries whose column exists in the table are applied.
        bin_cols, int_index_cols, float_metric_cols: Only forwarded to
            ``make_empty_hlm`` for the empty result.

    Returns:
        A pandas DataFrame indexed by the available groupby columns, or the
        ``make_empty_hlm`` frame when there are no bytes, no rows, or no
        usable groupby column.  In the result, numeric metric values equal to
        zero are replaced by nulls.

    Note:
        The decoded table is expected to contain ``time`` and ``size``
        columns; ``table.column`` raises if either is missing.
    """
    # Deferred so the empty frame is only built on the paths that return it.
    empty = lambda: make_empty_hlm(  # noqa: E731
        hlm_groupby, hlm_agg, bin_cols, int_index_cols, float_metric_cols
    )

    ipc_bytes = ipc_result[data_type] if isinstance(ipc_result, dict) else None
    if ipc_bytes is None:
        return empty()
    reader = pa.ipc.open_stream(pa.BufferReader(ipc_bytes))
    table = reader.read_all()
    if table.num_rows == 0:
        return empty()

    # Decode dictionary-encoded columns to plain strings before grouping.
    for i, field in enumerate(table.schema):
        if pa.types.is_dictionary(field.type):
            table = table.set_column(i, field.name, table.column(i).cast(pa.string()))

    # Derived helper columns: squares (null sizes count as 0 for size_sq only)
    # and copies of time/size that can be aggregated with min/max under their
    # own names.
    time_col = table.column("time")
    size_col = table.column("size")
    table = table.append_column("time_sq", pc.multiply(time_col, time_col))  # ty: ignore[unresolved-attribute]
    size_filled = pc.if_else(pc.is_null(size_col), pa.scalar(0, pa.int64()), size_col)  # ty: ignore[unresolved-attribute]
    table = table.append_column("size_sq", pc.multiply(size_filled, size_filled))  # ty: ignore[unresolved-attribute]
    table = table.append_column("time_call_min", time_col)
    table = table.append_column("time_call_max", time_col)
    table = table.append_column("size_call_min", size_col)
    table = table.append_column("size_call_max", size_col)

    available_groupby = [c for c in hlm_groupby if c in table.column_names]
    if not available_groupby:
        return empty()

    # Keep only aggregations Arrow is asked to run here (sum/min/max) on
    # columns that actually exist; everything else in hlm_agg is ignored.
    agg_specs = []
    for col, agg_fn in hlm_agg.items():
        if col in table.column_names and agg_fn in ("sum", "min", "max"):
            agg_specs.append((col, agg_fn))

    result = table.group_by(available_groupby).aggregate(agg_specs)

    # Arrow names aggregate outputs "<col>_<fn>"; map them back to "<col>".
    rename = {f"{col}_{agg_fn}": col for col, agg_fn in agg_specs}
    result = result.rename_columns([rename.get(c, c) for c in result.column_names])

    # Lower-case the "cat" column when it is present as a string column.
    cat_idx = result.schema.get_field_index("cat")
    if cat_idx >= 0:
        cat_col = result.column(cat_idx)
        if pa.types.is_string(cat_col.type) or pa.types.is_large_string(cat_col.type):
            result = result.set_column(cat_idx, "cat", pc.utf8_lower(cat_col))  # ty: ignore[unresolved-attribute]

    # Replace exact zeros with nulls in every numeric non-groupby column.
    groupby_set = set(available_groupby)
    for i, field in enumerate(result.schema):
        if field.name in groupby_set:
            continue
        t_ = field.type
        if pa.types.is_integer(t_) or pa.types.is_floating(t_):
            col = result.column(i)
            zero = pa.scalar(0 if pa.types.is_integer(t_) else 0.0, t_)
            null = pa.scalar(None, t_)
            result = result.set_column(i, field.name, pc.if_else(pc.equal(col, zero), null, col))  # ty: ignore[unresolved-attribute]

    # Materialize to pandas with native nullable dtypes. ArrowDtype numeric
    # columns trip a pandas masked-arithmetic bug in downstream metrics.py.
    pdf = result.to_pandas(types_mapper=pd.ArrowDtype)
    for c in pdf.columns:
        # Groupby columns keep the dtype chosen by the Arrow type mapper.
        if c in available_groupby:
            continue
        dt = pdf[c].dtype
        if isinstance(dt, pd.ArrowDtype):
            pa_type = dt.pyarrow_dtype
            if pa.types.is_floating(pa_type):
                pdf[c] = pdf[c].astype("Float64")
            elif pa.types.is_integer(pa_type):
                pdf[c] = pdf[c].astype("Int64")
    return pdf.set_index(available_groupby)
def partial_arrow_view_groupby(
    df,
    view_type,
    full_cols,
    sum_cols,
    min_cols,
    max_cols,
    set_cols_items,
    flatten_fn,
):
    """Group one partition by `view_type` and emit mergeable partials.

    Output columns, per input column that exists in the partition:

    * `full_cols`: ``<c>_sum``, ``<c>_count``, ``<c>_min``, ``<c>_max`` and
      ``<c>_sumsq`` (sum of squares, computed on the float64 cast).
    * `sum_cols` / `min_cols` / `max_cols`: ``<c>_sum`` / ``<c>_min`` /
      ``<c>_max``.
    * `set_cols_items` (``(column, agg)`` pairs): ``<column>_unique``, computed
      with ``agg.chunk(grouped)`` when `agg` has a ``chunk`` attribute and
      with ``grouped.apply(flatten_fn)`` otherwise.

    The result is indexed by `view_type`.  An empty partition yields an empty
    frame with every declared output column, typed from the input columns so
    that it matches the meta the caller builds with the same rule.
    """
    in_levels = isinstance(df.index, pd.MultiIndex) and view_type in df.index.names
    frame = df.reset_index() if (in_levels or df.index.name == view_type) else df

    # Single-aggregate column groups, in the order their outputs are emitted.
    simple_groups = (("sum", sum_cols), ("min", min_cols), ("max", max_cols))

    if frame.empty:
        float_dtype = pd.ArrowDtype(pa.float64())
        int_dtype = pd.ArrowDtype(pa.int64())

        def _dtype_for(name, fallback=float_dtype):
            # Reuse the input column's dtype when the column exists.
            return frame[name].dtype if name in frame.columns else fallback

        placeholders = {}
        for name in full_cols:
            placeholders[f"{name}_sum"] = pd.Series(dtype=_dtype_for(name))
            placeholders[f"{name}_count"] = pd.Series(dtype=int_dtype)
            placeholders[f"{name}_min"] = pd.Series(dtype=_dtype_for(name))
            placeholders[f"{name}_max"] = pd.Series(dtype=_dtype_for(name))
            placeholders[f"{name}_sumsq"] = pd.Series(dtype=float_dtype)
        for suffix, names in simple_groups:
            for name in names:
                placeholders[f"{name}_{suffix}"] = pd.Series(dtype=_dtype_for(name))
        for name, _ in set_cols_items:
            placeholders[f"{name}_unique"] = pd.Series(dtype="object")

        empty_out = pd.DataFrame(placeholders)
        empty_out.index = pd.Index(
            [],
            name=view_type,
            dtype=_dtype_for(view_type, fallback=int_dtype),
        )
        return empty_out

    # Only ship the key and the numeric columns Arrow will aggregate.
    kept = [view_type]
    for names in (full_cols, sum_cols, min_cols, max_cols):
        for name in names:
            if name in frame.columns and name not in kept:
                kept.append(name)
    tbl = pa.Table.from_pandas(frame[kept], preserve_index=False)

    specs = []
    for name in full_cols:
        if name in tbl.schema.names:
            as_float = pc.cast(tbl.column(name), pa.float64())
            tbl = tbl.append_column(f"{name}__sq", pc.multiply(as_float, as_float))  # ty: ignore[unresolved-attribute]
            specs.extend(
                [
                    (name, "sum"),
                    (name, "count"),
                    (name, "min"),
                    (name, "max"),
                    (f"{name}__sq", "sum"),
                ]
            )
    for suffix, names in simple_groups:
        for name in names:
            if name in tbl.schema.names:
                specs.append((name, suffix))

    if not specs:
        # Nothing numeric to aggregate: keep one row per distinct key.
        distinct_keys = frame[view_type].drop_duplicates().reset_index(drop=True)
        out = pd.DataFrame(index=pd.Index(distinct_keys, name=view_type))
    else:
        grouped = tbl.group_by([view_type]).aggregate(specs)
        out = grouped.to_pandas(types_mapper=pd.ArrowDtype)
        sumsq_names = {f"{name}__sq_sum": f"{name}_sumsq" for name in full_cols}
        if sumsq_names:
            out = out.rename(columns=sumsq_names)
        out = out.set_index(view_type)

    for name, agg in set_cols_items:
        if name not in frame.columns:
            continue
        per_key = frame.groupby(view_type)[name]
        chunk_fn = getattr(agg, "chunk", None)
        if chunk_fn is not None:
            partial = chunk_fn(per_key)
        else:
            partial = per_key.apply(flatten_fn)
        partial.name = f"{name}_unique"
        out = out.join(partial, how="left")
    return out
def finalize_view_partials(df, full_cols):
    """Compute mean/std per view_type row from merged partials; drop helper cols.

    For every ``c`` in `full_cols` whose ``<c>_sum`` and ``<c>_count`` columns
    are present:

    * ``<c>_mean`` = sum / count;
    * ``<c>_std`` (only when ``<c>_sumsq`` is present) = sample standard
      deviation (ddof=1), NaN for count <= 1 and clamped at 0 when rounding
      makes the variance negative;
    * ``<c>_count`` and ``<c>_sumsq`` are dropped afterwards.

    Empty frames go through the same path, so an empty partition comes back
    with the same columns and dtypes as a non-empty one (and as
    ``build_final_meta`` declares) instead of the un-finalized input schema.

    Args:
        df: Merged partial aggregates, one row per view_type key.
        full_cols: Base names of the columns that carry full statistics.

    Returns:
        A new DataFrame; `df` itself is not modified.
    """
    float_dtype = pd.ArrowDtype(pa.float64())
    out = df.copy()
    drop = []
    for c in full_cols:
        sum_c = f"{c}_sum"
        count_c = f"{c}_count"
        sq_c = f"{c}_sumsq"
        if sum_c not in out.columns or count_c not in out.columns:
            continue
        s = out[sum_c].astype("float64")
        n = out[count_c].astype("float64")
        out[f"{c}_mean"] = (s / n).astype(float_dtype)
        if sq_c in out.columns:
            sq = out[sq_c].astype("float64")
            # sample variance is undefined for n <= 1 -> std is NaN, matching
            # pandas .std(ddof=1); avoids a divide-by-zero on (n - 1).
            with np.errstate(invalid="ignore", divide="ignore"):
                var_v = (sq - (s * s) / n) / (n - 1)
            var_v = var_v.where(n > 1, np.nan)
            # Floating-point cancellation can push the variance slightly below 0.
            var_v = var_v.where(var_v.isna() | (var_v >= 0), 0)
            out[f"{c}_std"] = np.sqrt(var_v).astype(float_dtype)
            drop.append(sq_c)
        drop.append(count_c)
    if drop:
        out = out.drop(columns=drop)
    return out


def build_partial_meta(records, view_type, full_cols, sum_cols, min_cols, max_cols, set_cols_items):
    """Dask meta for the output of `partial_arrow_view_groupby`.

    Column dtypes follow the input meta (``records._meta``): an output column
    inherits the dtype of its source column or index level, falling back to
    Arrow float64 (Arrow int64 for the `view_type` index).  Counts are Arrow
    int64, sums of squares Arrow float64 and set columns ``object``.
    """
    in_meta = records._meta

    def _dtype_of(col, default=pd.ArrowDtype(pa.float64())):
        if col in in_meta.columns:
            return in_meta[col].dtype
        if isinstance(in_meta.index, pd.MultiIndex) and col in in_meta.index.names:
            return in_meta.index.get_level_values(col).dtype
        return default

    # Column order must exactly match what Arrow's group_by+aggregate emits.
    cols = {}
    for c in full_cols:
        cols[f"{c}_sum"] = _dtype_of(c)
        cols[f"{c}_count"] = pd.ArrowDtype(pa.int64())
        cols[f"{c}_min"] = _dtype_of(c)
        cols[f"{c}_max"] = _dtype_of(c)
        cols[f"{c}_sumsq"] = pd.ArrowDtype(pa.float64())
    for c in sum_cols:
        cols[f"{c}_sum"] = _dtype_of(c)
    for c in min_cols:
        cols[f"{c}_min"] = _dtype_of(c)
    for c in max_cols:
        cols[f"{c}_max"] = _dtype_of(c)
    for c, _ in set_cols_items:
        cols[f"{c}_unique"] = "object"

    meta = pd.DataFrame({name: pd.Series(dtype=dt) for name, dt in cols.items()})
    idx_dtype = _dtype_of(view_type, default=pd.ArrowDtype(pa.int64()))
    meta.index = pd.Index([], name=view_type, dtype=idx_dtype)
    return meta


def build_final_meta(merged, full_cols):
    """Dask meta for the output of `finalize_view_partials`.

    Mirrors the rules of `finalize_view_partials` exactly: a ``c`` in
    `full_cols` is only finalized when both ``<c>_sum`` and ``<c>_count``
    exist in `merged`; ``<c>_std`` is only declared (and ``<c>_sumsq`` only
    removed) when ``<c>_sumsq`` exists.  Columns are kept in `merged` order,
    followed by the mean/std columns in `full_cols` order.
    """
    float_dtype = pd.ArrowDtype(pa.float64())
    existing = list(merged.columns)
    present = set(existing)

    finalized = [c for c in full_cols if f"{c}_sum" in present and f"{c}_count" in present]
    removed = set()
    for c in finalized:
        removed.add(f"{c}_count")
        if f"{c}_sumsq" in present:
            removed.add(f"{c}_sumsq")

    cols = {name: merged._meta[name].dtype for name in existing if name not in removed}
    for c in finalized:
        cols[f"{c}_mean"] = float_dtype
        if f"{c}_sumsq" in present:
            cols[f"{c}_std"] = float_dtype

    meta = pd.DataFrame({name: pd.Series(dtype=dt) for name, dt in cols.items()})
    meta.index = pd.Index([], name=merged._meta.index.name, dtype=merged._meta.index.dtype)
    return meta
def normalize_arrow_dtypes(df):
    """Convert every categorical column of `df` to ``object`` dtype.

    The frame is modified in place and also returned.
    """
    categorical_names = df.select_dtypes(include=["category"]).columns
    for name in categorical_names:
        df[name] = df[name].astype("object")
    return df


def index_path_for(trace_path: str) -> str:
    """Return the conventional ``.dftindex`` location for `trace_path`.

    The index lives next to the traces:

    * directory -> ``<trace_path>/.dftindex``;
    * glob pattern (contains ``*``) with matches -> ``.dftindex`` in the
      directory of the first match in sorted order;
    * anything else (a file, or a pattern without matches) -> ``.dftindex`` in
      the parent directory of `trace_path`, or in ``.`` when it has none.
    """
    index_name = ".dftindex"
    if os.path.isdir(trace_path):
        return os.path.join(trace_path, index_name)

    if "*" in trace_path:
        matches = sorted(glob.glob(trace_path))
        if matches:
            return os.path.join(os.path.dirname(matches[0]), index_name)

    parent = os.path.dirname(trace_path) or "."
    return os.path.join(parent, index_name)


def coerce_arrow_numerics_to_pandas_native(df):
    """Map pd.ArrowDtype int/float columns to pandas Int64/Float64.

    Columns with any other dtype are left alone.  The frame is modified in
    place and returned; an empty frame is returned untouched.
    """
    if df.empty:
        return df
    for name in df.columns:
        dtype = df[name].dtype
        if not isinstance(dtype, pd.ArrowDtype):
            continue
        arrow_type = dtype.pyarrow_dtype
        if pa.types.is_floating(arrow_type):
            target = "Float64"
        elif pa.types.is_integer(arrow_type):
            target = "Int64"
        else:
            continue
        df[name] = df[name].astype(target)
    return df
def coerce_profile_dtypes(df, output_columns, profile_window=None):
    """Normalize C++ aggregator profile output to the `output_columns` schema.

    Works on a copy of `df`:

    * a column missing from `df` is added, filled with ``pd.NA`` in the
      requested dtype;
    * a ``"string"`` column is cast to string and its empty strings become
      ``pd.NA``;
    * every other column is cast to the requested dtype.

    When `profile_window` is given, ``time_end`` is set to
    ``time_start + int(profile_window)``.  An empty frame is returned as is.

    Args:
        df: Profile rows.
        output_columns: Mapping ``column name -> pandas dtype``.
        profile_window: Optional window length added to ``time_start``.
    """
    if df.empty:
        return df

    result = df.copy()
    for name, wanted in output_columns.items():
        if name not in result.columns:
            converted = pd.Series(pd.NA, index=result.index, dtype=wanted)
        elif wanted == "string":
            converted = result[name].astype("string").replace("", pd.NA)
        else:
            converted = result[name].astype(wanted)
        result[name] = converted

    if profile_window is not None:
        result["time_end"] = result["time_start"] + int(profile_window)
    return result


def build_index_distributed(
    directory="",
    files=None,
    index_path="",
    local_staging="",
    shared_staging="",
    client=None,
    aggregation=None,
):
    """Build the dftracer index across a Dask cluster.

    Resolves the Dask client (the active one is looked up when `client` is
    None), registers the auto-thread worker plugin when a client is available,
    and delegates to ``distributed_index`` with `aggregation` passed as
    ``aggregation_config``.

    Per the original description: workers build per-CF SSTs under
    `local_staging` and move them to `shared_staging` for the coordinator to
    bulk-ingest; with `aggregation` set, the AGGREGATION + SYSTEM_METRICS
    tiers are filled in parallel; without a client (none active, or
    `dask.distributed` not installed) tasks run inline serially.

    Returns:
        Whatever ``distributed_index`` returns.
    """
    active_client = client
    if active_client is None and get_client is not None:
        try:
            active_client = get_client()
        except ValueError:
            # No distributed client is active in this process.
            active_client = None

    if active_client is not None:
        register_auto_thread_plugin()

    return distributed_index(
        directory=directory,
        files=files,
        index_path=index_path,
        local_staging=local_staging,
        shared_staging=shared_staging,
        client=active_client,
        aggregation_config=aggregation,
    )
def ensure_index(trace_path, trace_groups, time_interval_ms, client=None):
    """Build (or refresh) the dftracer index for `trace_path` via Dask.

    Steps: resolve the Dask client (the active one when `client` is None),
    resolve `trace_path`/`trace_groups` into Indexer inputs, and call
    ``build_index_distributed`` with an ``AggregationConfig`` for
    `time_interval_ms`.  The index path comes from ``index_path_for``; its
    parent directory is the shared staging area, and also the local staging
    area when no client is available (otherwise ``resolve_local_staging``
    picks the worker scratch directory).

    Returns ``None``; it returns early without building when the inputs
    resolve to neither a directory nor any files.

    Per the original description the call is idempotent: dftracer-utils skips
    files whose tiers already exist, and without an active Dask client (or
    without `dask.distributed`) the build runs inline serially.
    """
    active_client = client
    if active_client is None and get_client is not None:
        try:
            active_client = get_client()
        except ValueError:
            active_client = None

    directory, files = resolve_trace_inputs(trace_path, trace_groups)
    if not (directory or files):
        return

    index_path = index_path_for(trace_path)
    index_parent = os.path.dirname(index_path)
    if active_client is not None:
        local_staging = resolve_local_staging(active_client)
    else:
        local_staging = index_parent

    build_index_distributed(
        directory=directory,
        files=files,
        index_path=index_path,
        local_staging=local_staging,
        shared_staging=index_parent,
        client=active_client,
        aggregation=AggregationConfig(time_interval_ms=time_interval_ms),
    )
def distributed_hlm(
    data_type,
    view_types,
    traces,
    worker_ipc_futures,
    worker_scan_args,
    dask_client,
    hlm_agg_base,
    hlm_extra_cols,
    int_index_cols,
    float_metric_cols,
):
    """Distributed high-level-metrics aggregation over per-worker IPC bytes.

    Submits one `worker_hlm_partial` task per IPC future and assembles the
    results into a Dask DataFrame whose meta comes from `make_empty_hlm`.

    Args:
        data_type: Key of the per-worker IPC result to aggregate.
        view_types: View columns; together with `hlm_extra_cols` (duplicates
            removed, order kept) they form the groupby columns.
        traces: Frame whose columns containing ``"_bin_"`` are summed as bin
            columns.
        worker_ipc_futures: One future per worker holding its IPC result.
        worker_scan_args: Sequence of 3-tuples whose first item is the worker
            address; the i-th task is pinned to the i-th address.  When it is
            shorter than `worker_ipc_futures` (or ``None``) the remaining
            tasks are left unpinned.
        dask_client: Client used to submit the tasks.
        hlm_agg_base: Base ``column -> aggregation`` mapping; the bin columns
            and the squared/min/max helper columns are added to a copy.
        hlm_extra_cols: Extra groupby columns.
        int_index_cols, float_metric_cols: Forwarded to `worker_hlm_partial`
            and `make_empty_hlm`.

    Returns:
        A Dask DataFrame with one partition per IPC future, or ``None`` when
        `worker_ipc_futures` is empty.
    """
    if not worker_ipc_futures:
        return None

    # Dask is an optional dependency: import it only once there is work to
    # do, so the "no futures" no-op path works without dask installed.
    import dask
    import dask.dataframe as dd

    hlm_groupby = list(dict.fromkeys(list(view_types) + list(hlm_extra_cols)))
    bin_cols = [col for col in traces.columns if "_bin_" in col]

    hlm_agg = dict(hlm_agg_base)
    hlm_agg.update({col: "sum" for col in bin_cols})
    # Helper columns produced inside worker_hlm_partial.
    hlm_agg["time_sq"] = "sum"
    hlm_agg["size_sq"] = "sum"
    hlm_agg["time_call_min"] = "min"
    hlm_agg["time_call_max"] = "max"
    hlm_agg["size_call_min"] = "min"
    hlm_agg["size_call_max"] = "max"

    worker_addrs = [a for (a, _, _) in (worker_scan_args or [])]
    if len(worker_addrs) < len(worker_ipc_futures):
        worker_addrs += [None] * (len(worker_ipc_futures) - len(worker_addrs))

    partial_futures = []
    for addr, ipc_future in zip(worker_addrs, worker_ipc_futures):
        fut = dask_client.submit(
            worker_hlm_partial,
            ipc_future,
            data_type,
            list(hlm_groupby),
            dict(hlm_agg),
            list(bin_cols),
            int_index_cols,
            float_metric_cols,
            # Pin to the worker that already holds the IPC bytes, when known.
            workers=[addr] if addr else None,
            pure=False,
        )
        partial_futures.append(fut)

    partial_delayed = [dask.delayed(f) for f in partial_futures]
    meta = make_empty_hlm(hlm_groupby, hlm_agg, bin_cols, int_index_cols, float_metric_cols)
    return dd.from_delayed(partial_delayed, meta=meta)
""" total_files: int ready: List[str] = field(default_factory=list) needs_work: List[str] = field(default_factory=list) index_path: str = "" + aggregation_interval_us: int = 0 class Indexer: @@ -157,6 +160,7 @@ def resolve(self) -> IndexStatus: ready=result["ready"], needs_work=result["needs_work"], index_path=result.get("index_path", ""), + aggregation_interval_us=result.get("aggregation_interval_us", 0), ) def build(self) -> None: @@ -181,6 +185,7 @@ def ensure_indexed(self) -> IndexStatus: ready=result["ready"], needs_work=result["needs_work"], index_path=result.get("index_path", ""), + aggregation_interval_us=result.get("aggregation_interval_us", 0), ) def get_checkpoint_indexer(self, file_path: str) -> _NativeCheckpointIndexer: diff --git a/src/dftracer/utils/python/batch_indexer.cpp b/src/dftracer/utils/python/batch_indexer.cpp index 21ca318b..d1100245 100644 --- a/src/dftracer/utils/python/batch_indexer.cpp +++ b/src/dftracer/utils/python/batch_indexer.cpp @@ -290,6 +290,11 @@ static PyObject* Indexer_resolve(IndexerObject* self, PyLong_FromSize_t(result.all_files.size())); PyDict_SetItemString(dict, "index_path", PyUnicode_FromString(result.index_path.c_str())); + PyDict_SetItemString( + dict, "aggregation_interval_us", + PyLong_FromUnsignedLongLong(result.stored_time_interval_us)); + PyDict_SetItemString(dict, "needs_rebuild", + PyBool_FromLong(result.needs_augmentation)); // Ready files PyObject* ready_list = PyList_New(result.cached.size()); @@ -413,9 +418,13 @@ static PyObject* Indexer_ensure_indexed(IndexerObject* self, PyObject* status = Indexer_resolve(self, nullptr); if (!status) return nullptr; - // Check if needs_work is non-empty + // Build if files need work, or the aggregation tier must be rebuilt + // (stored time interval differs from the requested one). 
PyObject* needs_work = PyDict_GetItemString(status, "needs_work"); - if (needs_work && PyList_Size(needs_work) > 0) { + PyObject* needs_rebuild = PyDict_GetItemString(status, "needs_rebuild"); + bool work_pending = needs_work && PyList_Size(needs_work) > 0; + bool rebuild_pending = needs_rebuild && PyObject_IsTrue(needs_rebuild); + if (work_pending || rebuild_pending) { Py_DECREF(status); // Build diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp index ea58ab3d..47025752 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_logic.cpp @@ -49,7 +49,9 @@ AggregationKey build_aggregation_key(const DFTracerEvent& ev, auto& intern = aggregation_intern(); AggregationKey key; - key.cat_id = intern.get_or_insert(ev.cat); + std::string cat_storage; + key.cat_id = + intern.get_or_insert(internal::to_lower_ascii(ev.cat, cat_storage)); key.name_id = intern.get_or_insert(ev.name); key.pid = ev.pid; key.tid = ev.tid; diff --git a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp index b142d82d..c9f5c376 100644 --- a/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp +++ b/src/dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.cpp @@ -167,7 +167,9 @@ void AggregationVisitor::on_event(const EventRecord& record) { if (!extra_keys_vec.empty()) extra_ptr = &extra_keys_vec; } - serialize_agg_key_into(key_buf_, config_hash_, map_type, ev.cat, ev.name, + std::string cat_storage; + std::string_view cat_lower = internal::to_lower_ascii(ev.cat, cat_storage); + serialize_agg_key_into(key_buf_, config_hash_, map_type, cat_lower, ev.name, ev.pid, ev.tid, hhash, fhash, time_bucket, extra_ptr); diff --git 
a/src/dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.cpp b/src/dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.cpp index 2cad9e6b..30a1484d 100644 --- a/src/dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.cpp +++ b/src/dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.cpp @@ -307,6 +307,8 @@ coro::CoroTask IndexResolverUtility::process( // Merge augmentation info (all groups should have same global config) if (out.needs_augmentation) { result.needs_augmentation = true; + } + if (out.stored_time_interval_us != 0) { result.stored_time_interval_us = out.stored_time_interval_us; } } diff --git a/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp b/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp index 09f0bf9e..274c32e6 100644 --- a/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp +++ b/src/dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -5,12 +6,15 @@ #include #include #include +#include #include #include #include #include #include +#include +#include namespace dftracer::utils::utilities::composites::dft::indexing { @@ -47,6 +51,33 @@ coro::CoroTask resolve_and_build_index( co_return result; } + // A cached aggregation tier at a different interval can't be refined in + // place; discard every affected index root and rebuild. 
+ if (result.needs_augmentation && !input.force_rebuild) { + std::set index_roots; + for (const auto& file : result.all_files) { + index_roots.insert( + internal::determine_index_path(file, input.index_dir)); + } + for (const auto& root : index_roots) { + DFTRACER_UTILS_LOG_INFO( + "Aggregation interval changed (index built at %llu us); " + "rebuilding %s", + static_cast(result.stored_time_interval_us), + root.c_str()); + std::error_code ec; + fs::remove_all(root, ec); + if (ec) { + throw std::runtime_error("failed to remove stale index " + + root + ": " + ec.message()); + } + } + result = co_await resolver.process(resolve_input); + if (result.all_files.empty()) { + co_return result; + } + } + // Collect files that need work (checkpoint or aggregation) // When force_rebuild is set, process all files std::vector files_needing_work; diff --git a/src/dftracer/utils/utilities/composites/dft/internal/utils.cpp b/src/dftracer/utils/utilities/composites/dft/internal/utils.cpp index c5865387..c19cd6cf 100644 --- a/src/dftracer/utils/utilities/composites/dft/internal/utils.cpp +++ b/src/dftracer/utils/utilities/composites/dft/internal/utils.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -24,8 +25,37 @@ std::string determine_provenance_index_path(const std::string& data_path, return determine_index_path(data_path, index_dir); } +std::string_view to_lower_ascii(std::string_view s, std::string& storage) { + bool has_upper = false; + for (char c : s) { + if (c >= 'A' && c <= 'Z') { + has_upper = true; + break; + } + } + if (!has_upper) return s; + storage.assign(s); + for (auto& c : storage) { + c = static_cast(std::tolower(static_cast(c))); + } + return storage; +} + +bool ascii_iequals(std::string_view a, std::string_view b) { + if (a.size() != b.size()) return false; + for (std::size_t i = 0; i < a.size(); ++i) { + if (std::tolower(static_cast(a[i])) != + std::tolower(static_cast(b[i]))) { + return false; + } + } + return true; +} + bool 
is_data_transfer_op(std::string_view cat, std::string_view name) { - if (cat != "POSIX" && cat != "STDIO") return false; + if (!ascii_iequals(cat, "posix") && !ascii_iequals(cat, "stdio")) { + return false; + } static const std::unordered_set OPS = { "read", "write", "pread", "pwrite", "pread64", "pwrite64", "readv", "writev", "preadv", "pwritev", diff --git a/tests/python/test_aggregator.py b/tests/python/test_aggregator.py index 3e3a957e..c3668157 100644 --- a/tests/python/test_aggregator.py +++ b/tests/python/test_aggregator.py @@ -145,13 +145,13 @@ def test_process_aggregates_profile_and_system_counters(self): rows = self._rows_by_key(result) assert len(rows) == 3 - event = rows[(0, "POSIX", "read")] + event = rows[(0, "posix", "read")] assert event["count"] == 1 assert event["dur_total"] == 50 assert event["size_total"] == 64 assert event["bytes_total"] == 64 - profile = rows[(1, "PROFILE", "cpu_usage")] + profile = rows[(1, "profile", "cpu_usage")] assert profile["count"] == 4 assert profile["dur_total"] == 80 assert profile["dur_min"] == 10 @@ -193,8 +193,8 @@ def test_iter_arrow_emits_separate_event_profile_and_system_batches(self): rows = self._rows_by_key(result) assert set(rows) == { - (0, "POSIX", "read"), - (1, "PROFILE", "cpu_usage"), + (0, "posix", "read"), + (1, "profile", "cpu_usage"), (2, "sys", "mem_bw"), } @@ -359,7 +359,7 @@ def test_process_with_query_filter(self): env.temp_dir, index_dir=env.temp_dir, force_rebuild=True, - query='cat == "POSIX"', + query='cat == "posix"', ) pa = pytest.importorskip("pyarrow") @@ -368,7 +368,7 @@ def test_process_with_query_filter(self): # Should only have POSIX entries assert len(rows) == 2 - assert all(row["cat"] == "POSIX" for row in rows) + assert all(row["cat"] == "posix" for row in rows) def test_iter_arrow_with_query_filter(self): """Query parameter filters streaming results.""" @@ -393,7 +393,7 @@ def test_iter_arrow_with_query_filter(self): env.temp_dir, index_dir=env.temp_dir, force_rebuild=True, - 
query='cat == "APP"', + query='cat == "app"', ) ) @@ -403,7 +403,7 @@ def test_iter_arrow_with_query_filter(self): # Should only have APP entries assert len(rows) == 1 - assert rows[0]["cat"] == "APP" + assert rows[0]["cat"] == "app" def test_write_arrow_creates_files(self): """write_arrow creates Arrow IPC files.""" @@ -455,8 +455,8 @@ def test_write_arrow_with_views(self): index_dir=env.temp_dir, force_rebuild=True, views=[ - {"name": "io", "query": 'cat == "POSIX"'}, - {"name": "compute", "query": 'cat == "APP"'}, + {"name": "io", "query": 'cat == "posix"'}, + {"name": "compute", "query": 'cat == "app"'}, ], ) @@ -477,7 +477,7 @@ def test_write_arrow_with_views(self): assert len(io_files) > 0 with ipc.open_file(str(io_files[0])) as f: table = pa.Table.from_batches([f.get_batch(i) for i in range(f.num_record_batches)]) - assert all(row["cat"] == "POSIX" for row in table.to_pylist()) + assert all(row["cat"] == "posix" for row in table.to_pylist()) def test_write_arrow_compression(self): """write_arrow respects compression setting.""" diff --git a/tests/python/test_indexer.py b/tests/python/test_indexer.py index 9951a636..0fdb90f5 100644 --- a/tests/python/test_indexer.py +++ b/tests/python/test_indexer.py @@ -378,6 +378,35 @@ def test_index_status_dataclass(self): assert len(status.needs_work) == 1 assert status.index_path == "/tmp/index" + def test_resolve_reports_aggregation_interval(self): + """resolve() surfaces the time interval of the cached aggregation tier.""" + with Environment() as env: + directory = env.create_indexed_traces(pids=[1]) + with dft_utils.Indexer( + directory=directory, + require_aggregation=dft_utils.AggregationConfig(time_interval_ms=5000), + ) as indexer: + indexer.ensure_indexed() + assert indexer.resolve().aggregation_interval_us == 5_000_000 + + def test_ensure_indexed_rebuilds_on_interval_change(self): + """A new interval discards the stale aggregation tier and rebuilds it.""" + pa = pytest.importorskip("pyarrow") + with 
Environment() as env: + directory = env.create_indexed_traces(pids=[1, 2]) + with dft_utils.Indexer( + directory=directory, + require_aggregation=dft_utils.AggregationConfig(time_interval_ms=1000), + ) as indexer: + indexer.ensure_indexed() + status = indexer.resolve() + assert status.aggregation_interval_us == 1_000_000 + rows = sum( + pa.record_batch(b).num_rows + for b in indexer.iter_arrow_dfanalyzer_all().get("events", []) + ) + assert rows > 0 + def test_aggregation_config_dataclass(self): """Test AggregationConfig dataclass""" config = dft_utils.AggregationConfig( @@ -716,7 +745,7 @@ def test_iter_arrow_dfanalyzer_all_string_filter(self): directory = env.create_indexed_traces(pids=[1]) with self._make_indexer(directory) as indexer: indexer.ensure_indexed() - result = indexer.iter_arrow_dfanalyzer_all(query='cat == "POSIX"') + result = indexer.iter_arrow_dfanalyzer_all(query='cat == "posix"') rows = sum(pa.record_batch(b).num_rows for b in result.get("events", [])) assert rows > 0 diff --git a/tests/utilities/composites/dft/aggregators/test_aggregator_utility.cpp b/tests/utilities/composites/dft/aggregators/test_aggregator_utility.cpp index e6df7b44..26301b1c 100644 --- a/tests/utilities/composites/dft/aggregators/test_aggregator_utility.cpp +++ b/tests/utilities/composites/dft/aggregators/test_aggregator_utility.cpp @@ -90,14 +90,14 @@ TEST_SUITE("AggregatorUtility") { CHECK(system_batch->entries.size() == 1); const auto& event_entry = event_batch->entries.front(); - CHECK(event_entry.key.cat() == "POSIX"); + CHECK(event_entry.key.cat() == "posix"); CHECK(event_entry.key.name() == "read"); CHECK(event_entry.metrics.count == 1); CHECK(event_entry.metrics.duration.total == 50); CHECK(event_entry.metrics.size.total == 64); const auto& profile_entry = profile_batch->entries.front(); - CHECK(profile_entry.key.cat() == "PROFILE"); + CHECK(profile_entry.key.cat() == "profile"); CHECK(profile_entry.key.name() == "cpu_usage"); CHECK(profile_entry.metrics.count == 4); 
CHECK(profile_entry.metrics.duration.total == 80); diff --git a/tests/utilities/composites/dft/aggregators/test_chunk_aggregator_utility.cpp b/tests/utilities/composites/dft/aggregators/test_chunk_aggregator_utility.cpp index 1eb54fc3..9beea331 100644 --- a/tests/utilities/composites/dft/aggregators/test_chunk_aggregator_utility.cpp +++ b/tests/utilities/composites/dft/aggregators/test_chunk_aggregator_utility.cpp @@ -56,7 +56,7 @@ TEST_SUITE("ChunkAggregatorUtility") { REQUIRE(output.aggregations.size() == 1); const auto& [event_key, event_metrics] = *output.aggregations.begin(); - CHECK(event_key.cat() == "POSIX"); + CHECK(event_key.cat() == "posix"); CHECK(event_key.name() == "read"); CHECK(event_metrics.count == 1); CHECK(event_metrics.duration.total == 50); @@ -68,7 +68,7 @@ TEST_SUITE("ChunkAggregatorUtility") { REQUIRE(output.profile_aggregations.size() == 1); const auto& [profile_key, profile_metrics] = *output.profile_aggregations.begin(); - CHECK(profile_key.cat() == "PROFILE"); + CHECK(profile_key.cat() == "profile"); CHECK(profile_key.name() == "cpu_usage"); CHECK(profile_metrics.count == 4); CHECK(profile_metrics.duration.total == 80);