Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions docs/source/api/dfanalyzer.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
DFAnalyzer Module
=================

The ``dftracer.utils.dfanalyzer`` module bridges the C++ aggregation index to
`dfanalyzer <https://github.com/hariharan-devarajan/dfanalyzer>`_. It provides
the index-build, Arrow-IPC marshalling, and distributed high-level-metrics
(HLM) helpers that dfanalyzer drives over a Dask cluster.

These helpers only depend on the :class:`~dftracer.utils.Indexer` and the Arrow
plumbing, so they live in ``dftracer-utils`` rather than being vendored inside
dfanalyzer.

Dask is an optional dependency -- the distributed helpers require
``dask.distributed`` to be installed.

Index Building
--------------

.. autofunction:: dftracer.utils.dfanalyzer.resolve_trace_inputs

.. autofunction:: dftracer.utils.dfanalyzer.index_path_for

.. autofunction:: dftracer.utils.dfanalyzer.build_index_distributed

.. autofunction:: dftracer.utils.dfanalyzer.ensure_index

Arrow IPC Marshalling
---------------------

The C extension yields Arrow data as PyCapsules. These helpers convert between
capsules, Arrow IPC byte streams (the wire format moved between Dask workers),
and pandas frames.

.. autofunction:: dftracer.utils.dfanalyzer.batches_to_ipc

.. autofunction:: dftracer.utils.dfanalyzer.ipc_to_pandas

.. autofunction:: dftracer.utils.dfanalyzer.scan_to_ipc

Distributed High-Level Metrics
------------------------------

The HLM pipeline aggregates the per-worker aggregation column family into a
Dask DataFrame. Each worker owns a disjoint PID set, so per-worker partials
have disjoint keys and need no cross-worker merge.

.. autofunction:: dftracer.utils.dfanalyzer.distributed_hlm

.. autofunction:: dftracer.utils.dfanalyzer.worker_hlm_partial

.. autofunction:: dftracer.utils.dfanalyzer.make_empty_hlm

View Groupby Partials
---------------------

These helpers implement mergeable per-partition view aggregation: each
partition emits partial aggregates (sum, count, min, max, sum-of-squares) that
are combined and finalized into mean/std without a global shuffle.

.. autofunction:: dftracer.utils.dfanalyzer.partial_arrow_view_groupby

.. autofunction:: dftracer.utils.dfanalyzer.finalize_view_partials

.. autofunction:: dftracer.utils.dfanalyzer.build_partial_meta

.. autofunction:: dftracer.utils.dfanalyzer.build_final_meta

Dtype Coercion
--------------

Utilities that normalize Arrow-backed dtypes into the pandas-native dtypes
expected by dfanalyzer's downstream ``metrics.py``.

.. autofunction:: dftracer.utils.dfanalyzer.normalize_arrow_dtypes

.. autofunction:: dftracer.utils.dfanalyzer.coerce_arrow_numerics_to_pandas_native

.. autofunction:: dftracer.utils.dfanalyzer.coerce_profile_dtypes
2 changes: 2 additions & 0 deletions docs/source/api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This section contains the Python API documentation for dftracer utilities.
runtime
reader
indexer
dfanalyzer

