Skip to content

feat: dynamic table-filter pushdown (Phase 1.1)#794

Draft
kevkrist wants to merge 15 commits into
sirius-db:devfrom
kevkrist:phase-1-1-dynamic-filter-pushdown
Draft

feat: dynamic table-filter pushdown (Phase 1.1)#794
kevkrist wants to merge 15 commits into
sirius-db:devfrom
kevkrist:phase-1-1-dynamic-filter-pushdown

Conversation

@kevkrist

@kevkrist kevkrist commented May 20, 2026

Copy link
Copy Markdown
Collaborator

Summary

End-to-end wiring of the dynamic-filter scaffolding from #664. A hash-join build computes a global (min, max) per join key via cudf::reduce, packages it as a single-zone sirius_dynamic_zone_map_filter, and pushes it into a channel shared with a downstream parquet scan. The scan AND-merges the AST fragments with its translated static filter and installs the combined predicate via cudf::io::parquet_reader_options::set_filter, so parquet row-group pruning fires against runtime values from the build side.

This PR implements Phase 1.1 for dynamic filters in Sirius (#664) — router on sirius_physical_plan_generator, probe_targets on the hash join, sirius_dynamic_filters field propagated through the scan operators and the pipeline converter into sirius_gpu_parquet_scan_operator, producer push at on_finalize_operator, consumer merge factored into a free function merge_dynamic_filters_into_ast for unit testing.

Coverage: HASH_JOIN_MODE::BUILD_PROBE only (the mode that materializes _build_table at finalize). BUILD_PROBE is gated on num_partitions == 1, so within this scope a single-zone zone-map is the per-partition answer. The multi-partition case (STANDARD / MIXED_JOIN modes, where the build sits in ports["build"]->repo as N partitions × M batches) lands in Phase 1.2 via the partition-sink hook, which computes per-partition (min, max) upstream of the join and feeds them in as N zones.

Also closes #795 — dynamic filters preserved across LogicalOperator::Copy()

LogicalOperator::Copy is implemented as serialize→deserialize. Neither LogicalComparisonJoin::filter_pushdown (serialized properties 200–208) nor LogicalGet::dynamic_filters (serialized properties 200–214) is in its operator's serialization schema, so a plain Copy strips them. Three sites in the transparent execution path each Copy the captured plan and would each independently strip the metadata:

  1. sirius_optimizer_hook (capture into SiriusContext)
  2. SiriusContext::OnFinalizePrepare (validation copy before installing PhysicalSiriusExecution)
  3. PhysicalSiriusExecution::GetDataInternal (per-execute copy when the prepared statement is re-run)

Factored into a single helper sirius::transparent::copy_logical_plan and applied at all three sites. The helper wraps LogicalOperator::Copy and additionally re-attaches filter_pushdown / dynamic_filters from the source via a parallel-walk; the DynamicTableFilterSet shared_ptr is shared (not deep-copied) to preserve the route-key pointer identity that pairs producer joins with their downstream scans.

Without this half, Phase 1.1's wiring is correct but inert in transparent gpu_execution mode — nothing downstream ever sees the optimizer's pushdown work.

Test plan

  • [dynamic_filter] — 21/21 pass (16 framework + 5 router).
  • [scan_merge] — 9/9 pass (empty set, dynamic-only, static + dynamic merge, hive-column skip, missing AST capability, multi-column, multi-filter-per-column, out-of-range guard).
  • [preserve] — 7/7 pass (3 clone scenarios incl. DynamicTableFilterSet pointer-identity preservation; 4 parallel-walk scenarios incl. children recursion and structural-mismatch bail).
  • [parquet][TPC-H] — 22/22 TPC-H parquet queries pass.
  • SF=30 TPC-H, full 22-query run via gpu_execution: 132 producer pushes fire end-to-end (builds 1–60,128 rows). Q14/Q19 specifically don't push — their lineitem-side joins are large enough that the partition operator splits them into >1 partitions, leaving the join in STANDARD mode (out of Phase 1.1's BUILD_PROBE scope; Phase 1.2 covers them). No performance regression.

🤖 Generated with Claude Code [Edited by human, Kevin Kristensen]

@kevkrist kevkrist marked this pull request as draft May 20, 2026 19:45
kevkrist added a commit to kevkrist/sirius that referenced this pull request May 20, 2026
…ent hook

LogicalOperator::Copy round-trips through serialize/deserialize. Neither
LogicalComparisonJoin::filter_pushdown nor LogicalGet::dynamic_filters
appears in its operator's serialization schema, so both fields are null
on the deserialized copy. In transparent gpu_execution mode the captured
copy is what Sirius's planner walks — making any downstream consumer of
the dynamic-filter pipeline dormant.

Walks the original and copied trees in parallel inside sirius_optimizer_hook
and re-attaches both fields onto the copy. DynamicTableFilterSet shared_ptrs
are *shared* (not deep-copied) to preserve the route-key pointer identity
that pairs producer joins with their downstream scans.
LogicalComparisonJoin::filter_pushdown is deep-copied via
clone_filter_pushdown_info because DuckDB's CPU fallback path still
consumes the original; min_max_aggregates is omitted (Sirius does not
read it, and cloning Expression trees is non-trivial).

Refs sirius-db#795. With this fix in place, Phase 1.1 of dynamic table-filter
pushdown (sirius-db#794) activates automatically — no further code change needed.

Tests:
- 3 cases for clone_filter_pushdown_info (scalar fields, DynamicTableFilterSet
  pointer identity preserved, column bindings/storage types copied).
- 4 cases for preserve_dynamic_filter_metadata (no-op when null, copies
  onto fresh copy, recurses into children, bails on structural mismatch).
- All 22 TPC-H parquet integration tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kevkrist added a commit to kevkrist/sirius that referenced this pull request May 20, 2026
The previous commit introduced preserve_dynamic_filter_metadata and applied
it at the optimizer hook only. End-to-end testing on TPC-H Q14 surfaced two
more Copy() sites that strip the metadata again:

  - SiriusContext::OnFinalizePrepare (sirius_context.cpp:648) copies the
    captured plan for validation before installing PhysicalSiriusExecution.
  - PhysicalSiriusExecution::GetDataInternal (physical_sirius_execution.cpp:119)
    copies per-execute when the prepared statement is re-run.

Factored into a single public helper, copy_logical_plan_preserving_dynamic_filters,
and applied at all three sites. The optimizer hook keeps a counts out-param so
its diagnostic log can report how many joins/gets received metadata; the other
two call sites use the default null counts.

Verified end-to-end on Q14 at SF=30: "LogicalGet has dynamic_filters attached"
and "Wired hash join with 1 probe target(s)" INFO lines now fire on every
plan-gen pass (both validation and per-execute). The producer push at finalize
remains gated to HASH_JOIN_MODE::BUILD_PROBE per sirius-db#794's scope — at SF=30
Q14's join falls into STANDARD mode (partition operator splits the build),
so the producer push itself doesn't fire, but every upstream prerequisite now
does. Full TPC-H parquet suite (22/22) still passes; SF=30 TPC-H benchmark
+0.80% vs baseline (within noise).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kevkrist and others added 5 commits June 1, 2026 10:38
End-to-end wiring of the dynamic-filter scaffolding from sirius-db#664: a hash-join
build side computes a global (min, max) per join key via cudf::reduce, packages
it as a single-zone sirius_dynamic_zone_map_filter, and pushes it into a
channel shared with a downstream parquet scan. The scan AND-merges those AST
fragments with its translated static filter and installs the combined
predicate via reader_options::set_filter, so parquet row-group pruning fires
against actual runtime values from the build side.

Producer is gated to HASH_JOIN_MODE::BUILD_PROBE (num_partitions == 1) for
1.1; multi-zone per-build-partition bounds land in 1.2 via the partition-sink
hook. Consumer wiring targets sirius_gpu_parquet_scan_operator (not the
deprecated parquet_scan_task). The merge loop is extracted into
merge_dynamic_filters_into_ast for unit testing.

Tests: router lookup-or-create (5 cases), merge function (9 cases covering
empty set, dynamic-only, static + dynamic, hive-column skip, missing AST
capability, multi-column / multi-filter / out-of-range guard). TPC-H Q14 and
Q19 parquet integration tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds three runtime-observability INFO log lines so the dynamic-filter
pipeline is visible from sirius.log without enabling debug-level output:

- sirius_plan_get: "LogicalGet has dynamic_filters attached" — fires
  once per scan whose channel was wired up by plan-gen.
- sirius_plan_comparison_join: "Wired hash join with N probe target(s)"
  — fires once per join whose filter_pushdown was populated.
- sirius_physical_hash_join: "Pushed N dynamic filter(s) across M
  target(s)" — fires once per BUILD_PROBE finalize that actually
  emits zone-map filters.

The per-task consumer-merge log stays at DEBUG to avoid noise on
busy scans (one line per row-group partition per file).

SF=30 TPC-H benchmarking found that none of these fire in transparent
execution mode because plan->Copy() in sirius_optimizer_extension
strips LogicalComparisonJoin::filter_pushdown and LogicalGet::
dynamic_filters (no entries in DuckDB's plan serialization). Phase
1.1's wiring is correct but dormant until that's addressed — these
logs will surface the first activation cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ent hook

LogicalOperator::Copy round-trips through serialize/deserialize. Neither
LogicalComparisonJoin::filter_pushdown nor LogicalGet::dynamic_filters
appears in its operator's serialization schema, so both fields are null
on the deserialized copy. In transparent gpu_execution mode the captured
copy is what Sirius's planner walks — making any downstream consumer of
the dynamic-filter pipeline dormant.

Walks the original and copied trees in parallel inside sirius_optimizer_hook
and re-attaches both fields onto the copy. DynamicTableFilterSet shared_ptrs
are *shared* (not deep-copied) to preserve the route-key pointer identity
that pairs producer joins with their downstream scans.
LogicalComparisonJoin::filter_pushdown is deep-copied via
clone_filter_pushdown_info because DuckDB's CPU fallback path still
consumes the original; min_max_aggregates is omitted (Sirius does not
read it, and cloning Expression trees is non-trivial).

Refs sirius-db#795. With this fix in place, Phase 1.1 of dynamic table-filter
pushdown (sirius-db#794) activates automatically — no further code change needed.

Tests:
- 3 cases for clone_filter_pushdown_info (scalar fields, DynamicTableFilterSet
  pointer identity preserved, column bindings/storage types copied).
- 4 cases for preserve_dynamic_filter_metadata (no-op when null, copies
  onto fresh copy, recurses into children, bails on structural mismatch).
- All 22 TPC-H parquet integration tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous commit introduced preserve_dynamic_filter_metadata and applied
it at the optimizer hook only. End-to-end testing on TPC-H Q14 surfaced two
more Copy() sites that strip the metadata again:

  - SiriusContext::OnFinalizePrepare (sirius_context.cpp:648) copies the
    captured plan for validation before installing PhysicalSiriusExecution.
  - PhysicalSiriusExecution::GetDataInternal (physical_sirius_execution.cpp:119)
    copies per-execute when the prepared statement is re-run.

Factored into a single public helper, copy_logical_plan_preserving_dynamic_filters,
and applied at all three sites. The optimizer hook keeps a counts out-param so
its diagnostic log can report how many joins/gets received metadata; the other
two call sites use the default null counts.

Verified end-to-end on Q14 at SF=30: "LogicalGet has dynamic_filters attached"
and "Wired hash join with 1 probe target(s)" INFO lines now fire on every
plan-gen pass (both validation and per-execute). The producer push at finalize
remains gated to HASH_JOIN_MODE::BUILD_PROBE per sirius-db#794's scope — at SF=30
Q14's join falls into STANDARD mode (partition operator splits the build),
so the producer push itself doesn't fire, but every upstream prerequisite now
does. Full TPC-H parquet suite (22/22) still passes; SF=30 TPC-H benchmark
+0.80% vs baseline (within noise).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kevkrist kevkrist force-pushed the phase-1-1-dynamic-filter-pushdown branch from 1988f34 to 3752fa3 Compare June 1, 2026 16:04
@kevkrist kevkrist marked this pull request as ready for review June 1, 2026 16:35
kevkrist and others added 7 commits June 4, 2026 19:16
…ication

Phase 1.1 was correct but published the join build's zone-map filter too late to
prune the probe scan: the probe scan runs concurrently with the build (injected
PARTITION/CONCAT), the probe-side CONCAT withholds probe batches from the join
until the whole scan finishes, and BUILD_PROBE builds its hash table from the
first probe batch -- so the filter was born after the scan had read its splits.

This adds:

- Atomic notification on sirius_dynamic_filter_set: producer-counted
  register_producer/mark_producer_ready/ready/wait_until_filters/has_filters, so a
  consumer is always released even on an empty/aborted build.
- Build-side producer publish: sirius_physical_hash_join overrides
  push_data_batch_partitioned to reduce (min,max) and publish the moment the
  (concat-folded) build batch is delivered to the build port -- before any probe
  batch, outside the task state machine. (A build-only first task was tried and
  reverted: it deadlocks the task_creator, whose WAITING(probe) hint is what
  drives probe production.)
- Three-point consumer application in the probe scan: reader set_filter at read,
  runtime apply on cached/pinned scans, and post-decode apply for late arrivals,
  plus a bounded readiness wait (dynamic_filter_wait_ms).
- apply_dynamic_filters_to_output_table runtime apply (compute_column +
  apply_boolean_mask), config flags + SET handlers, plan-gen producer
  registration and a central enable gate.
- Fix: the existing merge path silently dropped a static filter when its AST
  translation failed and a dynamic filter was present.

Measured SF30 (lineitem join keyset on l_orderkey, pure dynamic): default prunes
21/36 splits and beats OFF; dynamic_filter_wait_ms=150 prunes all 36 -> ~1.6x,
results bit-identical to CPU. Full unit suite (1556 cases) green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
publish_dynamic_filters_locked is publish-once (guarded by _dynamic_filters_published
under op_state_mutex) and each producer calls register_producer / mark_producer_ready
exactly once per channel, so _producers_ready can never exceed _producers_registered.
The clamp was unreachable and didn't actually protect a multi-producer channel anyway
(it caps the aggregate count, not per-producer). Replace it with a plain increment and
state the once-per-registered-producer contract; drop the test that exercised the clamp.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The bare `{ lock_guard lk(_cv_mu); }` after the atomic fetch_add was a lost-wakeup
barrier, but read as a mystery no-op. Move the fetch_add inside that lock (the
canonical "mutate predicate state under the cv mutex, then notify" form) so the
intent is obvious. No behavior change — _filter_count is atomic, so the lock-free
has_filters() fast path is unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…eady) for Phase 1.3

Document why the eager wait_until_filters is the default and why "a zone-map fires
before the bloom arrives" is not a problem: filters are conjunctive superset
predicates (correctness unaffected), the consumer re-snapshots the channel per
split so a late bloom applies to subsequent splits, and wait_until_ready (wait for
the complete set) would serialize the scan behind the slow bloom build. Adds
guidance for the Phase 1.3 bloom producer (push zone-map + bloom before
mark_producer_ready) and notes the harmless per-push partial-snapshot window. Cross
-referenced from the §1.3 Bloom section.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The build-side dynamic-filter publish (in push_data_batch_partitioned) was running
its cudf::reduce on an ad-hoc `rmm::cuda_stream` created+synced+destroyed per call
— the only such site in src/, off the codebase's stream convention, and on a worker
thread while holding op_state_mutex. Replace it with the batch's own
memory_space->acquire_stream(): a pooled, per-device stream, which is how the rest
of the tree gets a stream for GPU work outside an operator execute() (cf.
host_parquet_representation_converters.cpp, cached_split_provider.cpp). This is
device-bound by construction (no reliance on the thread's current device), avoids
the per-call create/destroy, and the misleading "no task stream of its own" comment
is corrected: the folded build batch is already materialized at delivery (the
producing CONCAT execute() was synchronized by run_one_operator), and the (min,max)
scalars are synced via is_valid(stream) before any consumer reads them, so the
explicit synchronize() was redundant and is dropped. Full suite (1555) green;
keyset benchmark still prunes (merged@read > 0) with identical results.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ly path

The comment above the BUILT-transition publish_dynamic_filters_locked call still
described it as the 'early publish at the first probe task' (true before the
build-port hook existed) AND as a fallback — contradicting itself. Collapse to one
accurate note: the primary publish is the push_data_batch_partitioned build-port
hook; this site only runs if that was skipped (build batch not GPU-resident at
delivery), and lands after the first probe batch so it cannot prune already-read
splits.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@kevkrist kevkrist marked this pull request as draft June 5, 2026 20:04
kevkrist and others added 2 commits June 5, 2026 20:59
… keys

Zone-map [min,max] filters prune nothing on TPC-H's scattered join keys (even
a 0.24%-selective build spans the whole key domain). Add set-membership filters
that distinguish scattered keys exactly:

- sirius_apply_lowerable capability mixin (compute_mask(probe) -> BOOL).
- sirius_dynamic_in_list_filter: exact, cudf::stable_distinct + cudf::contains
  (small selective builds).
- sirius_dynamic_bloom_filter: cuco::bloom_filter (PIMPL'd .cu, INT64), a few
  bits/key, for large builds; false positives only (join stays authoritative).
- Producer picks one filter per key by build size (<=2M IN-list, >2M INT64
  bloom, else zone-map); consumer applies post-decode (AND-combined with any
  zone-map AST mask).
- Adaptive selectivity gate: first split measures keep ratio (free) and disables
  the membership apply for the rest of a scan when it keeps >50% of rows, so a
  non-selective build can't regress the whole scan.

SF50 full TPC-H, robust medians, all 22 results bit-identical OFF vs ON:
Q21 -10.6% (bloom on F-status orders, before lineitem's 3-way self-join),
Q2 -7.9% (IN-list on selective part); net ~-1.5%. Off by default.

Limitation: applied post-decode, so it saves downstream work, not scan I/O.

Unit tests: IN-list exact membership, Bloom no-false-negatives. cuco bundled
with cudf (CUDA flags: --expt-extended-lambda --expt-relaxed-constexpr).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cuco's default cuda_allocator uses cudaMalloc/cudaFree, which implicitly
synchronize the device and bypass the RMM pool the rest of the query runs in.
Thread the compute stream's device_async_resource_ref into a small
cuco-compatible, stream-ordered allocator (allocate(n, stream) -> rmm pool), so
the Bloom bit array comes from the same pool with no synchronizing allocation on
the hot path.

Unit tests (construction/destruction) and Q21 (-12.7%, bit-identical) unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Dynamic table filters stripped by plan->Copy() in transparent execution mode

1 participant