Comment thread
rayandrew marked this conversation as resolved.
Module Overview
---------------
Expand All @@ -23,3 +24,4 @@ The dftracer utilities Python package provides the following main modules:
- :doc:`runtime` - Coroutine runtime and Dask integration
- :doc:`reader` - Lazy JSON object type
- :doc:`indexer` - Indexing and searching capabilities
- :doc:`dfanalyzer` - dfanalyzer bridge: index build, Arrow IPC, distributed HLM
2 changes: 2 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,8 @@ def linkcode_resolve(domain: str, info: dict[str, str]) -> str | None:
sys.path.insert(0, str(PYTHON_SOURCE_DIR))
_install_rtd_extension_stub()
autodoc_mock_imports = [
"numpy",
"pandas",
"pyarrow",
"dask",
"dask.distributed",
Expand Down
4 changes: 2 additions & 2 deletions docs/source/cpp_api/utilities.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ Each module below has detailed class documentation in the API Reference:
- :doc:`api/utilities/filesystem`
* - File I/O
- File reading, writing, chunk writing, async line generators
- :doc:`api/utilities/fileio`
- :doc:`api/utilities/fileio/index`
* - Compression
- Streaming zlib compression (GZIP, ZLIB, DEFLATE)
- :doc:`api/utilities/composites`
- :doc:`api/utilities/composites/index`
* - Text
- Line splitting, filtering, text processing
- :doc:`api/utilities/text`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

namespace dftracer::utils::utilities::composites::dft::internal {

// Lowercase `s`; returns a view over `s` when already lowercase (no copy),
// else lowercases into `storage` (which must outlive the returned view).
std::string_view to_lower_ascii(std::string_view s, std::string& storage);

bool ascii_iequals(std::string_view a, std::string_view b);

// True when the event's return value represents bytes transferred.
// Checks both category (POSIX/STDIO) and function name.
bool is_data_transfer_op(std::string_view cat, std::string_view name);

/**
Expand Down
104 changes: 102 additions & 2 deletions python/dftracer/utils/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from typing import Any, Dict, List, Optional, Union

try:
from dask.distributed import Client, WorkerPlugin
from dask.distributed import Client, WorkerPlugin, get_client
except ImportError:
Client: Optional[Any] = None
WorkerPlugin: Optional[Any] = None
get_client: Optional[Any] = None

try:
import dask
Expand All @@ -27,7 +28,7 @@

if WorkerPlugin is not None:

class DFTracerUtilsDaskWorkerPlugin(WorkerPlugin):
class DFTracerUtilsDaskWorkerPlugin(WorkerPlugin): # ty: ignore[unsupported-base]
"""Creates a persistent Runtime per Dask worker."""

def __init__(self, threads=0, io_threads=0):
Expand All @@ -54,6 +55,105 @@ def teardown(self, worker):
pass
del worker.dftracer_utils_runtime

else:
DFTracerUtilsDaskWorkerPlugin = None


_plugin_registered_schedulers: set = set()


def resolve_local_staging(client) -> str:
"""Derive node-local SST scratch from each Dask worker's own scratch.

Workers share the path *string* (e.g. ``/scratch/$USER``) but each resolves
it to its own node-local storage; falls back to ``/tmp`` when nothing is
reported.
"""
workers = client.scheduler_info().get("workers", {}) or {}
if workers:
worker_local_dir = next(iter(workers.values())).get("local_directory") or "/tmp"
else:
worker_local_dir = "/tmp"
return os.path.join(worker_local_dir, "dftracer-sst-staging")


def register_auto_thread_plugin() -> None:
"""Register the DFTracer worker plugin on the active distributed client.

Sizes each worker's C++ Runtime threads as hardware_concurrency /
n_workers_on_node so the Runtime uses all cores without oversubscription.

Idempotent: re-registering the same plugin on the same scheduler triggers a
teardown+setup round-trip on every worker, which deadlocks if the previous
Runtime still has in-flight coroutines. Skips if already registered for the
scheduler address. A no-op when no distributed client is active.
"""
if DFTracerUtilsDaskWorkerPlugin is None:
return
try:
import logging
import time
from collections import Counter

client = get_client() # ty: ignore[call-non-callable]
sched_addr = getattr(client.scheduler, "address", None) or ""
if sched_addr in _plugin_registered_schedulers:
return

def _addr_to_host(addr: str) -> str:
return addr.split("://")[-1].rsplit(":", 1)[0]

nthreads = client.nthreads()
for _ in range(10):
nthreads_next = client.nthreads()
if len(nthreads_next) >= len(nthreads):
nthreads = nthreads_next
if len(nthreads) > 0:
break
time.sleep(0.5)
host_counts = Counter(_addr_to_host(a) for a in nthreads.keys())

logging.getLogger("dftracer.dask_plugin").info(
"coord register_plugin: host_counts=%s total_workers=%d worker_addr_sample=%s",
dict(host_counts),
sum(host_counts.values()),
list(nthreads.keys())[:8],
)

class _AutoThreadPlugin(DFTracerUtilsDaskWorkerPlugin):
def __init__(self, host_worker_counts):
super().__init__(threads=0)
self._host_worker_counts = host_worker_counts

def setup(self, worker):
total_cpus = (
len(os.sched_getaffinity(0))
if hasattr(os, "sched_getaffinity")
else os.cpu_count() or 1
)
my_host = worker.address.split("://")[-1].rsplit(":", 1)[0]
n_local = self._host_worker_counts.get(my_host, 1)
self.threads = max(1, total_cpus // n_local)
logging.getLogger("distributed.worker").info(
"DFTracer Runtime: host=%s cpus=%d workers_on_host=%d cpp_threads=%d dict_keys=%s",
my_host,
total_cpus,
n_local,
self.threads,
list(self._host_worker_counts.keys()),
)
super().setup(worker)

client.register_plugin(_AutoThreadPlugin(dict(host_counts)))
_plugin_registered_schedulers.add(sched_addr)
logging.getLogger("dftracer.dask_plugin").info(
"Registered DFTracerUtilsDaskWorkerPlugin host_worker_counts=%s total_workers=%d",
dict(host_counts),
sum(host_counts.values()),
)
except (ValueError, ImportError):
pass


def _write_arrow_task(
file_path: str,
Expand Down
Loading
Loading