From f42bd6ae3ead3528e22fd79b448eff59a6226f3f Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Fri, 20 Feb 2026 17:26:18 -0300 Subject: [PATCH 01/16] chore: plans for async generators and task-queue dataset builder Part of #346 --- plans/346/async-generators-and-task-queue.md | 274 +++++++++++++++++++ 1 file changed, 274 insertions(+) create mode 100644 plans/346/async-generators-and-task-queue.md diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md new file mode 100644 index 00000000..221bc530 --- /dev/null +++ b/plans/346/async-generators-and-task-queue.md @@ -0,0 +1,274 @@ +# Plan: Async Generators & Task Queue Builder + +Created: 2026-02-20 +Status: Planning + +Issue: [#346](https://github.com/NVIDIA-NeMo/DataDesigner/issues/346) + +Related: +- [#260](https://github.com/NVIDIA-NeMo/DataDesigner/issues/260) — original async engine plan +- [PR #280](https://github.com/NVIDIA-NeMo/DataDesigner/pull/280) — async ModelFacade (merged) +- [PR #269](https://github.com/NVIDIA-NeMo/DataDesigner/pull/269) — execution graph reference impl (draft) +- [PR #344](https://github.com/NVIDIA-NeMo/DataDesigner/pull/344) — model facade overhaul plans + +## Goal + +Transform the dataset builder from sequential column-by-column processing into an +async task queue with dependency-aware scheduling. Generators become async-first, +and the builder dispatches individual cell/batch tasks as soon as their upstream +dependencies are satisfied — enabling pipeline parallelism across columns and rows. + +### Current architecture + +``` +for batch in batches (of buffer_size): # sequential + for column in columns: # sequential + if from_scratch: generate_from_scratch(batch) + elif cell_by_cell: fan_out(cells) # parallel within column + elif full_column: generate(df) + checkpoint(batch) +``` + +Columns execute sequentially even when they have no mutual dependency. Rows in +different batches never overlap. 
Only cell-level fan-out within a single column +is parallelised. + +### Target architecture + +``` +all tasks across all row groups submitted to a single async scheduler +scheduler dispatches tasks as dependencies are met, bounded by semaphore +row groups checkpointed to parquet when fully complete +``` + +Multiple columns can execute in parallel when they don't depend on each other. +Rows from different row groups can pipeline (row group 1 column B starts while +row group 0 column C is still running). + +## Key Design Decisions + +### 1. Dynamic dependency resolution, not a static graph + +We don't build an explicit graph object (unlike PR #269). Instead: +- At setup: build a **dependency map** `dict[str, set[str]]` from each column's + `config.required_columns` property (already available on all config types via + Jinja2 template introspection). +- At runtime: a **completion tracker** (columns × rows matrix) determines which + tasks are ready by checking whether all upstream columns for a given row are done. + +This is simpler, requires no new data structures beyond what configs already +provide, and naturally handles the "dynamic" aspect — we just check readiness as +tasks complete. + +### 2. Task granularity + +| Generator type | Task unit | Readiness condition | +|---|---|---| +| `FromScratch` (seed, sampler) | `(column, row_group)` | No dependencies (always first) | +| `CELL_BY_CELL` (LLM text/code/structured/judge, image, embedding, custom) | `(column, row)` | All `required_columns` complete for that row | +| `FULL_COLUMN` (expression, validation, sampler-as-transform) | `(column, row_group)` | All `required_columns` complete for ALL rows in row group | + +### 3. Row groups as checkpoint units + +Rows are partitioned into groups of `buffer_size` (same as current batches). +When all tasks for a row group are complete, write to parquet and free memory. +This preserves the current checkpoint/memory semantics. 
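The partitioning itself is mechanical; a minimal sketch (function name hypothetical, not an existing API):

```python
def make_row_groups(num_records: int, buffer_size: int) -> list[range]:
    """Partition row indices into contiguous groups of at most buffer_size rows."""
    return [
        range(start, min(start + buffer_size, num_records))
        for start in range(0, num_records, buffer_size)
    ]
```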
+ +Full-column generators operate on their entire row group at once, same as today. + +### 4. Concurrency control + +A single `asyncio.Semaphore` bounds total in-flight tasks. The limit should be +the sum of `max_parallel_requests` across distinct models, or a configured global +cap — whichever is smaller. This replaces per-column fan-out. + +### 5. Pre/post-batch processors + +- **Pre-batch**: runs after seed generators complete for a row group, before + other columns. Modeled as a barrier task for the row group. +- **Post-batch**: runs after all columns complete for a row group, before + checkpoint write. + +## Success Criteria + +- [ ] All generators expose async-first `agenerate` (cell-by-cell) or async wrappers (full-column/from-scratch) +- [ ] Builder dispatches tasks based on dependency readiness, not column order +- [ ] Multiple columns execute in parallel when dependencies allow +- [ ] Row groups checkpoint to parquet upon full completion +- [ ] Existing sync path (`DATA_DESIGNER_ASYNC_ENGINE=0`) continues to work unchanged +- [ ] All existing tests pass; new tests cover dependency resolution and scheduling +- [ ] `make test-run-recipes` passes with async engine enabled + +## Implementation Steps + +### Step 1: Column Dependency Map + +Build the dependency map from column configs at builder init time. 
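A sketch of the utility (assuming each config exposes `name` and `required_columns`, as described above; validation details are illustrative):

```python
def build_dependency_map(column_configs) -> dict[str, set[str]]:
    """Map each column name to the set of upstream columns it requires.

    Validates that every dependency was declared earlier in the config list,
    which rules out both cycles and missing references.
    """
    dep_map: dict[str, set[str]] = {}
    seen: set[str] = set()
    for config in column_configs:
        required = set(config.required_columns)
        missing = required - seen
        if missing:
            raise ValueError(
                f"column {config.name!r} references unknown or later columns: {sorted(missing)}"
            )
        dep_map[config.name] = required
        seen.add(config.name)
    return dep_map
```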
+ +- [ ] Add `build_dependency_map(column_configs) -> dict[str, set[str]]` utility + - Input: the ordered list of `ColumnConfigT` / `MultiColumnConfig` + - For each config, read `config.required_columns` → set of upstream column names + - For `MultiColumnConfig`, all sub-columns share the same dependencies + - Validate: every required column must appear earlier in the config list (detect cycles / missing refs) +- [ ] Add `topological_order(dependency_map) -> list[str]` — returns columns in valid execution order (should match config order; this is a validation step) +- [ ] Unit tests for dependency map construction and validation + +**Files**: new module `engine/dataset_builders/utils/dependency_map.py`, tests + +### Step 2: Completion Tracker + +A lightweight structure tracking which (column, row) pairs are done. + +- [ ] `CompletionTracker` class: + - Internal: `dict[str, set[int]]` mapping column name → set of completed row indices + - `mark_complete(column: str, row: int)` / `mark_batch_complete(column: str, row_group: int, row_group_size: int)` + - `is_ready(column: str, row: int, dependency_map) -> bool` — checks all upstream columns for that row + - `is_batch_ready(column: str, row_group: int, row_group_size: int, dependency_map) -> bool` — checks all rows in group + - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all columns done for all rows + - `get_ready_tasks(dependency_map, columns_with_strategy, row_groups) -> list[Task]` — yields all currently dispatchable tasks +- [ ] No locks needed: all access is from the single asyncio event loop thread +- [ ] Unit tests + +**Files**: new module `engine/dataset_builders/utils/completion_tracker.py`, tests + +### Step 3: Task Model + +Simple dataclass representing a unit of work. 
+ +- [ ] `Task` dataclass: + - `column: str` + - `row_group: int` + - `row_index: int | None` (None for batch tasks) + - `task_type: Literal["from_scratch", "cell", "batch", "pre_batch_processor", "post_batch_processor"]` +- [ ] `TaskResult` with status, output, error info +- [ ] Hashable so we can track dispatched/pending sets + +**Files**: new module `engine/dataset_builders/utils/task_model.py` (or inline in scheduler) + +### Step 4: Async Task Scheduler + +The core orchestrator that replaces `_run_batch` for the async path. + +- [ ] `AsyncTaskScheduler` class: + - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit, error/result callbacks + - `async run()` — main loop: + 1. Seed row groups: dispatch all `from_scratch` tasks + 2. Loop: query `completion_tracker.get_ready_tasks()` → dispatch each via `asyncio.create_task()` behind semaphore → on completion, update tracker → repeat until all tasks done or early shutdown + 3. After each row group completes: run post-batch processors, checkpoint + - Task dispatch: wraps generator call in semaphore-guarded coroutine + - Error handling: same early-shutdown logic as `AsyncConcurrentExecutor` (error rate threshold within sliding window) + - Progress tracking: reuse `ProgressTracker` per column +- [ ] Use `asyncio.Event` or `asyncio.Condition` to wake the scheduler when a task completes (avoids polling) +- [ ] Unit tests with mock generators + +**Files**: new module `engine/dataset_builders/async_scheduler.py`, tests + +### Step 5: Generator Async Migration + +Make all generator types async-capable. 
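The wrapper pattern for the sync-backed generators is uniform; an illustrative skeleton (class body is a sketch, not the real implementation):

```python
import asyncio

class ColumnGeneratorFullColumn:
    """Illustrative base: a sync generate() exists; agenerate() bridges to async."""

    def generate(self, data):
        raise NotImplementedError

    async def agenerate(self, data):
        # Run the (potentially blocking) sync implementation in a worker
        # thread so the event loop stays free for other tasks.
        return await asyncio.to_thread(self.generate, data)
```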
+ +- [ ] `ColumnGeneratorWithModelChatCompletion.agenerate` — already implemented (PR #280), no changes needed +- [ ] `FromScratchColumnGenerator`: add `async agenerate_from_scratch(num_records) -> DataFrame` — wraps sync in `asyncio.to_thread` +- [ ] `ColumnGeneratorFullColumn`: add `async agenerate(data: DataFrame) -> DataFrame` — wraps sync in `asyncio.to_thread` +- [ ] `ExpressionColumnGenerator`: inherits full-column async wrapper +- [ ] `SamplerColumnGenerator`: inherits from-scratch async wrapper +- [ ] `SeedDatasetColumnGenerator`: inherits from-scratch async wrapper +- [ ] `ValidationColumnGenerator`: inherits full-column async wrapper +- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses +- [ ] `ImageCellGenerator`, `EmbeddingCellGenerator`: add native `agenerate` using `model.agenerate_image` / `model.agenerate_text_embeddings` +- [ ] Base class `ColumnGenerator.agenerate` fallback (already exists via `asyncio.to_thread`) is sufficient for non-LLM generators + +**Files**: `generators/base.py`, `generators/expression.py`, `generators/samplers.py`, `generators/seed_dataset.py`, `generators/image.py`, `generators/embedding.py`, tests + +### Step 6: Buffer / Row Group Manager + +Adapt `DatasetBatchManager` for concurrent row group processing. + +- [ ] Support multiple row groups in-flight simultaneously (currently only one batch's buffer exists) + - Option A: Multiple buffer instances (one per active row group) + - Option B: Single shared buffer partitioned by row group offset ranges + - Recommendation: **Option A** — cleaner isolation, each row group has its own `list[dict]` +- [ ] `update_cell(row_group: int, row_index: int, column: str, value: Any)` — fine-grained update (vs. 
replacing entire record dict) + - Or keep the current `update_record(index, record_dict)` approach per row group +- [ ] `checkpoint_row_group(row_group: int)` — write parquet, free memory +- [ ] Preserve `drop_records` semantics within each row group +- [ ] Keep backward compatibility with sync path (the existing `DatasetBatchManager` is untouched) + +**Files**: new class or extension in `dataset_batch_manager.py`, tests + +### Step 7: Builder Integration + +Wire the new scheduler into `ColumnWiseDatasetBuilder`. + +- [ ] New method `_build_async(generators, num_records, buffer_size, ...)`: + 1. Build dependency map from `self._column_configs` + 2. Partition rows into row groups + 3. Create `CompletionTracker`, `AsyncTaskScheduler` + 4. Run scheduler on the background event loop (reuse `_ensure_async_engine_loop()`) + 5. Scheduler handles checkpointing via callbacks +- [ ] `build()` dispatches to `_build_async()` when `DATA_DESIGNER_ASYNC_ENGINE=1`, else existing sync path +- [ ] `build_preview()` uses the same async path (single row group, no checkpoint) +- [ ] Error handling: `DatasetGenerationError` wrapping, record dropping, telemetry events +- [ ] Processor integration: + - Pre-batch: scheduler runs after seed tasks for a row group + - Post-batch: scheduler runs after all column tasks for a row group, before checkpoint + +**Files**: `column_wise_builder.py` + +### Step 8: Tests & Validation + +- [ ] Unit tests for each new module (dependency map, completion tracker, task model, scheduler) +- [ ] Integration test: multi-column config with known dependencies, verify parallel execution +- [ ] Integration test: mixed cell-by-cell + full-column generators +- [ ] Integration test: error rate shutdown +- [ ] Integration test: checkpoint correctness (row groups written in order, parquet valid) +- [ ] Run `make test` — all existing tests pass +- [ ] Run `make test-run-recipes` with `DATA_DESIGNER_ASYNC_ENGINE=1` +- [ ] Benchmark: compare sync vs async on a multi-column 
recipe with simulated latency + +## Risks & Considerations + +- **Memory with concurrent row groups**: Having multiple row groups in-flight increases + peak memory. Mitigation: limit max concurrent row groups (e.g., 2-3) via a separate + semaphore or by feeding row groups into the scheduler incrementally. + +- **Record dropping across columns**: If row 5 fails on column B, we must prevent + column C from processing row 5. The completion tracker naturally handles this — a + failed task is never marked complete, so downstream tasks never become ready. But we + need to detect when a row group is "done enough" to checkpoint (some rows dropped). + Solution: track dropped rows per row group; row group is complete when all non-dropped + rows have all columns done. + +- **Full-column generator ordering**: If two full-column generators have no mutual + dependency, they could run in parallel on the same row group. This is safe as long + as they operate on independent columns. Need to ensure the DataFrame isn't mutated + concurrently — use `asyncio.to_thread` (which copies into a thread) or serialize + full-column tasks within a row group. + +- **Pre-batch processors mutating data**: Pre-batch processors (e.g., schema transform) + can add/remove/modify rows. This changes the row count and invalidates the completion + tracker's row indices. Solution: treat pre-batch as a barrier that resets the tracker + state for that row group (re-index rows after processor runs). + +- **Backward compatibility**: The sync path must remain untouched. All new code is + gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` and sits in new modules. 
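The memory mitigation above (capping concurrently active row groups) can be sketched with a second semaphore; `run_one` is a hypothetical coroutine that drives one row group to completion:

```python
import asyncio

async def run_row_groups(row_groups, run_one, max_in_flight: int = 2):
    """Feed row groups into the scheduler with bounded concurrency."""
    gate = asyncio.Semaphore(max_in_flight)

    async def guarded(rg):
        async with gate:
            return await run_one(rg)

    # gather preserves input order, so checkpoint results stay ordered.
    return await asyncio.gather(*(guarded(rg) for rg in row_groups))
```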
+ +## Notes + +### What we're NOT doing in this PR +- Overhauling `ModelFacade` internals (PR #344's scope) +- Building a heavyweight static execution graph (PR #269's approach — we take the + lightweight dynamic approach instead) +- Removing the sync/threaded path (it stays as the default) + +### Key insight from existing code +Every column config already has a `required_columns` property that returns the +column names referenced in its Jinja2 templates. This gives us explicit dependency +information without any config schema changes. The dependency map is just +`{col.name: set(col.required_columns) for col in configs}`. + +### On "dynamic" graph building +The graph is implicit in the dependency map + completion tracker. We never +materialise a node/edge graph structure. The scheduler dynamically determines +readiness by querying the tracker — this is what "dynamic" means in this context. +As tasks complete, new tasks become eligible. No upfront planning of execution order. From c95ac3a4bf9320757996253d49afa5c21ea75e83 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Tue, 24 Feb 2026 11:35:30 -0300 Subject: [PATCH 02/16] address review feedback on async generators plan - Decouple scheduler semaphore (coarse resource guard) from PR #344's adaptive throttle manager (per-key API concurrency) - Add side-effect output column mapping to dependency resolution - Mandate cell-level merge writes, remove unsafe update_record option - Add is_stateful generator property for reentrancy control - Add retry & salvage policy for transient failures - Scope allow_resize out of async v1 (falls back to sync) - Fix asyncio.to_thread reference semantics, require defensive copies - Add new test cases for all above --- plans/346/async-generators-and-task-queue.md | 145 ++++++++++++++++--- 1 file changed, 121 insertions(+), 24 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 221bc530..b1faf993 100644 --- 
a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -53,6 +53,11 @@ We don't build an explicit graph object (unlike PR #269). Instead: - At setup: build a **dependency map** `dict[str, set[str]]` from each column's `config.required_columns` property (already available on all config types via Jinja2 template introspection). +- The dependency map also registers **side-effect output columns** (e.g., + `__trace`, `__reasoning_content`) and maps them back to their producer generator. + A downstream column referencing `summary__trace` resolves to a dependency on + the `summary` generator. This ensures side-effect columns are never missing from + the map or treated as unsatisfied. - At runtime: a **completion tracker** (columns × rows matrix) determines which tasks are ready by checking whether all upstream columns for a given row are done. @@ -78,17 +83,74 @@ Full-column generators operate on their entire row group at once, same as today. ### 4. Concurrency control -A single `asyncio.Semaphore` bounds total in-flight tasks. The limit should be -the sum of `max_parallel_requests` across distinct models, or a configured global -cap — whichever is smaller. This replaces per-column fan-out. +Two independent layers: -### 5. Pre/post-batch processors +1. **Scheduler semaphore** — a coarse resource guard bounding total in-flight + tasks to limit CPU/memory pressure (e.g., configurable cap, default ~128). + This is **not** the source of truth for API concurrency; it only prevents + unbounded coroutine fan-out. + +2. **Throttle manager** (from PR #344) — gates every outbound LLM call, keyed + by `provider+model(+domain)`. Dynamically adjusts per-key limits on 429s + via AIMD. This is the real API concurrency control. + +Tasks must **not hold scheduler slots while waiting on throttle backoff**. A task +acquires a scheduler slot, prepares its request, then releases the slot before +awaiting the throttle permit. 
This ensures a throttled model key doesn't starve +unrelated keys by hogging scheduler capacity. + +This replaces per-column fan-out and composes cleanly with PR #344's adaptive +throttling without the two systems fighting each other. + +### 5. Generator statefulness and reentrancy + +Generators declare whether they are stateful via an `is_stateful` property on +the base class (default `False`). Stateful generators maintain internal state +across calls (e.g., `SeedDatasetColumnGenerator` has a DuckDB batch reader +cursor and leftover-row buffer). The scheduler **serializes tasks per-instance** +for stateful generators — row group N must complete before row group N+1 starts +for that generator. Stateless generators (e.g., `SamplerColumnGenerator`) can +dispatch all row groups concurrently. + +This is a generator-level attribute, not a type-level assumption. Custom +generators declare their own contract. + +### 6. Pre/post-batch processors - **Pre-batch**: runs after seed generators complete for a row group, before other columns. Modeled as a barrier task for the row group. - **Post-batch**: runs after all columns complete for a row group, before checkpoint write. +### 7. Retry & salvage policy + +In deep pipelines, a transient failure on a late column drops the entire row, +wasting all upstream generation work. Controlled retry rounds recover rows that +would otherwise be lost. + +1. **Classify failures**: transient (429, 500, timeout) → retryable; permanent + (400, validation error, schema mismatch) → non-retryable (immediate drop). +2. **Deferred queue**: retryable failures are placed in a deferred queue with + `attempt` count, `next_eligible_at` timestamp, and exponential backoff + jitter. +3. **Scheduling priority**: normal ready tasks are dispatched first. When the + ready queue drains, the scheduler runs up to `N` salvage rounds over deferred + tasks (configurable via `async_scheduler_max_retries`, default 2). +4. 
**Throttle-aware**: retries re-enter the throttle manager acquire path, so + they don't exacerbate rate limiting. +5. **Final drop**: after retry budget is exhausted, mark the cell as failed and + the row as dropped. Continue row-group completion checks over remaining rows. + +### 8. `allow_resize` scoping + +The completion tracker uses row indices as stable identifiers. `allow_resize` +lets any generator change the row count mid-pipeline, which invalidates all +per-row completion state for downstream columns. Supporting this under parallel +execution would require dynamic rescheduling and row identity tracking. + +**Async v1 scope**: if any column config has `allow_resize=True`, the builder +falls back to the sync path. This is safe because resize is opt-in and the sync +path handles it naturally. Full async support for resize is a follow-up. + ## Success Criteria - [ ] All generators expose async-first `agenerate` (cell-by-cell) or async wrappers (full-column/from-scratch) @@ -108,8 +170,11 @@ Build the dependency map from column configs at builder init time. - [ ] Add `build_dependency_map(column_configs) -> dict[str, set[str]]` utility - Input: the ordered list of `ColumnConfigT` / `MultiColumnConfig` - For each config, read `config.required_columns` → set of upstream column names + - Also register side-effect output columns (`__trace`, `__reasoning_content`, etc.) 
+ and map them back to their producer column, so downstream references resolve correctly - For `MultiColumnConfig`, all sub-columns share the same dependencies - - Validate: every required column must appear earlier in the config list (detect cycles / missing refs) + - Validate: every required column must appear earlier in the config list or be a + registered side-effect output (detect cycles / missing refs) - [ ] Add `topological_order(dependency_map) -> list[str]` — returns columns in valid execution order (should match config order; this is a validation step) - [ ] Unit tests for dependency map construction and validation @@ -152,11 +217,20 @@ The core orchestrator that replaces `_run_batch` for the async path. - [ ] `AsyncTaskScheduler` class: - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit, error/result callbacks - `async run()` — main loop: - 1. Seed row groups: dispatch all `from_scratch` tasks - 2. Loop: query `completion_tracker.get_ready_tasks()` → dispatch each via `asyncio.create_task()` behind semaphore → on completion, update tracker → repeat until all tasks done or early shutdown - 3. After each row group completes: run post-batch processors, checkpoint - - Task dispatch: wraps generator call in semaphore-guarded coroutine - - Error handling: same early-shutdown logic as `AsyncConcurrentExecutor` (error rate threshold within sliding window) + 1. Dispatch `from_scratch` tasks, respecting `is_stateful`: stateful generators + serialize per-instance (row group N completes before N+1 starts for that + generator); stateless generators dispatch all row groups concurrently + 2. Loop: query `completion_tracker.get_ready_tasks()` → dispatch each via + `asyncio.create_task()` behind scheduler semaphore → on completion, update + tracker → repeat until all tasks done or early shutdown + 3. 
When ready queue drains, run salvage rounds over deferred retryable failures + (up to `async_scheduler_max_retries` rounds) + 4. After each row group completes: run post-batch processors, checkpoint + - Task dispatch: acquire scheduler semaphore slot → prepare request → release + slot → await throttle permit (for LLM tasks) → execute → write result + - Error handling: classify failures as retryable vs non-retryable; retryable + go to deferred queue with backoff; same early-shutdown logic as + `AsyncConcurrentExecutor` (error rate threshold within sliding window) - Progress tracking: reuse `ProgressTracker` per column - [ ] Use `asyncio.Event` or `asyncio.Condition` to wake the scheduler when a task completes (avoids polling) - [ ] Unit tests with mock generators @@ -165,16 +239,18 @@ The core orchestrator that replaces `_run_batch` for the async path. ### Step 5: Generator Async Migration -Make all generator types async-capable. +Make all generator types async-capable and declare statefulness. +- [ ] Add `is_stateful` property to base `ColumnGenerator` (default `False`). + Stateful generators are serialized per-instance by the scheduler. 
- [ ] `ColumnGeneratorWithModelChatCompletion.agenerate` — already implemented (PR #280), no changes needed -- [ ] `FromScratchColumnGenerator`: add `async agenerate_from_scratch(num_records) -> DataFrame` — wraps sync in `asyncio.to_thread` -- [ ] `ColumnGeneratorFullColumn`: add `async agenerate(data: DataFrame) -> DataFrame` — wraps sync in `asyncio.to_thread` +- [ ] `FromScratchColumnGenerator`: add `async agenerate_from_scratch(num_records) -> DataFrame` — wraps sync in `asyncio.to_thread` with defensive `df.copy()` on shared data +- [ ] `ColumnGeneratorFullColumn`: add `async agenerate(data: DataFrame) -> DataFrame` — wraps sync in `asyncio.to_thread` with defensive `df.copy()` (see Risks) - [ ] `ExpressionColumnGenerator`: inherits full-column async wrapper -- [ ] `SamplerColumnGenerator`: inherits from-scratch async wrapper -- [ ] `SeedDatasetColumnGenerator`: inherits from-scratch async wrapper +- [ ] `SamplerColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = False` +- [ ] `SeedDatasetColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = True` (maintains DuckDB batch reader cursor and leftover-row buffer) - [ ] `ValidationColumnGenerator`: inherits full-column async wrapper -- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses +- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses; `is_stateful` should be overridable by custom implementations - [ ] `ImageCellGenerator`, `EmbeddingCellGenerator`: add native `agenerate` using `model.agenerate_image` / `model.agenerate_text_embeddings` - [ ] Base class `ColumnGenerator.agenerate` fallback (already exists via `asyncio.to_thread`) is sufficient for non-LLM generators @@ -188,8 +264,10 @@ Adapt `DatasetBatchManager` for concurrent row group processing. 
- Option A: Multiple buffer instances (one per active row group) - Option B: Single shared buffer partitioned by row group offset ranges - Recommendation: **Option A** — cleaner isolation, each row group has its own `list[dict]` -- [ ] `update_cell(row_group: int, row_index: int, column: str, value: Any)` — fine-grained update (vs. replacing entire record dict) - - Or keep the current `update_record(index, record_dict)` approach per row group +- [ ] `update_cell(row_group: int, row_index: int, column: str, value: Any)` — cell-level + merge is the only write path for the async builder. Whole-record replacement + (`update_record`) is unsafe under parallel execution (two independent columns + finishing the same row concurrently would clobber each other's results) - [ ] `checkpoint_row_group(row_group: int)` — write parquet, free memory - [ ] Preserve `drop_records` semantics within each row group - [ ] Keep backward compatibility with sync path (the existing `DatasetBatchManager` is untouched) @@ -206,7 +284,8 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 3. Create `CompletionTracker`, `AsyncTaskScheduler` 4. Run scheduler on the background event loop (reuse `_ensure_async_engine_loop()`) 5. Scheduler handles checkpointing via callbacks -- [ ] `build()` dispatches to `_build_async()` when `DATA_DESIGNER_ASYNC_ENGINE=1`, else existing sync path +- [ ] `build()` dispatches to `_build_async()` when `DATA_DESIGNER_ASYNC_ENGINE=1` + **and** no column config has `allow_resize=True`; else existing sync path - [ ] `build_preview()` uses the same async path (single row group, no checkpoint) - [ ] Error handling: `DatasetGenerationError` wrapping, record dropping, telemetry events - [ ] Processor integration: @@ -218,10 +297,19 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
### Step 8: Tests & Validation - [ ] Unit tests for each new module (dependency map, completion tracker, task model, scheduler) +- [ ] Dependency map: side-effect output columns resolve correctly (e.g., column + depending on `summary__trace` maps to a dependency on the `summary` generator) - [ ] Integration test: multi-column config with known dependencies, verify parallel execution - [ ] Integration test: mixed cell-by-cell + full-column generators - [ ] Integration test: error rate shutdown - [ ] Integration test: checkpoint correctness (row groups written in order, parquet valid) +- [ ] Integration test: `allow_resize=True` falls back to sync path +- [ ] Integration test: stateful generator (`is_stateful=True`) serializes per-instance + across row groups; stateless generators run concurrently +- [ ] Integration test: retry salvage — transient failure is retried and succeeds; + non-retryable failure drops immediately; retry budget exhaustion drops correctly +- [ ] Integration test: throttling fairness — 429 on model key A does not stall + unrelated model key B tasks - [ ] Run `make test` — all existing tests pass - [ ] Run `make test-run-recipes` with `DATA_DESIGNER_ASYNC_ENGINE=1` - [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency @@ -241,15 +329,21 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - **Full-column generator ordering**: If two full-column generators have no mutual dependency, they could run in parallel on the same row group. This is safe as long - as they operate on independent columns. Need to ensure the DataFrame isn't mutated - concurrently — use `asyncio.to_thread` (which copies into a thread) or serialize - full-column tasks within a row group. + as they operate on independent columns. `asyncio.to_thread` passes **object + references**, not copies — if two full-column generators share the same DataFrame, + concurrent mutation is possible. 
Solution: pass `df.copy()` to each full-column + generator dispatched to a thread, and merge results back by column name. - **Pre-batch processors mutating data**: Pre-batch processors (e.g., schema transform) can add/remove/modify rows. This changes the row count and invalidates the completion tracker's row indices. Solution: treat pre-batch as a barrier that resets the tracker state for that row group (re-index rows after processor runs). +- **`allow_resize` incompatibility**: Any generator with `allow_resize=True` can change + the row count mid-pipeline, invalidating per-row completion state for all downstream + columns. Dynamic rescheduling and row identity tracking would be needed to support + this. **Async v1 falls back to the sync path** when any config uses `allow_resize`. + - **Backward compatibility**: The sync path must remain untouched. All new code is gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` and sits in new modules. @@ -260,12 +354,15 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - Building a heavyweight static execution graph (PR #269's approach — we take the lightweight dynamic approach instead) - Removing the sync/threaded path (it stays as the default) +- Supporting `allow_resize=True` in the async path (falls back to sync; follow-up) ### Key insight from existing code Every column config already has a `required_columns` property that returns the column names referenced in its Jinja2 templates. This gives us explicit dependency -information without any config schema changes. The dependency map is just -`{col.name: set(col.required_columns) for col in configs}`. +information without any config schema changes. The dependency map starts as +`{col.name: set(col.required_columns) for col in configs}`, extended with a +side-effect output mapping so that references to columns like `summary__trace` +resolve to a dependency on the `summary` generator. 
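The side-effect extension amounts to rewriting dependency entries through a producer mapping; a sketch (names illustrative — the real mapping would come from generator metadata):

```python
def resolve_side_effects(dep_map: dict[str, set[str]],
                         side_effect_outputs: dict[str, str]) -> dict[str, set[str]]:
    """Rewrite dependencies on side-effect columns (e.g. "summary__trace")
    to their producer column ("summary")."""
    return {
        col: {side_effect_outputs.get(dep, dep) for dep in deps}
        for col, deps in dep_map.items()
    }
```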
### On "dynamic" graph building The graph is implicit in the dependency map + completion tracker. We never From 13cfb34f3b19630b0f8eda78a2d76225ce5be128 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Tue, 24 Feb 2026 11:58:19 -0300 Subject: [PATCH 03/16] add symmetric generate/agenerate bridge and plugin compatibility notes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Only one of generate/agenerate needs to be implemented; base class bridges in both directions (to_thread for sync→async, asyncio.run for async→sync) - Document impact on column generator plugins, custom columns, and processor plugins (all backward-compatible, no changes required) --- plans/346/async-generators-and-task-queue.md | 41 +++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index b1faf993..dfe5324e 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -241,6 +241,21 @@ The core orchestrator that replaces `_run_batch` for the async path. Make all generator types async-capable and declare statefulness. +**Symmetric `generate` / `agenerate` contract**: only one of the two methods needs +to be implemented. The base class provides automatic bridging in both directions: +- If only `generate()` is implemented → `agenerate()` wraps it via `asyncio.to_thread` + (already exists from PR #280). +- If only `agenerate()` is implemented → `generate()` wraps it via `asyncio.run()` + (new fallback). This is safe because the sync builder never has a running event loop. + +This means sync-first generators (most built-ins, existing plugins) work unchanged, +and async-first generators (new plugins doing native async I/O) only need to implement +`agenerate()` without writing a redundant sync version. 
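The bidirectional bridge with recursion detection can be sketched as follows (method bodies illustrative; the override check compares the subclass attribute against the base function by identity):

```python
import asyncio

class ColumnGenerator:
    """Sketch of the symmetric bridge; subclasses override exactly one method."""

    def generate(self, data):
        if type(self).agenerate is ColumnGenerator.agenerate:
            raise NotImplementedError("implement generate() or agenerate()")
        # Async-first subclass: drive its agenerate() to completion. Safe in
        # the sync builder, which never has a running event loop.
        return asyncio.run(self.agenerate(data))

    async def agenerate(self, data):
        if type(self).generate is ColumnGenerator.generate:
            raise NotImplementedError("implement generate() or agenerate()")
        # Sync-first subclass: push its blocking generate() onto a worker thread.
        return await asyncio.to_thread(self.generate, data)
```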
+ +- [ ] Add symmetric bridging on the base `ColumnGenerator`: + - `agenerate()` default: `asyncio.to_thread(self.generate, data)` (already exists) + - `generate()` default: `asyncio.run(self.agenerate(data))` (new, for async-first generators) + - Detect which one the subclass overrides to avoid infinite recursion - [ ] Add `is_stateful` property to base `ColumnGenerator` (default `False`). Stateful generators are serialized per-instance by the scheduler. - [ ] `ColumnGeneratorWithModelChatCompletion.agenerate` — already implemented (PR #280), no changes needed @@ -250,9 +265,8 @@ Make all generator types async-capable and declare statefulness. - [ ] `SamplerColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = False` - [ ] `SeedDatasetColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = True` (maintains DuckDB batch reader cursor and leftover-row buffer) - [ ] `ValidationColumnGenerator`: inherits full-column async wrapper -- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses; `is_stateful` should be overridable by custom implementations +- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses; `is_stateful` should be overridable by custom implementations. For `@custom_column_generator` functions, detect `asyncio.iscoroutinefunction` and call directly if async. - [ ] `ImageCellGenerator`, `EmbeddingCellGenerator`: add native `agenerate` using `model.agenerate_image` / `model.agenerate_text_embeddings` -- [ ] Base class `ColumnGenerator.agenerate` fallback (already exists via `asyncio.to_thread`) is sufficient for non-LLM generators **Files**: `generators/base.py`, `generators/expression.py`, `generators/samplers.py`, `generators/seed_dataset.py`, `generators/image.py`, `generators/embedding.py`, tests @@ -356,6 +370,29 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
- Removing the sync/threaded path (it stays as the default) - Supporting `allow_resize=True` in the async path (falls back to sync; follow-up) +### Impact on plugins and custom columns + +This change is **backward-compatible** with all existing plugins and custom columns. +No plugin author needs to modify their code for it to work under the async scheduler. + +**Column generator plugins** (registered via entry points): plugins subclass one of +the base generator classes and implement `generate()`. The base class `agenerate()` +fallback wraps `generate()` in `asyncio.to_thread`, so every existing plugin +automatically gets async support. Plugins that want native async performance can +optionally override `agenerate()` instead — the symmetric bridging means they don't +need to implement `generate()` at all. The `is_stateful` property defaults to `False`, +which is correct for most plugins; stateful plugins can override it. + +**Custom columns** (`@custom_column_generator`): user-provided sync functions are +wrapped in `asyncio.to_thread` by the framework. If the user provides an async +function, `CustomColumnGenerator` detects this via `asyncio.iscoroutinefunction` +and calls it directly as a coroutine — no thread pool overhead. + +**Processor plugins** (`process_before_batch`, `process_after_batch`, +`process_after_generation`): processors run at barrier points in the scheduling loop +where no column generation is concurrent. They remain purely synchronous and are +unaffected by this change. + ### Key insight from existing code Every column config already has a `required_columns` property that returns the column names referenced in its Jinja2 templates. 
This gives us explicit dependency From d3fce38f8a1c941ca7771750a830d00029f25217 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Tue, 24 Feb 2026 12:13:58 -0300 Subject: [PATCH 04/16] add reference diagrams and clarify statefulness concept - Add plans/346/diagrams.md with 5 Mermaid diagrams: task lifecycle, scheduler main loop, dependency resolution example, concurrency layers, and row group pipelining - Clarify in plan that statefulness (concurrency safety) and sync/async (I/O model) are orthogonal concerns --- plans/346/async-generators-and-task-queue.md | 7 + plans/346/diagrams.md | 186 +++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 plans/346/diagrams.md diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index dfe5324e..e781612b 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -104,6 +104,13 @@ throttling without the two systems fighting each other. ### 5. Generator statefulness and reentrancy +Statefulness and sync/async are orthogonal concerns. Sync vs async is about the +**I/O model** — whether the underlying work is blocking (needs a thread) or +non-blocking (native coroutine). Statefulness is about **concurrency safety** — +whether multiple calls to the same generator instance can safely overlap. A +generator can be async but stateful (e.g., a cursor over an async database), or +sync but stateless (e.g., a random sampler). + Generators declare whether they are stateful via an `is_stateful` property on the base class (default `False`). Stateful generators maintain internal state across calls (e.g., `SeedDatasetColumnGenerator` has a DuckDB batch reader diff --git a/plans/346/diagrams.md b/plans/346/diagrams.md new file mode 100644 index 00000000..c0e543a7 --- /dev/null +++ b/plans/346/diagrams.md @@ -0,0 +1,186 @@ +# Async Generators & Task Queue — Reference Diagrams + +## 1. 
Task Lifecycle + +How a single task flows through the scheduler, from dispatch to completion or failure. + +```mermaid +flowchart TD + R[Ready Queue] --> A[Acquire scheduler slot] + A --> P[Prepare request] + P --> S{LLM-bound?} + + S -->|No| E[Execute task] + S -->|Yes| RS[Release scheduler slot] + RS --> T[Acquire throttle permit\nfrom Throttle Manager\nkeyed by provider+model] + T --> E + + E --> O{Outcome} + + O -->|Success| MC[Write result via update_cell] + MC --> CT[Mark complete in tracker] + CT --> W[Wake scheduler\nto check new ready tasks] + + O -->|Retryable failure\n429, 500, timeout| D[Deferred queue\nattempt++, backoff + jitter] + + O -->|Non-retryable failure\n400, validation, schema| DR[Mark row as dropped] + DR --> W +``` + +## 2. Scheduler Main Loop + +The overall orchestration flow from start to row group checkpoint. + +```mermaid +flowchart TD + START[Start] --> SEED[Dispatch from_scratch tasks] + SEED --> SEED_CHECK{Stateful generator?} + SEED_CHECK -->|Yes| SER[Serialize per-instance\nrow group N before N+1] + SEED_CHECK -->|No| PAR[Dispatch all row groups\nconcurrently] + SER --> PRE + PAR --> PRE + + PRE[Pre-batch barrier\nrun processors, reset tracker] --> LOOP + + LOOP[Query tracker:\nget_ready_tasks] --> READY{Tasks ready?} + + READY -->|Yes| DISPATCH[Dispatch tasks\nbehind scheduler semaphore] + DISPATCH --> COMPLETE[On completion:\nupdate tracker] + COMPLETE --> LOOP + + READY -->|No| DEFERRED{Deferred queue\nnon-empty?} + + DEFERRED -->|Yes, rounds left| SALVAGE[Run salvage round\nover retryable failures] + SALVAGE --> LOOP + + DEFERRED -->|No, or budget exhausted| RG_CHECK{Row group\ncomplete?} + + RG_CHECK -->|Yes| POST[Post-batch processors] + POST --> CP[Checkpoint to parquet\nfree memory] + CP --> DONE{All row groups\ndone?} + + RG_CHECK -->|No| WAIT[Wait for in-flight\ntasks to complete] + WAIT --> LOOP + + DONE -->|Yes| FIN[Done] + DONE -->|No| LOOP +``` + +## 3. 
Dependency Resolution Example + +A concrete pipeline with 5 columns showing parallel execution opportunities. +Columns `B` and `C` are independent and run in parallel once `A` completes. + +```mermaid +gantt + title Row Group 0 — Task Execution Timeline + dateFormat YYYY-MM-DD + axisFormat %d + tickInterval 1day + + section Seed + A (from_scratch) :a, 2026-01-01, 2d + + section Pre-batch + Pre-batch processors :pre, after a, 1d + + section Cell-by-cell + B (LLM text, depends on A) :b, after pre, 4d + C (LLM judge, depends on A) :c, after pre, 3d + + section Full-column + D (expression, depends on B+C) :d, after b, 1d + + section Post-batch + E (validation, depends on D) :e, after d, 1d + Post-batch + checkpoint :post, after e, 1d +``` + +Dependency map for this example: +``` +A: {} ← no dependencies, from_scratch +B: {A} ← cell-by-cell, waits for A per row +C: {A} ← cell-by-cell, waits for A per row (parallel with B) +D: {B, C} ← full-column, waits for B+C on ALL rows +E: {D} ← full-column, waits for D +``` + +## 4. Concurrency Layers + +The two-layer design: scheduler semaphore (coarse resource guard) and throttle +manager (per-key API concurrency). Tasks release scheduler slots while waiting +for throttle permits. + +```mermaid +flowchart TB + S[Async Scheduler] --> Q[Ready Task Queue] + Q --> W[Scheduler Semaphore\ncoarse cap ~128] + + W --> T{LLM-bound?} + + T -->|No| N[Run non-LLM task\nexpression, sampler, etc.] 
+ N --> C[Completion Tracker] + + T -->|Yes| REL[Release scheduler slot] + REL --> A[Acquire permit from\nThrottle Manager\nkeyed by provider+model+domain] + A --> R[ModelClient adapter call] + R --> P[Provider API] + P --> X{429?} + X -->|Yes| D[AIMD decrease\ncooldown + retry] + D --> A + X -->|No| U[AIMD increase] + U --> C +``` + +### Failure mode this design avoids + +Without slot release, a throttled key starves unrelated keys: + +```mermaid +flowchart TB + G[Single global semaphore\nno slot release] --> H[Many tasks for throttled key A] + H --> I[Key A hits 429, backs off] + I --> J[Tasks wait/retry\nwhile holding scheduler slots] + J --> K[Unrelated key B tasks\ndelayed or starved] + + style G fill:#fee,stroke:#c33 + style K fill:#fee,stroke:#c33 +``` + +## 5. Row Group Pipelining + +Multiple row groups overlap — row group 1 starts its independent columns while +row group 0 is still finishing later columns. + +```mermaid +gantt + title Cross-Row-Group Pipelining + dateFormat YYYY-MM-DD + axisFormat %d + tickInterval 1day + + section Row Group 0 + RG0 seed (A) :rg0a, 2026-01-01, 2d + RG0 col B :rg0b, after rg0a, 4d + RG0 col C :rg0c, after rg0a, 3d + RG0 col D :rg0d, after rg0b, 1d + RG0 checkpoint :rg0cp, after rg0d, 1d + + section Row Group 1 + RG1 seed (A) :rg1a, after rg0a, 2d + RG1 col B :rg1b, after rg1a, 4d + RG1 col C :rg1c, after rg1a, 3d + RG1 col D :rg1d, after rg1b, 1d + RG1 checkpoint :rg1cp, after rg1d, 1d + + section Row Group 2 + RG2 seed (A) :rg2a, after rg1a, 2d + RG2 col B :rg2b, after rg2a, 4d + RG2 col C :rg2c, after rg2a, 3d + RG2 col D :rg2d, after rg2b, 1d + RG2 checkpoint :rg2cp, after rg2d, 1d +``` + +Note: seed (A) is shown staggered because `SeedDatasetColumnGenerator` is +stateful (`is_stateful = True`), so row groups serialize for that generator. +Columns B and C are stateless and pipeline freely across row groups. 
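The per-row readiness rule these diagrams assume can be sketched as follows (data shapes are illustrative, not the tracker's real API; full-column tasks would apply the same check across every row in the group):

```python
# Dependency map from diagram 3; completion state is tracked per column, per row.
DEPENDENCY_MAP = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}, "E": {"D"}}

def is_ready(column: str, row: int, done: dict[str, set[int]]) -> bool:
    """A cell task is dispatchable once every upstream column has finished that row."""
    return all(row in done[dep] for dep in DEPENDENCY_MAP[column])
```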
From 3b3842ff52e240e5104997799acf82bb81d59b0b Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Tue, 24 Feb 2026 12:38:58 -0300 Subject: [PATCH 05/16] add edge case handling and open questions - Eager row-drop propagation: failed rows are dropped across all columns to avoid wasting compute on incomplete rows - Out-of-order row group checkpointing via index-based file naming - Pre-batch processor failure skips the entire row group - Salvage rounds get separate error threshold and config knobs - Undersized last row group note - Open questions: thread pool sizing, silent task hangs --- plans/346/async-generators-and-task-queue.md | 73 ++++++++++++++++---- 1 file changed, 59 insertions(+), 14 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index e781612b..03718d42 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -79,6 +79,12 @@ Rows are partitioned into groups of `buffer_size` (same as current batches). When all tasks for a row group are complete, write to parquet and free memory. This preserves the current checkpoint/memory semantics. +Row groups may complete **out of order** (e.g., row group 2 finishes before row +group 1 if RG1 has a slow column). Checkpoint writes use the row group index for +file naming (`batch_0.parquet`, `batch_1.parquet`, etc.), so out-of-order writes +produce correctly named files. When loading the final dataset, files are read in +index order, so row ordering is preserved regardless of write order. + Full-column generators operate on their entire row group at once, same as today. ### 4. Concurrency control @@ -125,7 +131,8 @@ generators declare their own contract. ### 6. Pre/post-batch processors - **Pre-batch**: runs after seed generators complete for a row group, before - other columns. Modeled as a barrier task for the row group. + other columns. Modeled as a barrier task for the row group. 
If a pre-batch + processor fails, the entire row group is skipped. - **Post-batch**: runs after all columns complete for a row group, before checkpoint write. @@ -141,11 +148,15 @@ would otherwise be lost. `attempt` count, `next_eligible_at` timestamp, and exponential backoff + jitter. 3. **Scheduling priority**: normal ready tasks are dispatched first. When the ready queue drains, the scheduler runs up to `N` salvage rounds over deferred - tasks (configurable via `async_scheduler_max_retries`, default 2). -4. **Throttle-aware**: retries re-enter the throttle manager acquire path, so + tasks (configurable via `async_salvage_max_rounds`, default 2). +4. **Separate error threshold**: salvage rounds use their own error rate threshold + (e.g., `async_salvage_error_threshold=0.8`), independent of the main scheduling + loop, since higher failure rates are expected when retrying. +5. **Throttle-aware**: retries re-enter the throttle manager acquire path, so they don't exacerbate rate limiting. -5. **Final drop**: after retry budget is exhausted, mark the cell as failed and - the row as dropped. Continue row-group completion checks over remaining rows. +6. **Final drop**: after retry budget is exhausted, mark the cell as failed and + the row as dropped (via eager row-drop propagation). Continue row-group + completion checks over remaining rows. ### 8. `allow_resize` scoping @@ -196,8 +207,10 @@ A lightweight structure tracking which (column, row) pairs are done. 
- `mark_complete(column: str, row: int)` / `mark_batch_complete(column: str, row_group: int, row_group_size: int)` - `is_ready(column: str, row: int, dependency_map) -> bool` — checks all upstream columns for that row - `is_batch_ready(column: str, row_group: int, row_group_size: int, dependency_map) -> bool` — checks all rows in group - - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all columns done for all rows - - `get_ready_tasks(dependency_map, columns_with_strategy, row_groups) -> list[Task]` — yields all currently dispatchable tasks + - `drop_row(row_group: int, row_index: int)` — marks row as dropped across all columns; + `get_ready_tasks` skips dropped rows, in-flight tasks for dropped rows are ignored on completion + - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all non-dropped rows have all columns done + - `get_ready_tasks(dependency_map, columns_with_strategy, row_groups) -> list[Task]` — yields all currently dispatchable tasks, excluding dropped rows - [ ] No locks needed: all access is from the single asyncio event loop thread - [ ] Unit tests @@ -331,6 +344,12 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
non-retryable failure drops immediately; retry budget exhaustion drops correctly - [ ] Integration test: throttling fairness — 429 on model key A does not stall unrelated model key B tasks +- [ ] Integration test: eager row-drop — failure on column B drops the row across + all columns, independent column C does not process the dropped row +- [ ] Integration test: out-of-order row group completion produces correctly named + parquet files; final dataset loads in correct row order +- [ ] Integration test: pre-batch processor failure skips the row group, remaining + row groups continue - [ ] Run `make test` — all existing tests pass - [ ] Run `make test-run-recipes` with `DATA_DESIGNER_ASYNC_ENGINE=1` - [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency @@ -341,12 +360,15 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. peak memory. Mitigation: limit max concurrent row groups (e.g., 2-3) via a separate semaphore or by feeding row groups into the scheduler incrementally. -- **Record dropping across columns**: If row 5 fails on column B, we must prevent - column C from processing row 5. The completion tracker naturally handles this — a - failed task is never marked complete, so downstream tasks never become ready. But we - need to detect when a row group is "done enough" to checkpoint (some rows dropped). - Solution: track dropped rows per row group; row group is complete when all non-dropped - rows have all columns done. +- **Eager row-drop propagation**: When a task fails non-recoverably (non-retryable, + or retry budget exhausted), the **entire row** must be marked as dropped across all + columns — not just the failed column. Otherwise, independent columns that don't + depend on the failed column will continue processing that row, wasting compute on + a row that can never be complete. The completion tracker needs a `drop_row(row_group, + row_index)` method that cancels/skips all pending and in-flight tasks for that row. 
+ Retryable failures go to the deferred queue first; eager drop only happens after + retries are exhausted. Row group is complete when all non-dropped rows have all + columns done. - **Full-column generator ordering**: If two full-column generators have no mutual dependency, they could run in parallel on the same row group. This is safe as long @@ -358,7 +380,14 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - **Pre-batch processors mutating data**: Pre-batch processors (e.g., schema transform) can add/remove/modify rows. This changes the row count and invalidates the completion tracker's row indices. Solution: treat pre-batch as a barrier that resets the tracker - state for that row group (re-index rows after processor runs). + state for that row group (re-index rows after processor runs). If a pre-batch + processor **fails**, the entire row group is skipped (treated as a fatal row-group + error — log, skip, continue with remaining row groups). + +- **Undersized last row group**: If `num_records` is not a multiple of `buffer_size`, + the last row group has fewer rows. This is the same as the sync path and should not + require special handling, but full-column generators and batch-level logic must not + assume uniform row group sizes. - **`allow_resize` incompatibility**: Any generator with `allow_resize=True` can change the row count mid-pipeline, invalidating per-row completion state for all downstream @@ -368,6 +397,22 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - **Backward compatibility**: The sync path must remain untouched. All new code is gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` and sits in new modules. +## Open Questions + +- **Thread pool sizing**: sync generators wrapped in `asyncio.to_thread` use Python's + default thread pool executor (typically `min(32, cpu_count + 4)` threads). If many + sync generators run concurrently, the pool could become a bottleneck even though the + scheduler semaphore hasn't been reached. 
Should we explicitly size the executor to + match the scheduler's concurrency cap, or is the default sufficient? + +- **Silent task hangs**: A sync generator wrapped in `asyncio.to_thread` could hang + or crash silently (e.g., native code segfault, deadlocked I/O). Per-task timeouts + would catch this, but risk false positives on legitimately long-running tasks (large + batches, big context windows). For v1, rely on upstream timeouts (HTTP timeouts on + LLM calls, generator-level timeouts) and the error rate shutdown for detecting + no-progress states. Should we add per-task timeouts with per-generator overrides, or + is upstream timeout coverage sufficient? + ## Notes ### What we're NOT doing in this PR From 64c8bbec37a1d7ff66f0e53a030918118347925d Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Tue, 24 Feb 2026 14:02:08 -0300 Subject: [PATCH 06/16] refine async scheduler plan safeguards - add submission budget controls to prevent unbounded parked tasks - clarify DAG validation, safe async-to-sync bridging, and row-scoped drop policy - align diagrams and unresolved risk wording with latest design decisions --- plans/346/async-generators-and-task-queue.md | 97 ++++++++++++++------ plans/346/diagrams.md | 9 +- 2 files changed, 72 insertions(+), 34 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 03718d42..b1698342 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -89,24 +89,29 @@ Full-column generators operate on their entire row group at once, same as today. ### 4. Concurrency control -Two independent layers: +Three independent layers: 1. **Scheduler semaphore** — a coarse resource guard bounding total in-flight - tasks to limit CPU/memory pressure (e.g., configurable cap, default ~128). - This is **not** the source of truth for API concurrency; it only prevents - unbounded coroutine fan-out. 
+ active execution to limit CPU/memory pressure (e.g., configurable cap, default + ~128). This is **not** the source of truth for API concurrency. 2. **Throttle manager** (from PR #344) — gates every outbound LLM call, keyed by `provider+model(+domain)`. Dynamically adjusts per-key limits on 429s via AIMD. This is the real API concurrency control. +3. **Submission budget** — a hard cap on "submitted but not finished" tasks + (running + waiting on throttle/backoff), e.g., `async_scheduler_max_submitted_tasks`. + This prevents unbounded parked coroutines when tasks release scheduler slots + before throttle acquire. + Tasks must **not hold scheduler slots while waiting on throttle backoff**. A task acquires a scheduler slot, prepares its request, then releases the slot before awaiting the throttle permit. This ensures a throttled model key doesn't starve unrelated keys by hogging scheduler capacity. -This replaces per-column fan-out and composes cleanly with PR #344's adaptive -throttling without the two systems fighting each other. +Task admission is therefore bounded by the submission budget, while active +execution is bounded by the scheduler semaphore. This composes cleanly with +PR #344's adaptive throttling without the two systems fighting each other. ### 5. Generator statefulness and reentrancy @@ -191,9 +196,10 @@ Build the dependency map from column configs at builder init time. - Also register side-effect output columns (`__trace`, `__reasoning_content`, etc.) 
and map them back to their producer column, so downstream references resolve correctly - For `MultiColumnConfig`, all sub-columns share the same dependencies - - Validate: every required column must appear earlier in the config list or be a - registered side-effect output (detect cycles / missing refs) -- [ ] Add `topological_order(dependency_map) -> list[str]` — returns columns in valid execution order (should match config order; this is a validation step) + - Validate: every required column must resolve to a known producer (including + registered side-effect outputs), and the graph must be acyclic +- [ ] Add `topological_order(dependency_map) -> list[str]` — returns a valid DAG + execution order used for validation (not required to match config declaration order) - [ ] Unit tests for dependency map construction and validation **Files**: new module `engine/dataset_builders/utils/dependency_map.py`, tests @@ -235,7 +241,7 @@ Simple dataclass representing a unit of work. The core orchestrator that replaces `_run_batch` for the async path. - [ ] `AsyncTaskScheduler` class: - - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit, error/result callbacks + - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit, submission budget, error/result callbacks - `async run()` — main loop: 1. Dispatch `from_scratch` tasks, respecting `is_stateful`: stateful generators serialize per-instance (row group N completes before N+1 starts for that @@ -244,10 +250,13 @@ The core orchestrator that replaces `_run_batch` for the async path. `asyncio.create_task()` behind scheduler semaphore → on completion, update tracker → repeat until all tasks done or early shutdown 3. When ready queue drains, run salvage rounds over deferred retryable failures - (up to `async_scheduler_max_retries` rounds) + (up to `async_salvage_max_rounds` rounds) 4. 
After each row group completes: run post-batch processors, checkpoint - Task dispatch: acquire scheduler semaphore slot → prepare request → release slot → await throttle permit (for LLM tasks) → execute → write result + - Admission control: never allow more than `async_scheduler_max_submitted_tasks` + tasks in submitted/running/waiting states; hold remaining ready tasks in the + scheduler queue until slots free up - Error handling: classify failures as retryable vs non-retryable; retryable go to deferred queue with backoff; same early-shutdown logic as `AsyncConcurrentExecutor` (error rate threshold within sliding window) @@ -265,8 +274,12 @@ Make all generator types async-capable and declare statefulness. to be implemented. The base class provides automatic bridging in both directions: - If only `generate()` is implemented → `agenerate()` wraps it via `asyncio.to_thread` (already exists from PR #280). -- If only `agenerate()` is implemented → `generate()` wraps it via `asyncio.run()` - (new fallback). This is safe because the sync builder never has a running event loop. +- If only `agenerate()` is implemented → `generate()` uses a safe sync runner + helper: + - no running loop in current thread: use `asyncio.run(self.agenerate(data))` + - running loop detected: submit to the builder's dedicated background event loop + thread via `asyncio.run_coroutine_threadsafe(...).result(timeout=...)` + This avoids nested-loop errors while keeping async-first plugins ergonomic. 
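A sketch of such a sync runner helper (the background-loop plumbing and parameter names are assumptions, not the actual implementation):

```python
import asyncio

def run_coroutine_sync(coro, background_loop=None, timeout=None):
    # Hypothetical helper: execute a coroutine from synchronous code safely.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: plain asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running (e.g., notebook or async service): never
    # nest asyncio.run; hand off to a dedicated background loop thread.
    if background_loop is None:
        raise RuntimeError(
            "sync generate() called inside a running event loop "
            "without a background loop to delegate to"
        )
    future = asyncio.run_coroutine_threadsafe(coro, background_loop)
    return future.result(timeout=timeout)
```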
This means sync-first generators (most built-ins, existing plugins) work unchanged, and async-first generators (new plugins doing native async I/O) only need to implement @@ -274,7 +287,9 @@ and async-first generators (new plugins doing native async I/O) only need to imp - [ ] Add symmetric bridging on the base `ColumnGenerator`: - `agenerate()` default: `asyncio.to_thread(self.generate, data)` (already exists) - - `generate()` default: `asyncio.run(self.agenerate(data))` (new, for async-first generators) + - `generate()` default: call a safe sync runner helper that: + - uses `asyncio.run()` if no loop is running in the current thread + - otherwise submits to the background loop with `run_coroutine_threadsafe(...).result(timeout=...)` - Detect which one the subclass overrides to avoid infinite recursion - [ ] Add `is_stateful` property to base `ColumnGenerator` (default `False`). Stateful generators are serialized per-instance by the scheduler. @@ -344,8 +359,12 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
non-retryable failure drops immediately; retry budget exhaustion drops correctly - [ ] Integration test: throttling fairness — 429 on model key A does not stall unrelated model key B tasks +- [ ] Integration test: bounded submission — with many ready tasks and a tight + throttle key, submitted task count never exceeds `async_scheduler_max_submitted_tasks` - [ ] Integration test: eager row-drop — failure on column B drops the row across all columns, independent column C does not process the dropped row +- [ ] Integration test: row-drop with in-flight full-column task — completed task + may still compute dropped rows, but writeback is suppressed and row remains dropped - [ ] Integration test: out-of-order row group completion produces correctly named parquet files; final dataset loads in correct row order - [ ] Integration test: pre-batch processor failure skips the row group, remaining @@ -360,16 +379,38 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. peak memory. Mitigation: limit max concurrent row groups (e.g., 2-3) via a separate semaphore or by feeding row groups into the scheduler incrementally. +- **Unbounded parked coroutines during throttle waits**: Releasing scheduler slots + before throttle acquire improves fairness, but can create large numbers of parked + tasks if admission is not bounded. Mitigation: enforce + `async_scheduler_max_submitted_tasks` as a hard cap on submitted/running/waiting + tasks. + - **Eager row-drop propagation**: When a task fails non-recoverably (non-retryable, or retry budget exhausted), the **entire row** must be marked as dropped across all columns — not just the failed column. Otherwise, independent columns that don't depend on the failed column will continue processing that row, wasting compute on a row that can never be complete. The completion tracker needs a `drop_row(row_group, - row_index)` method that cancels/skips all pending and in-flight tasks for that row. 
+ row_index)` method that skips all pending tasks for that row; in-flight tasks may + still complete but their writeback is suppressed once the row is marked dropped. Retryable failures go to the deferred queue first; eager drop only happens after retries are exhausted. Row group is complete when all non-dropped rows have all columns done. +- **Dropped rows vs in-flight batch/full-column work (v1 policy)**: preemptively + cancelling already-running full-column/batch tasks is complex and error-prone. + Async v1 keeps this simple: once a row is dropped, scheduler will not enqueue new + tasks for that row and all write paths must suppress writeback for dropped rows. + Already-running batch/full-column tasks may still compute values for dropped rows, + but those outputs are ignored. Dropped-row propagation is strictly row-scoped; + a row-group/batch is never dropped solely due to row-level failures. Track this + as "wasted work" telemetry for later optimization. + +- **Sync bridge in async-hosted contexts**: async-first generators need a safe + `generate()` fallback that works even when called from environments with an active + event loop (notebooks/services). Mitigation: use a sync runner helper that uses + `asyncio.run()` when safe, else routes through the dedicated background event loop + via `run_coroutine_threadsafe(...).result(timeout=...)`. + - **Full-column generator ordering**: If two full-column generators have no mutual dependency, they could run in parallel on the same row group. This is safe as long as they operate on independent columns. `asyncio.to_thread` passes **object @@ -397,21 +438,17 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - **Backward compatibility**: The sync path must remain untouched. All new code is gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` and sits in new modules. 
-## Open Questions - -- **Thread pool sizing**: sync generators wrapped in `asyncio.to_thread` use Python's - default thread pool executor (typically `min(32, cpu_count + 4)` threads). If many - sync generators run concurrently, the pool could become a bottleneck even though the - scheduler semaphore hasn't been reached. Should we explicitly size the executor to - match the scheduler's concurrency cap, or is the default sufficient? - -- **Silent task hangs**: A sync generator wrapped in `asyncio.to_thread` could hang - or crash silently (e.g., native code segfault, deadlocked I/O). Per-task timeouts - would catch this, but risk false positives on legitimately long-running tasks (large - batches, big context windows). For v1, rely on upstream timeouts (HTTP timeouts on - LLM calls, generator-level timeouts) and the error rate shutdown for detecting - no-progress states. Should we add per-task timeouts with per-generator overrides, or - is upstream timeout coverage sufficient? +- **Thread pool sizing (unresolved)**: sync generators wrapped in `asyncio.to_thread` + use Python's default thread pool executor (typically `min(32, cpu_count + 4)`). + If many sync generators run concurrently, the pool could become a bottleneck even + when scheduler limits are higher. Decide whether to explicitly size the executor + to match scheduler caps, or keep defaults for v1. + +- **Silent task hangs (unresolved)**: a sync generator wrapped in `asyncio.to_thread` + could hang or stall indefinitely. Per-task timeouts catch this but may produce + false positives on valid long-running tasks. For v1, rely on upstream/model + timeouts and no-progress detection; evaluate optional per-generator timeout + overrides as follow-up if needed. ## Notes diff --git a/plans/346/diagrams.md b/plans/346/diagrams.md index c0e543a7..c914f77b 100644 --- a/plans/346/diagrams.md +++ b/plans/346/diagrams.md @@ -107,14 +107,15 @@ E: {D} ← full-column, waits for D ## 4. 
Concurrency Layers -The two-layer design: scheduler semaphore (coarse resource guard) and throttle -manager (per-key API concurrency). Tasks release scheduler slots while waiting -for throttle permits. +The three-layer design: submission budget (bounded admission), scheduler +semaphore (coarse active-execution guard), and throttle manager (per-key API +concurrency). Tasks release scheduler slots while waiting for throttle permits. ```mermaid flowchart TB S[Async Scheduler] --> Q[Ready Task Queue] - Q --> W[Scheduler Semaphore\ncoarse cap ~128] + Q --> B[Submission Budget\nmax submitted tasks] + B --> W[Scheduler Semaphore\ncoarse active cap ~128] W --> T{LLM-bound?} From 979e41770c34542544024f81a52bcd05873ab1e5 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 10:27:54 -0300 Subject: [PATCH 07/16] refine plan with UX considerations and design clarifications - add UX considerations section: progress display strategy, peak memory cap, new config knobs, async custom columns and plugin upgrade path, what stays the same - replace allow_resize silent fallback with explicit DatasetGenerationError at startup; move to Follow-ups section - consolidate all deferred work into Out of scope / Follow-ups subsections - fix five internal inconsistencies: progress tracking in Step 4, missing async_max_concurrent_row_groups in scheduler constructor, annotate _ensure_async_engine_loop as existing, SamplerColumnGenerator dual-wrapper scope (applies to all FromScratchColumnGenerator subclasses), stateful serialization vs row group admission clarification - resolve previously open decisions: asyncio.Event over Condition, task_model as own module, async_max_concurrent_row_groups default 3, async_salvage_max_attempts_per_task dropped in favour of max_rounds+1 semantics, thread pool keep default for v1 - fix CustomColumnGenerator FULL_COLUMN async path (needs own agenerate branching on strategy); note ValidationColumnGenerator internal threading --- 
plans/346/async-generators-and-task-queue.md | 177 ++++++++++++++----- 1 file changed, 137 insertions(+), 40 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index b1698342..db87eaca 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -133,6 +133,13 @@ dispatch all row groups concurrently. This is a generator-level attribute, not a type-level assumption. Custom generators declare their own contract. +The row group admission semaphore (`async_max_concurrent_row_groups`) and stateful +serialization are complementary, not conflicting: the semaphore controls how many row +groups are admitted into the scheduler at once; serialization controls the dispatch +order of seed tasks within the admitted set. A stateful generator with 3 row groups +admitted will still run their seeds in order (0 → 1 → 2); other columns in those row +groups remain free to pipeline. + ### 6. Pre/post-batch processors - **Pre-batch**: runs after seed generators complete for a row group, before @@ -152,8 +159,11 @@ would otherwise be lost. 2. **Deferred queue**: retryable failures are placed in a deferred queue with `attempt` count, `next_eligible_at` timestamp, and exponential backoff + jitter. 3. **Scheduling priority**: normal ready tasks are dispatched first. When the - ready queue drains, the scheduler runs up to `N` salvage rounds over deferred - tasks (configurable via `async_salvage_max_rounds`, default 2). + ready queue drains, the scheduler sweeps the deferred queue and re-dispatches + all tasks whose backoff has elapsed — this is one salvage round. The scheduler + runs at most `async_salvage_max_rounds` such rounds (default 2) before dropping + all remaining deferred tasks. A task that keeps failing is retried once per round, + so the maximum attempts per task is `async_salvage_max_rounds + 1`. 4. 
**Separate error threshold**: salvage rounds use their own error rate threshold (e.g., `async_salvage_error_threshold=0.8`), independent of the main scheduling loop, since higher failure rates are expected when retrying. @@ -170,9 +180,12 @@ lets any generator change the row count mid-pipeline, which invalidates all per-row completion state for downstream columns. Supporting this under parallel execution would require dynamic rescheduling and row identity tracking. -**Async v1 scope**: if any column config has `allow_resize=True`, the builder -falls back to the sync path. This is safe because resize is opt-in and the sync -path handles it naturally. Full async support for resize is a follow-up. +**Async v1 scope**: if any column config has `allow_resize=True` and +`DATA_DESIGNER_ASYNC_ENGINE=1`, the builder raises a `DatasetGenerationError` +at startup (before any generation begins). The user must either remove +`allow_resize=True` from their config or disable the async engine. Silent +fallback is intentionally avoided — the user should know these two settings are +incompatible and make an explicit choice (see Follow-ups). ## Success Criteria @@ -234,18 +247,22 @@ Simple dataclass representing a unit of work. - [ ] `TaskResult` with status, output, error info - [ ] Hashable so we can track dispatched/pending sets -**Files**: new module `engine/dataset_builders/utils/task_model.py` (or inline in scheduler) +**Files**: new module `engine/dataset_builders/utils/task_model.py` — must be its own module +since `CompletionTracker`, `AsyncTaskScheduler`, and the buffer manager all reference `Task`/`TaskResult`; +inlining would create import cycles. ### Step 4: Async Task Scheduler The core orchestrator that replaces `_run_batch` for the async path. 
- [ ] `AsyncTaskScheduler` class: - - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit, submission budget, error/result callbacks + - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit (`async_scheduler_max_submitted_tasks`), row group semaphore (`async_max_concurrent_row_groups`), salvage config, error/result callbacks - `async run()` — main loop: - 1. Dispatch `from_scratch` tasks, respecting `is_stateful`: stateful generators - serialize per-instance (row group N completes before N+1 starts for that - generator); stateless generators dispatch all row groups concurrently + 1. Acquire the row group semaphore (`async_max_concurrent_row_groups`) before + admitting a new row group's seed tasks. Dispatch `from_scratch` tasks, + respecting `is_stateful`: stateful generators serialize per-instance (row group + N's seed completes before N+1's seed starts for that generator); stateless + generators dispatch all admitted row groups concurrently 2. Loop: query `completion_tracker.get_ready_tasks()` → dispatch each via `asyncio.create_task()` behind scheduler semaphore → on completion, update tracker → repeat until all tasks done or early shutdown @@ -260,8 +277,15 @@ The core orchestrator that replaces `_run_batch` for the async path. - Error handling: classify failures as retryable vs non-retryable; retryable go to deferred queue with backoff; same early-shutdown logic as `AsyncConcurrentExecutor` (error rate threshold within sliding window) - - Progress tracking: reuse `ProgressTracker` per column -- [ ] Use `asyncio.Event` or `asyncio.Condition` to wake the scheduler when a task completes (avoids polling) + - Progress tracking: create one `ProgressTracker` per column for accounting + (success/failure counts, rate, ETA), but suppress per-completion interval logs + in async mode. 
A separate background coroutine (`asyncio.create_task`) emits a + single consolidated summary line every 10 seconds across all active columns; + it is cancelled once all tasks complete. See UX Considerations. +- [ ] Use `asyncio.Event` to wake the scheduler when a task completes (avoids polling). + `Event` is sufficient — the scheduler resets it and re-checks ready tasks on each wake; + `Condition` would be needed only if waiting on a specific predicate, which the tracker + already handles. - [ ] Unit tests with mock generators **Files**: new module `engine/dataset_builders/async_scheduler.py`, tests @@ -294,13 +318,28 @@ and async-first generators (new plugins doing native async I/O) only need to imp - [ ] Add `is_stateful` property to base `ColumnGenerator` (default `False`). Stateful generators are serialized per-instance by the scheduler. - [ ] `ColumnGeneratorWithModelChatCompletion.agenerate` — already implemented (PR #280), no changes needed -- [ ] `FromScratchColumnGenerator`: add `async agenerate_from_scratch(num_records) -> DataFrame` — wraps sync in `asyncio.to_thread` with defensive `df.copy()` on shared data -- [ ] `ColumnGeneratorFullColumn`: add `async agenerate(data: DataFrame) -> DataFrame` — wraps sync in `asyncio.to_thread` with defensive `df.copy()` (see Risks) +- [ ] `FromScratchColumnGenerator`: add both async wrappers — `async agenerate_from_scratch(num_records) -> DataFrame` + (wraps `generate_from_scratch` in `asyncio.to_thread`) and `async agenerate(data: DataFrame) -> DataFrame` + (wraps `generate` in `asyncio.to_thread` with defensive `df.copy()`). Both are needed because the + scheduler dispatches subclasses via either path depending on whether the buffer is empty. +- [ ] `ColumnGeneratorFullColumn`: add `async agenerate(data: DataFrame) -> DataFrame` — wraps sync in + `asyncio.to_thread` with defensive `df.copy()` (see Risks). 
This intentionally overrides the base + `ColumnGenerator.agenerate(dict)` with a DataFrame-typed signature; the scheduler dispatches the + correct variant based on generation strategy. - [ ] `ExpressionColumnGenerator`: inherits full-column async wrapper -- [ ] `SamplerColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = False` -- [ ] `SeedDatasetColumnGenerator`: inherits from-scratch async wrapper; `is_stateful = True` (maintains DuckDB batch reader cursor and leftover-row buffer) -- [ ] `ValidationColumnGenerator`: inherits full-column async wrapper -- [ ] `CustomColumnGenerator`: inherits whichever strategy it uses; `is_stateful` should be overridable by custom implementations. For `@custom_column_generator` functions, detect `asyncio.iscoroutinefunction` and call directly if async. +- [ ] `SamplerColumnGenerator`: inherits both wrappers from `FromScratchColumnGenerator`; no custom implementation needed. `is_stateful = False` +- [ ] `SeedDatasetColumnGenerator`: inherits both wrappers from `FromScratchColumnGenerator`; no custom implementation needed. `is_stateful = True` (maintains DuckDB batch reader cursor and leftover-row buffer) +- [ ] `ValidationColumnGenerator`: inherits full-column async wrapper. Note: for `REMOTE` validators + with `max_parallel_requests > 1`, `generate()` internally uses `ConcurrentThreadExecutor`, so the + async wrapper spawns a thread that itself spawns more threads — bypassing the scheduler's concurrency + controls for those HTTP calls. Acceptable for v1 (see Follow-ups). +- [ ] `CustomColumnGenerator`: inherits directly from `ColumnGenerator` (not from `ColumnGeneratorCellByCell` + or `ColumnGeneratorFullColumn`), so it does not automatically inherit either async wrapper. 
Needs its own + `agenerate` that branches on strategy: + - `CELL_BY_CELL`: if the user function is a coroutine (`asyncio.iscoroutinefunction`), call it directly; + otherwise wrap in `asyncio.to_thread` + - `FULL_COLUMN`: wrap `generate(DataFrame)` in `asyncio.to_thread` with defensive `df.copy()` + `is_stateful` defaults to `False`; custom implementations can override it. - [ ] `ImageCellGenerator`, `EmbeddingCellGenerator`: add native `agenerate` using `model.agenerate_image` / `model.agenerate_text_embeddings` **Files**: `generators/base.py`, `generators/expression.py`, `generators/samplers.py`, `generators/seed_dataset.py`, `generators/image.py`, `generators/embedding.py`, tests @@ -331,10 +370,12 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 1. Build dependency map from `self._column_configs` 2. Partition rows into row groups 3. Create `CompletionTracker`, `AsyncTaskScheduler` - 4. Run scheduler on the background event loop (reuse `_ensure_async_engine_loop()`) + 4. Run scheduler on the background event loop (reuse `_ensure_async_engine_loop()` + from `dataset_builders/utils/async_concurrency.py` — already exists) 5. Scheduler handles checkpointing via callbacks -- [ ] `build()` dispatches to `_build_async()` when `DATA_DESIGNER_ASYNC_ENGINE=1` - **and** no column config has `allow_resize=True`; else existing sync path +- [ ] `build()` raises `DatasetGenerationError` at startup if `DATA_DESIGNER_ASYNC_ENGINE=1` + and any column config has `allow_resize=True`, naming the offending column(s); + otherwise dispatches to `_build_async()` - [ ] `build_preview()` uses the same async path (single row group, no checkpoint) - [ ] Error handling: `DatasetGenerationError` wrapping, record dropping, telemetry events - [ ] Processor integration: @@ -352,7 +393,7 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
- [ ] Integration test: mixed cell-by-cell + full-column generators - [ ] Integration test: error rate shutdown - [ ] Integration test: checkpoint correctness (row groups written in order, parquet valid) -- [ ] Integration test: `allow_resize=True` falls back to sync path +- [ ] Integration test: `allow_resize=True` with async engine raises `DatasetGenerationError` at startup, naming the column - [ ] Integration test: stateful generator (`is_stateful=True`) serializes per-instance across row groups; stateless generators run concurrently - [ ] Integration test: retry salvage — transient failure is retried and succeeds; @@ -376,8 +417,9 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. ## Risks & Considerations - **Memory with concurrent row groups**: Having multiple row groups in-flight increases - peak memory. Mitigation: limit max concurrent row groups (e.g., 2-3) via a separate - semaphore or by feeding row groups into the scheduler incrementally. + peak memory. Mitigation: a dedicated semaphore caps concurrent in-flight row groups, + controlled by `async_max_concurrent_row_groups` (default 3). The scheduler only admits + a new row group's seed tasks once a slot is available. - **Unbounded parked coroutines during throttle waits**: Releasing scheduler slots before throttle acquire improves fairness, but can create large numbers of parked @@ -402,8 +444,7 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. tasks for that row and all write paths must suppress writeback for dropped rows. Already-running batch/full-column tasks may still compute values for dropped rows, but those outputs are ignored. Dropped-row propagation is strictly row-scoped; - a row-group/batch is never dropped solely due to row-level failures. Track this - as "wasted work" telemetry for later optimization. + a row-group/batch is never dropped solely due to row-level failures. 
- **Sync bridge in async-hosted contexts**: async-first generators need a safe `generate()` fallback that works even when called from environments with an active @@ -433,31 +474,87 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. - **`allow_resize` incompatibility**: Any generator with `allow_resize=True` can change the row count mid-pipeline, invalidating per-row completion state for all downstream columns. Dynamic rescheduling and row identity tracking would be needed to support - this. **Async v1 falls back to the sync path** when any config uses `allow_resize`. + this. **Async v1 raises a `DatasetGenerationError` at startup** when any config uses + `allow_resize=True` with the async engine enabled, naming the offending column(s). + The user must resolve the conflict explicitly. - **Backward compatibility**: The sync path must remain untouched. All new code is gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` and sits in new modules. -- **Thread pool sizing (unresolved)**: sync generators wrapped in `asyncio.to_thread` - use Python's default thread pool executor (typically `min(32, cpu_count + 4)`). - If many sync generators run concurrently, the pool could become a bottleneck even - when scheduler limits are higher. Decide whether to explicitly size the executor - to match scheduler caps, or keep defaults for v1. - -- **Silent task hangs (unresolved)**: a sync generator wrapped in `asyncio.to_thread` - could hang or stall indefinitely. Per-task timeouts catch this but may produce - false positives on valid long-running tasks. For v1, rely on upstream/model - timeouts and no-progress detection; evaluate optional per-generator timeout - overrides as follow-up if needed. +- **Thread pool sizing**: sync generators wrapped in `asyncio.to_thread` use Python's + default thread pool executor (`min(32, cpu_count + 4)`). 
For v1, keep the default — + the scheduler semaphore and row group cap already bound actual concurrency to levels + where the default pool is unlikely to be a bottleneck (see Follow-ups). + +- **Silent task hangs**: a sync generator wrapped in `asyncio.to_thread` could hang + indefinitely. For v1, rely on upstream model timeouts (see Follow-ups). + +## UX Considerations + +- **Interleaved log output**: with parallel columns and row groups, log lines will interleave. + All async log output should include `(column=X, row_group=N)` context so output remains + readable during debugging. + +- **Progress display**: in the async path, per-column `ProgressTracker` interval logs are + suppressed to avoid interleaved noise. Instead, the scheduler runs a lightweight background + coroutine (`asyncio.create_task`) that emits a single consolidated summary line on a fixed + timer (e.g., every 10 seconds): + ``` + Progress: col_A 45/100 (45%, 2.1 rec/s) | col_B 32/100 (32%) | col_C 78/100 (78%, eta 12s) + ``` + The coroutine reads completion counters from existing `ProgressTracker` instances and is + cancelled once all tasks are done. The sync path is unchanged. + +- **Peak memory**: multiple in-flight row groups increase peak memory. The cap is + `async_max_concurrent_row_groups` (default 3), exposed so users can lower it in + memory-constrained environments. + +- **New config knobs**: `async_scheduler_max_submitted_tasks`, `async_max_concurrent_row_groups`, + `async_salvage_max_rounds`, and `async_salvage_error_threshold` are new parameters users may + encounter in error messages or docs. Each must have a sensible default; users should not need + to tune them in the common case. + +- **Async custom columns**: users can now write `async def` functions with + `@custom_column_generator` and get native async execution without thread overhead. Worth + surfacing in the changelog and docs. 
+ +- **Plugin async upgrade path**: plugin authors who want native async performance can + override `agenerate()` instead of `generate()` — the symmetric bridging means they don't + need to implement both. Stateful plugins should override `is_stateful = True`. Worth + documenting in the plugin authoring guide. + +### What stays the same + +- **Config API**: no schema changes — existing configs work without modification. +- **Sync path**: `DATA_DESIGNER_ASYNC_ENGINE=0` (the default) is untouched; existing users + see no behavioral change. +- **Existing plugins and sync custom columns**: all continue to work unchanged. +- **Row ordering**: the final dataset rows are always in the declared order regardless of + out-of-order row group completion. +- **Checkpoint file naming and format**: parquet files use the same naming scheme and schema. ## Notes -### What we're NOT doing in this PR +### Out of scope for this PR - Overhauling `ModelFacade` internals (PR #344's scope) - Building a heavyweight static execution graph (PR #269's approach — we take the lightweight dynamic approach instead) - Removing the sync/threaded path (it stays as the default) -- Supporting `allow_resize=True` in the async path (falls back to sync; follow-up) + +### Follow-ups +- **`allow_resize` async support**: currently raises at startup when the async engine is + enabled; full support requires dynamic rescheduling and row identity tracking. +- **Native async `RemoteValidator`**: `ValidationColumnGenerator` wraps `generate()` in + `asyncio.to_thread`, which spawns a thread that itself uses `ConcurrentThreadExecutor` + for parallel HTTP calls, bypassing the scheduler's concurrency controls. Fix: native + async `agenerate` on `RemoteValidator`. +- **Per-generator task timeouts**: sync generators wrapped in `asyncio.to_thread` can + hang indefinitely. For v1 we rely on upstream model timeouts; optional per-generator + timeout overrides are the follow-up. 
+- **Wasted-work telemetry**: in-flight full-column/batch tasks continue computing after + a row is dropped; add telemetry to track compute wasted on dropped rows. +- **Thread pool sizing**: if profiling shows saturation of the default executor + (`min(32, cpu_count + 4)`), explicitly size it to match the scheduler caps. ### Impact on plugins and custom columns From 7765b7a20a8e4b3fc8361bfa69e78150bea52f2d Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 10:59:18 -0300 Subject: [PATCH 08/16] document relation to PR #269 and fix scheduler diagram - add "Relation to PR #269" section explaining what we adopted (dependency source, trait inference, completion tracker design, statefulness separation) and what we changed (row-group tasks instead of cell-level nodes, ROW_STREAMABLE omitted) - fix scheduler main loop diagram: add async_max_concurrent_row_groups admission step, pre-batch failure path (skip row group + release slot), and loop back to ADMIT after row group completion --- plans/346/async-generators-and-task-queue.md | 51 ++++++++++++++++++++ plans/346/diagrams.md | 14 ++++-- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index db87eaca..0a43a6a6 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -533,6 +533,57 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. out-of-order row group completion. - **Checkpoint file naming and format**: parquet files use the same naming scheme and schema. +## Relation to PR #269 + +PR #269 ("feat: add execution graph builder plan with reference implementation") is a +companion design by @johnnygreco that we reviewed before finalising this plan. It proposes +a static `ExecutionGraph` with typed node IDs (`CellNodeId`, `BarrierNodeId`), an +`ExecutionTraits` flag enum, and a `CompletionTracker`. 
It intentionally stops at the +graph/tracker layer; this plan covers the full stack on top of that foundation. + +### What we adopted + +- **Dependency source**: derive the graph from `required_columns` on existing configs, + extended with a side-effect mapping for columns like `summary__trace`. No config schema + changes in either approach. +- **Trait inference from properties, not class names**: `GraphBuilder._infer_traits()` + inspects `can_generate_from_scratch` and `get_generation_strategy()` rather than + matching class names. We apply the same principle, keeping plugin generators compatible. +- **Lightweight completion tracking**: a `dict[str, set[int]]` mapping column → completed + rows, rather than materialising O(C × R) cell-level state. Our `CompletionTracker` + follows the same design. +- **Statefulness as a separate concern from execution strategy**: PR #269 separates + execution traits (how a generator runs) from per-instance concurrency safety. We + formalise this with the `is_stateful` property. + +### What we changed, and why + +**Row-group tasks instead of cell-level nodes.** PR #269 models every `(row, column)` pair +as a virtual `CellNodeId`. Full-column generators become `BARRIER` nodes — a synthetic +node that must complete before any output cells are ready. This faithfully models +dependencies but creates a problem the PR itself flags as an open issue: a validation +column anywhere in the pipeline blocks all checkpointing until the entire dataset +completes, because no row is "done" until every column, including the barrier, finishes. + +We scope full-column tasks to a **row group** (the existing `buffer_size` batch). The +effective barrier is just the FULL_COLUMN task waiting for all rows *in that group* — not +the whole dataset. Checkpoints happen as each row group completes, so a failure mid-run +loses at most one batch. 
+ +**`ROW_STREAMABLE` trait omitted.** PR #269 introduces `is_row_streamable` so full-column +generators that process rows independently (e.g., `ExpressionColumnGenerator`) can be +scheduled row-by-row, recovering some pipelining within a barrier. In our row-group model +this is unnecessary: even a full-column generator only blocks one batch, preserving +checkpoint cadence without subdividing tasks further. Expression columns run in +microseconds and are never the scheduling bottleneck. We note this as a potential +follow-up if profiling shows otherwise. + +**Scheduler and concurrency layers added.** PR #269 deliberately stops at the graph and +tracker. Steps 1–3 of this plan (dependency map, completion tracker, task model) are +directly informed by PR #269 and we treat it as the reference design for that layer. The +remaining steps — scheduler, concurrency controls, retry/salvage, buffer manager, and +builder integration — extend that foundation to a deployable implementation. + ## Notes ### Out of scope for this PR diff --git a/plans/346/diagrams.md b/plans/346/diagrams.md index c914f77b..ee0962d3 100644 --- a/plans/346/diagrams.md +++ b/plans/346/diagrams.md @@ -33,14 +33,18 @@ The overall orchestration flow from start to row group checkpoint. 
```mermaid flowchart TD - START[Start] --> SEED[Dispatch from_scratch tasks] + START[Start] --> ADMIT[Admit row group\nacquire async_max_concurrent_row_groups slot] + ADMIT --> SEED[Dispatch from_scratch tasks] SEED --> SEED_CHECK{Stateful generator?} SEED_CHECK -->|Yes| SER[Serialize per-instance\nrow group N before N+1] - SEED_CHECK -->|No| PAR[Dispatch all row groups\nconcurrently] + SEED_CHECK -->|No| PAR[Dispatch concurrently\nwithin admitted set] SER --> PRE PAR --> PRE - PRE[Pre-batch barrier\nrun processors, reset tracker] --> LOOP + PRE[Pre-batch barrier\nrun processors, reset tracker] --> PRE_OK{Processor\nsucceeded?} + PRE_OK -->|No| SKIP[Skip row group\nrelease semaphore slot] + SKIP --> DONE + PRE_OK -->|Yes| LOOP LOOP[Query tracker:\nget_ready_tasks] --> READY{Tasks ready?} @@ -56,14 +60,14 @@ flowchart TD DEFERRED -->|No, or budget exhausted| RG_CHECK{Row group\ncomplete?} RG_CHECK -->|Yes| POST[Post-batch processors] - POST --> CP[Checkpoint to parquet\nfree memory] + POST --> CP[Checkpoint to parquet\nfree memory\nrelease semaphore slot] CP --> DONE{All row groups\ndone?} RG_CHECK -->|No| WAIT[Wait for in-flight\ntasks to complete] WAIT --> LOOP DONE -->|Yes| FIN[Done] - DONE -->|No| LOOP + DONE -->|No| ADMIT ``` ## 3. 
Dependency Resolution Example From b6d8a2af0c56d5995dc83fe79eeafa751f17a2d9 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 11:30:04 -0300 Subject: [PATCH 09/16] add profiling/tracing section to async scheduler plan - TaskTrace dataclass spec in Step 3 (opt-in, zero overhead when disabled) - trace=True param on AsyncTaskScheduler constructor in Step 4 - Step 8 benchmark references trace for timing measurements - New Profiling section: instrumentation points, example output table, usage snippet --- plans/346/async-generators-and-task-queue.md | 76 +++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 0a43a6a6..31e5e092 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -245,6 +245,12 @@ Simple dataclass representing a unit of work. - `row_index: int | None` (None for batch tasks) - `task_type: Literal["from_scratch", "cell", "batch", "pre_batch_processor", "post_batch_processor"]` - [ ] `TaskResult` with status, output, error info +- [ ] `TaskTrace` dataclass (only instantiated when tracing is enabled): + - `column: str`, `row_group: int`, `row_index: int | None`, `task_type: str` + - `dispatched_at: float` — `perf_counter()` when `create_task()` fires + - `slot_acquired_at: float` — after scheduler semaphore acquired + - `completed_at: float` — in `finally` block after generator returns + - `status: str`, `error: str | None` - [ ] Hashable so we can track dispatched/pending sets **Files**: new module `engine/dataset_builders/utils/task_model.py` — must be its own module @@ -256,7 +262,8 @@ inlining would create import cycles. The core orchestrator that replaces `_run_batch` for the async path. 
- [ ] `AsyncTaskScheduler` class: - - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit (`async_scheduler_max_submitted_tasks`), row group semaphore (`async_max_concurrent_row_groups`), salvage config, error/result callbacks + - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit (`async_scheduler_max_submitted_tasks`), row group semaphore (`async_max_concurrent_row_groups`), salvage config, error/result callbacks, `trace: bool = False` + - When `trace=True`, populates `scheduler.traces: list[TaskTrace]` (one record per task); otherwise no `TaskTrace` objects are created. See Profiling. - `async run()` — main loop: 1. Acquire the row group semaphore (`async_max_concurrent_row_groups`) before admitting a new row group's seed tasks. Dispatch `from_scratch` tasks, @@ -412,7 +419,7 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. row groups continue - [ ] Run `make test` — all existing tests pass - [ ] Run `make test-run-recipes` with `DATA_DESIGNER_ASYNC_ENGINE=1` -- [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency +- [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency; use `trace=True` and load `scheduler.traces` into a DataFrame to measure per-column dispatch and execution times ## Risks & Considerations @@ -533,6 +540,71 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. out-of-order row group completion. - **Checkpoint file naming and format**: parquet files use the same naming scheme and schema. +## Profiling + +Task execution tracing is opt-in, enabled by passing `trace=True` to `build()` or setting +`DATA_DESIGNER_ASYNC_TRACE=1`. When disabled (the default), no `TaskTrace` objects are +created and there is no overhead. 
When enabled, the scheduler collects one `TaskTrace` +record per task and exposes the list as `scheduler.traces: list[TaskTrace]`, which is +surfaced on the result object after the run. + +### Instrumentation points + +Three timestamps are recorded inside the task coroutine — all on the event loop thread, +so no locking is needed: + +1. `dispatched_at` — set in the scheduler loop right before `asyncio.create_task()` +2. `slot_acquired_at` — set inside the coroutine immediately after `await semaphore.acquire()` +3. `completed_at` + `status` — set in a `try/finally` block wrapping the generator call + +The `TaskTrace` object is created at dispatch time and passed into the coroutine closure; +the coroutine mutates it in-place. On completion it is appended to `scheduler.traces` via +the result callback. + +### What the data shows + +- `slot_acquired_at - dispatched_at`: time waiting on the scheduler semaphore (contention indicator) +- `completed_at - slot_acquired_at`: actual generator execution time +- Dispatch timestamps across tasks: verify dependency order and parallelism (e.g. 
two + independent columns should show overlapping `slot_acquired_at`–`completed_at` ranges) + +### Example output + +Two-column config (`topic` → `question` → `answer`), 2 row groups, 3 rows each: + +``` +column rg row type dispatched slot_acquired completed duration status +-------- -- --- ------------ ---------- ------------- --------- -------- ------ +topic 0 - from_scratch 0.000 0.001 0.012 0.011 ok +topic 1 - from_scratch 0.001 0.001 0.013 0.012 ok ← RG0+1 overlap (stateless) +question 0 0 cell 0.013 0.014 0.142 0.128 ok +question 0 1 cell 0.013 0.014 0.189 0.175 ok +question 0 2 cell 0.013 0.015 0.201 0.186 ok +question 1 3 cell 0.014 0.015 0.155 0.141 ok +question 1 4 cell 0.014 0.016 0.210 0.194 ok +question 1 5 cell 0.015 0.016 0.198 0.182 ok +answer 0 0 cell 0.143 0.143 0.312 0.169 ok ← dispatched as question[0,0] completes +answer 0 1 cell 0.190 0.190 0.398 0.208 ok +answer 0 2 cell 0.202 0.202 0.445 0.243 ok +answer 1 3 cell 0.156 0.156 0.334 0.178 ok +... +``` + +`topic` RG0 and RG1 dispatch 1ms apart and run concurrently (stateless). `question` rows +are all dispatched the moment `topic` completes for their row group. `answer[0,0]` is +dispatched at `t=0.143`, exactly when `question[0,0]` finishes — confirming cell-level +pipelining across row groups. 
+ +### Usage + +```python +result = data_designer.build(num_records=100, trace=True) +df = pd.DataFrame([asdict(t) for t in result.traces]) +df["wait"] = df["slot_acquired_at"] - df["dispatched_at"] +df["duration"] = df["completed_at"] - df["slot_acquired_at"] +df.groupby("column")[["wait", "duration"]].describe() +``` + ## Relation to PR #269 PR #269 ("feat: add execution graph builder plan with reference implementation") is a From 5b1b058ca0f486e4ca6b75260662f8a4b37bdd40 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 13:36:06 -0300 Subject: [PATCH 10/16] refine async scheduler plan from review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace dependency map with static ExecutionGraph class (upstream, downstream, strategy, topological_order, critical_path, task_count, to_mermaid accessors) - Use row-group-local indices in CompletionTracker instead of global - Clarify from-scratch columns are FULL_COLUMN with empty upstream deps, not a separate strategy enum value - Remove reference to non-existent ColumnGeneratorCellByCell - Expand PR #269 comparison: ExecutionTraits → GenerationStrategy --- plans/346/async-generators-and-task-queue.md | 139 ++++++++++++------- 1 file changed, 88 insertions(+), 51 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 31e5e092..8f3fa52e 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -47,23 +47,25 @@ row group 0 column C is still running). ## Key Design Decisions -### 1. Dynamic dependency resolution, not a static graph +### 1. Column-level static execution graph -We don't build an explicit graph object (unlike PR #269). 
Instead: -- At setup: build a **dependency map** `dict[str, set[str]]` from each column's +- At setup: build an `ExecutionGraph` (column-level DAG) from each column's `config.required_columns` property (already available on all config types via - Jinja2 template introspection). -- The dependency map also registers **side-effect output columns** (e.g., + Jinja2 template introspection) and the generation strategy from each generator. +- The graph also registers **side-effect output columns** (e.g., `__trace`, `__reasoning_content`) and maps them back to their producer generator. A downstream column referencing `summary__trace` resolves to a dependency on the `summary` generator. This ensures side-effect columns are never missing from - the map or treated as unsatisfied. + the graph or treated as unsatisfied. - At runtime: a **completion tracker** (columns × rows matrix) determines which tasks are ready by checking whether all upstream columns for a given row are done. -This is simpler, requires no new data structures beyond what configs already -provide, and naturally handles the "dynamic" aspect — we just check readiness as -tasks complete. +The graph is column-granularity only — no cell-level nodes — so it stays small +(N columns, N edges) regardless of row count. Scheduling remains dynamic: the +completion tracker drives readiness checks as tasks complete, with no upfront +planning of execution order. The static graph adds inspectability (visualization, +critical path, upfront task counts, error attribution) without changing how the +scheduler operates at runtime. ### 2. Task granularity @@ -199,37 +201,54 @@ incompatible and make an explicit choice (see Follow-ups). ## Implementation Steps -### Step 1: Column Dependency Map - -Build the dependency map from column configs at builder init time. 
- -- [ ] Add `build_dependency_map(column_configs) -> dict[str, set[str]]` utility - - Input: the ordered list of `ColumnConfigT` / `MultiColumnConfig` +### Step 1: Execution Graph + +Build a column-level static execution graph from column configs at builder init time. +The graph is column-granularity only — no cell-level nodes — so it stays small (N columns, +N edges) regardless of row count and avoids the barrier/checkpoint problems of a cell-level +graph. + +- [ ] `ExecutionGraph` class: + - Backing stores: `dict[str, set[str]]` column → upstream columns; + `dict[str, GenerationStrategy]` column → generation strategy + - `upstream(column: str) -> set[str]` — direct dependencies of a column + - `downstream(column: str) -> set[str]` — columns that depend on this one (for error attribution) + - `strategy(column: str) -> GenerationStrategy` — cell-by-cell or full-column + - `topological_order() -> list[str]` — valid DAG execution order (used by scheduler and for validation) + - `critical_path() -> list[str]` — longest dependency chain (useful for ETA estimates) + - `task_count(num_records: int, buffer_size: int) -> dict[str, int]` — exact task count per + column before the run starts; cell-by-cell columns produce `num_records` tasks, + full-column columns (including from-scratch generators, which report `FULL_COLUMN`) + produce `ceil(num_records / buffer_size)` tasks + - `to_mermaid() -> str` — Mermaid diagram string; nodes are annotated with strategy type +- [ ] `build_execution_graph(column_configs, strategies: dict[str, GenerationStrategy]) -> ExecutionGraph` utility: + - Input: the ordered list of `ColumnConfigT` / `MultiColumnConfig`, plus a pre-computed + strategy map (available from generators at builder init time via `get_generation_strategy()`) - For each config, read `config.required_columns` → set of upstream column names - Also register side-effect output columns (`__trace`, `__reasoning_content`, etc.) 
and map them back to their producer column, so downstream references resolve correctly - For `MultiColumnConfig`, all sub-columns share the same dependencies - Validate: every required column must resolve to a known producer (including registered side-effect outputs), and the graph must be acyclic -- [ ] Add `topological_order(dependency_map) -> list[str]` — returns a valid DAG - execution order used for validation (not required to match config declaration order) -- [ ] Unit tests for dependency map construction and validation +- [ ] Unit tests for graph construction, validation, critical path, task count, and Mermaid output -**Files**: new module `engine/dataset_builders/utils/dependency_map.py`, tests +**Files**: new module `engine/dataset_builders/utils/execution_graph.py`, tests ### Step 2: Completion Tracker -A lightweight structure tracking which (column, row) pairs are done. +A lightweight structure tracking which (column, row_group, row_index) tuples are +done. Row indices are **local** to their row group (0-based within each group), +matching the buffer manager's per-row-group addressing. 
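The local addressing convention above amounts to a `divmod`. A minimal sketch (the helper name is hypothetical; the plan only fixes the `(row_group, row_index)` convention):

```python
def to_local(global_row: int, buffer_size: int) -> tuple[int, int]:
    # Map a dataset-global row id to (row_group, local_row_index),
    # where local indices are 0-based within each row group.
    return divmod(global_row, buffer_size)
```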
- [ ] `CompletionTracker` class: - - Internal: `dict[str, set[int]]` mapping column name → set of completed row indices - - `mark_complete(column: str, row: int)` / `mark_batch_complete(column: str, row_group: int, row_group_size: int)` - - `is_ready(column: str, row: int, dependency_map) -> bool` — checks all upstream columns for that row - - `is_batch_ready(column: str, row_group: int, row_group_size: int, dependency_map) -> bool` — checks all rows in group + - Internal: `dict[int, dict[str, set[int]]]` mapping row_group → column → set of completed local row indices + - `mark_complete(column: str, row_group: int, row_index: int)` / `mark_batch_complete(column: str, row_group: int, row_group_size: int)` + - `is_ready(column: str, row_group: int, row_index: int, graph: ExecutionGraph) -> bool` — checks all upstream columns for that (row_group, row_index) + - `is_batch_ready(column: str, row_group: int, row_group_size: int, graph: ExecutionGraph) -> bool` — checks all rows in group - `drop_row(row_group: int, row_index: int)` — marks row as dropped across all columns; `get_ready_tasks` skips dropped rows, in-flight tasks for dropped rows are ignored on completion - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all non-dropped rows have all columns done - - `get_ready_tasks(dependency_map, columns_with_strategy, row_groups) -> list[Task]` — yields all currently dispatchable tasks, excluding dropped rows + - `get_ready_tasks(graph: ExecutionGraph, row_groups) -> list[Task]` — yields all currently dispatchable tasks, excluding dropped rows; reads `graph.strategy(column)` to determine task granularity per column - [ ] No locks needed: all access is from the single asyncio event loop thread - [ ] Unit tests @@ -262,7 +281,7 @@ inlining would create import cycles. The core orchestrator that replaces `_run_batch` for the async path. 
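A minimal shape of that orchestrator loop, using the `get_ready_tasks(graph, row_groups, dispatched)` interface from Step 2. This is a sketch under assumptions: `run_task` is a hypothetical callback standing in for the generator call plus tracker update, and error handling, row drops, and checkpointing are omitted.

```python
import asyncio

async def run_scheduler(tracker, graph, row_groups, run_task, max_submitted: int) -> None:
    # Dispatch every currently-ready task, bounded by a submission-budget
    # semaphore, then re-poll readiness as tasks finish so downstream work
    # unlocks incrementally.
    budget = asyncio.Semaphore(max_submitted)
    dispatched: set = set()
    pending: set[asyncio.Task] = set()

    async def bounded(task) -> None:
        async with budget:
            await run_task(task)  # generator call + completion-tracker update

    while True:
        for task in tracker.get_ready_tasks(graph, row_groups, dispatched):
            dispatched.add(task)
            pending.add(asyncio.create_task(bounded(task)))
        if not pending:
            return
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            t.result()  # surface exceptions instead of silently dropping them
```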
- [ ] `AsyncTaskScheduler` class: - - Constructor takes: generators (by column name), dependency map, completion tracker, row group definitions, concurrency limit (`async_scheduler_max_submitted_tasks`), row group semaphore (`async_max_concurrent_row_groups`), salvage config, error/result callbacks, `trace: bool = False` + - Constructor takes: generators (by column name), `graph: ExecutionGraph`, completion tracker, row group definitions, concurrency limit (`async_scheduler_max_submitted_tasks`), row group semaphore (`async_max_concurrent_row_groups`), salvage config, error/result callbacks, `trace: bool = False` - When `trace=True`, populates `scheduler.traces: list[TaskTrace]` (one record per task); otherwise no `TaskTrace` objects are created. See Profiling. - `async run()` — main loop: 1. Acquire the row group semaphore (`async_max_concurrent_row_groups`) before @@ -340,8 +359,8 @@ and async-first generators (new plugins doing native async I/O) only need to imp with `max_parallel_requests > 1`, `generate()` internally uses `ConcurrentThreadExecutor`, so the async wrapper spawns a thread that itself spawns more threads — bypassing the scheduler's concurrency controls for those HTTP calls. Acceptable for v1 (see Follow-ups). -- [ ] `CustomColumnGenerator`: inherits directly from `ColumnGenerator` (not from `ColumnGeneratorCellByCell` - or `ColumnGeneratorFullColumn`), so it does not automatically inherit either async wrapper. Needs its own +- [ ] `CustomColumnGenerator`: inherits directly from `ColumnGenerator` (not from + `ColumnGeneratorFullColumn`), so it does not automatically inherit the full-column async wrapper. Needs its own `agenerate` that branches on strategy: - `CELL_BY_CELL`: if the user function is a coroutine (`asyncio.iscoroutinefunction`), call it directly; otherwise wrap in `asyncio.to_thread` @@ -374,7 +393,7 @@ Adapt `DatasetBatchManager` for concurrent row group processing. Wire the new scheduler into `ColumnWiseDatasetBuilder`. 
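The `CustomColumnGenerator` branching described in Step 5 above reduces to a few lines. A sketch (the `call_user_fn` name and signature are hypothetical; the coroutine check and `asyncio.to_thread` wrapping follow the plan):

```python
import asyncio

async def call_user_fn(fn, row: dict) -> dict:
    # Coroutine functions run directly on the event loop; plain functions are
    # pushed to a worker thread so they cannot block the loop.
    if asyncio.iscoroutinefunction(fn):
        return await fn(row)
    return await asyncio.to_thread(fn, row)
```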
- [ ] New method `_build_async(generators, num_records, buffer_size, ...)`: - 1. Build dependency map from `self._column_configs` + 1. Build `ExecutionGraph` from `self._column_configs` and generator strategies 2. Partition rows into row groups 3. Create `CompletionTracker`, `AsyncTaskScheduler` 4. Run scheduler on the background event loop (reuse `_ensure_async_engine_loop()` @@ -393,8 +412,8 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. ### Step 8: Tests & Validation -- [ ] Unit tests for each new module (dependency map, completion tracker, task model, scheduler) -- [ ] Dependency map: side-effect output columns resolve correctly (e.g., column +- [ ] Unit tests for each new module (execution graph, completion tracker, task model, scheduler) +- [ ] Execution graph: side-effect output columns resolve correctly (e.g., column depending on `summary__trace` maps to a dependency on the `summary` generator) - [ ] Integration test: multi-column config with known dependencies, verify parallel execution - [ ] Integration test: mixed cell-by-cell + full-column generators @@ -611,10 +630,13 @@ PR #269 ("feat: add execution graph builder plan with reference implementation") companion design by @johnnygreco that we reviewed before finalising this plan. It proposes a static `ExecutionGraph` with typed node IDs (`CellNodeId`, `BarrierNodeId`), an `ExecutionTraits` flag enum, and a `CompletionTracker`. It intentionally stops at the -graph/tracker layer; this plan covers the full stack on top of that foundation. +graph/tracker layer; this plan covers the full stack from graph through to deployment. ### What we adopted +- **Static `ExecutionGraph`**: we adopt the concept of building a static graph upfront, + inspectable before and after a run. Our graph is column-granularity rather than + cell-granularity — see below for why. 
- **Dependency source**: derive the graph from `required_columns` on existing configs, extended with a side-effect mapping for columns like `summary__trace`. No config schema changes in either approach. @@ -630,17 +652,30 @@ graph/tracker layer; this plan covers the full stack on top of that foundation. ### What we changed, and why -**Row-group tasks instead of cell-level nodes.** PR #269 models every `(row, column)` pair +**Column-level graph, not cell-level.** PR #269 models every `(row, column)` pair as a virtual `CellNodeId`. Full-column generators become `BARRIER` nodes — a synthetic node that must complete before any output cells are ready. This faithfully models dependencies but creates a problem the PR itself flags as an open issue: a validation column anywhere in the pipeline blocks all checkpointing until the entire dataset completes, because no row is "done" until every column, including the barrier, finishes. - -We scope full-column tasks to a **row group** (the existing `buffer_size` batch). The -effective barrier is just the FULL_COLUMN task waiting for all rows *in that group* — not -the whole dataset. Checkpoints happen as each row group completes, so a failure mid-run -loses at most one batch. +Cell-level nodes also scale to O(C × R), which is large for realistic dataset sizes. + +We use a **column-level** `ExecutionGraph` instead — N columns, N edges, fixed size +regardless of row count. This still provides the full value of a static graph (visualization, +critical path, upfront task counts, error attribution via `downstream()`) without the +checkpoint problem or the node explosion. Full-column tasks are scoped to a **row group**: +the effective barrier is just that FULL_COLUMN task waiting for all rows *in that group*, +not the whole dataset. Checkpoints happen as each row group completes, so a failure +mid-run loses at most one batch. 
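The upfront task counts mentioned above follow directly from the strategy map. An illustrative sketch (plain strings stand in for the `GenerationStrategy` enum):

```python
import math

def task_count(strategies: dict[str, str], num_records: int, buffer_size: int) -> dict[str, int]:
    # Cell-by-cell columns emit one task per row; full-column columns
    # (including from-scratch generators) emit one task per row group.
    return {
        column: num_records if strategy == "cell_by_cell"
        else math.ceil(num_records / buffer_size)
        for column, strategy in strategies.items()
    }
```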
+ +**`ExecutionTraits` replaced by `GenerationStrategy` on the graph.** PR #269 attaches an +`ExecutionTraits` flag enum (`CELL`, `BARRIER`, `ROW_STREAMABLE`) to each node. Since our +graph is column-level, we store `GenerationStrategy` (cell-by-cell, full-column) directly +on each column node instead. From-scratch columns are identified by having no upstream +dependencies in the graph; the scheduler checks `can_generate_from_scratch` on the generator +instance to determine which method to call. This serves the same purpose as `ExecutionTraits` +— the scheduler and `CompletionTracker` use it to determine task granularity — without +needing typed node IDs or flag combinations. **`ROW_STREAMABLE` trait omitted.** PR #269 introduces `is_row_streamable` so full-column generators that process rows independently (e.g., `ExpressionColumnGenerator`) can be @@ -651,7 +686,7 @@ microseconds and are never the scheduling bottleneck. We note this as a potentia follow-up if profiling shows otherwise. **Scheduler and concurrency layers added.** PR #269 deliberately stops at the graph and -tracker. Steps 1–3 of this plan (dependency map, completion tracker, task model) are +tracker. Steps 1–3 of this plan (execution graph, completion tracker, task model) are directly informed by PR #269 and we treat it as the reference design for that layer. The remaining steps — scheduler, concurrency controls, retry/salvage, buffer manager, and builder integration — extend that foundation to a deployable implementation. @@ -660,8 +695,8 @@ builder integration — extend that foundation to a deployable implementation. 
### Out of scope for this PR - Overhauling `ModelFacade` internals (PR #344's scope) -- Building a heavyweight static execution graph (PR #269's approach — we take the - lightweight dynamic approach instead) +- Building a cell-level static execution graph (PR #269's `CellNodeId`/`BarrierNodeId` + approach — we use a column-level graph instead, which avoids the barrier/checkpoint problem) - Removing the sync/threaded path (it stays as the default) ### Follow-ups @@ -705,13 +740,15 @@ unaffected by this change. ### Key insight from existing code Every column config already has a `required_columns` property that returns the column names referenced in its Jinja2 templates. This gives us explicit dependency -information without any config schema changes. The dependency map starts as -`{col.name: set(col.required_columns) for col in configs}`, extended with a -side-effect output mapping so that references to columns like `summary__trace` -resolve to a dependency on the `summary` generator. - -### On "dynamic" graph building -The graph is implicit in the dependency map + completion tracker. We never -materialise a node/edge graph structure. The scheduler dynamically determines -readiness by querying the tracker — this is what "dynamic" means in this context. -As tasks complete, new tasks become eligible. No upfront planning of execution order. +information without any config schema changes. The `ExecutionGraph` dependency +structure starts as `{col.name: set(col.required_columns) for col in configs}`, +extended with a side-effect output mapping so that references to columns like +`summary__trace` resolve to a dependency on the `summary` generator. + +### On static graph vs dynamic scheduling +The `ExecutionGraph` is a static column-level DAG built once at init time — it +describes *what* can run and in what order. Scheduling remains dynamic: the +completion tracker drives readiness checks as tasks complete, and new tasks become +eligible without any upfront planning. 
The two concerns are complementary: +the graph provides inspectability and upfront analysis; the tracker provides +runtime readiness at row granularity. From 3cba9e2944f5e041c8bb307ea39ebfea7f41b387 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 14:18:17 -0300 Subject: [PATCH 11/16] fix graph complexity notation and clarify tracker API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Correct "N columns, N edges" to "O(C) nodes, O(C²) edges worst-case" - Add dispatched set param to get_ready_tasks to prevent double-dispatch - Clarify is_row_group_complete drop_row interaction --- plans/346/async-generators-and-task-queue.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 8f3fa52e..5125220c 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -61,7 +61,7 @@ row group 0 column C is still running). tasks are ready by checking whether all upstream columns for a given row are done. The graph is column-granularity only — no cell-level nodes — so it stays small -(N columns, N edges) regardless of row count. Scheduling remains dynamic: the +(O(C) nodes, O(C²) edges worst-case) regardless of row count. Scheduling remains dynamic: the completion tracker drives readiness checks as tasks complete, with no upfront planning of execution order. The static graph adds inspectability (visualization, critical path, upfront task counts, error attribution) without changing how the @@ -204,8 +204,8 @@ incompatible and make an explicit choice (see Follow-ups). ### Step 1: Execution Graph Build a column-level static execution graph from column configs at builder init time. 
-The graph is column-granularity only — no cell-level nodes — so it stays small (N columns, -N edges) regardless of row count and avoids the barrier/checkpoint problems of a cell-level +The graph is column-granularity only — no cell-level nodes — so it stays small (O(C) nodes, +O(C²) edges worst-case) regardless of row count and avoids the barrier/checkpoint problems of a cell-level graph. - [ ] `ExecutionGraph` class: @@ -247,8 +247,8 @@ matching the buffer manager's per-row-group addressing. - `is_batch_ready(column: str, row_group: int, row_group_size: int, graph: ExecutionGraph) -> bool` — checks all rows in group - `drop_row(row_group: int, row_index: int)` — marks row as dropped across all columns; `get_ready_tasks` skips dropped rows, in-flight tasks for dropped rows are ignored on completion - - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all non-dropped rows have all columns done - - `get_ready_tasks(graph: ExecutionGraph, row_groups) -> list[Task]` — yields all currently dispatchable tasks, excluding dropped rows; reads `graph.strategy(column)` to determine task granularity per column + - `is_row_group_complete(row_group: int, row_group_size: int, all_columns: list[str]) -> bool` — all non-dropped rows have all columns done; `row_group_size` is the original size, dropped rows (via `drop_row`) are excluded internally + - `get_ready_tasks(graph: ExecutionGraph, row_groups, dispatched: set[Task]) -> list[Task]` — yields all currently dispatchable tasks, excluding dropped rows and already-dispatched/in-flight tasks; reads `graph.strategy(column)` to determine task granularity per column - [ ] No locks needed: all access is from the single asyncio event loop thread - [ ] Unit tests @@ -660,8 +660,8 @@ column anywhere in the pipeline blocks all checkpointing until the entire datase completes, because no row is "done" until every column, including the barrier, finishes. 
Cell-level nodes also scale to O(C × R), which is large for realistic dataset sizes. -We use a **column-level** `ExecutionGraph` instead — N columns, N edges, fixed size -regardless of row count. This still provides the full value of a static graph (visualization, +We use a **column-level** `ExecutionGraph` instead — O(C) nodes, O(C²) edges worst-case, +fixed size regardless of row count. This still provides the full value of a static graph (visualization, critical path, upfront task counts, error attribution via `downstream()`) without the checkpoint problem or the node explosion. Full-column tasks are scoped to a **row group**: the effective barrier is just that FULL_COLUMN task waiting for all rows *in that group*, From 8612deceb6d36fdd92e09daf0a8ea10e9746b136 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 15:18:00 -0300 Subject: [PATCH 12/16] add PR breakdown, code sketches, and throttle note - Add PR breakdown section with 4 PRs, dependency graph, and "what works after merge" for each - Add code-sketches.md with structural sketches of main components - Reorganize test plan by PR (unit tests per PR, integration in PR 4) - Note that throttle manager (PR #344) is optional; scheduler works without it initially --- plans/346/async-generators-and-task-queue.md | 183 +++++++-- plans/346/code-sketches.md | 369 +++++++++++++++++++ 2 files changed, 530 insertions(+), 22 deletions(-) create mode 100644 plans/346/code-sketches.md diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 5125220c..28f7257b 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -100,6 +100,12 @@ Three independent layers: 2. **Throttle manager** (from PR #344) — gates every outbound LLM call, keyed by `provider+model(+domain)`. Dynamically adjusts per-key limits on 429s via AIMD. This is the real API concurrency control. 
+ **Note**: PR #344 may land after the initial scheduler PRs. The scheduler + is designed to work without it: when no throttle manager is available, tasks + skip the throttle acquire step and only the scheduler semaphore + submission + budget bound concurrency. Once PR #344 merges, the throttle acquire call is + wired in — no scheduler changes needed, just a conditional `await` in the + task dispatch path. 3. **Submission budget** — a hard cap on "submitted but not finished" tasks (running + waiting on throttle/backoff), e.g., `async_scheduler_max_submitted_tasks`. @@ -199,6 +205,11 @@ incompatible and make an explicit choice (see Follow-ups). - [ ] All existing tests pass; new tests cover dependency resolution and scheduling - [ ] `make test-run-recipes` passes with async engine enabled +## Code Sketches + +See [`code-sketches.md`](code-sketches.md) for structural sketches of the main +components and how they interact. + ## Implementation Steps ### Step 1: Execution Graph @@ -412,33 +423,161 @@ Wire the new scheduler into `ColumnWiseDatasetBuilder`. ### Step 8: Tests & Validation -- [ ] Unit tests for each new module (execution graph, completion tracker, task model, scheduler) +Tests are added incrementally with each PR, not deferred to the end. 
+ +**PR 1 (foundation) — unit tests**: +- [ ] Execution graph construction, validation, topological order, critical path - [ ] Execution graph: side-effect output columns resolve correctly (e.g., column depending on `summary__trace` maps to a dependency on the `summary` generator) -- [ ] Integration test: multi-column config with known dependencies, verify parallel execution -- [ ] Integration test: mixed cell-by-cell + full-column generators -- [ ] Integration test: error rate shutdown -- [ ] Integration test: checkpoint correctness (row groups written in order, parquet valid) -- [ ] Integration test: `allow_resize=True` with async engine raises `DatasetGenerationError` at startup, naming the column -- [ ] Integration test: stateful generator (`is_stateful=True`) serializes per-instance - across row groups; stateless generators run concurrently -- [ ] Integration test: retry salvage — transient failure is retried and succeeds; +- [ ] Execution graph: `cell_dependencies` returns correct deps for cell-by-cell, + full-column, and from-scratch columns +- [ ] Execution graph: `task_count` and `to_mermaid` output +- [ ] Completion tracker: `mark_complete`, `is_complete`, `all_complete` +- [ ] Completion tracker: `drop_row`, `is_dropped`, `is_row_group_complete` +- [ ] Task model: hashability, equality, TaskResult, TaskTrace + +**PR 2 (generators) — unit tests**: +- [ ] Symmetric bridging: sync-only generator can be called via `agenerate` +- [ ] Symmetric bridging: async-only generator can be called via `generate` +- [ ] `is_stateful` defaults to `False`; `SeedDatasetColumnGenerator` returns `True` +- [ ] `FromScratchColumnGenerator.agenerate_from_scratch` wraps sync correctly +- [ ] `ColumnGeneratorFullColumn.agenerate` passes `df.copy()` to thread +- [ ] `CustomColumnGenerator.agenerate` detects coroutine functions and calls directly +- [ ] All existing generator tests pass unchanged (`make test`) + +**PR 3 (scheduler + buffer) — unit tests with mock generators**: +- [ 
] Scheduler dispatches from-scratch tasks first, then downstream as deps complete +- [ ] Stateful generator serializes across row groups; stateless runs concurrently +- [ ] Retry salvage: transient failure is retried and succeeds; non-retryable failure drops immediately; retry budget exhaustion drops correctly -- [ ] Integration test: throttling fairness — 429 on model key A does not stall - unrelated model key B tasks -- [ ] Integration test: bounded submission — with many ready tasks and a tight - throttle key, submitted task count never exceeds `async_scheduler_max_submitted_tasks` -- [ ] Integration test: eager row-drop — failure on column B drops the row across - all columns, independent column C does not process the dropped row -- [ ] Integration test: row-drop with in-flight full-column task — completed task - may still compute dropped rows, but writeback is suppressed and row remains dropped -- [ ] Integration test: out-of-order row group completion produces correctly named - parquet files; final dataset loads in correct row order -- [ ] Integration test: pre-batch processor failure skips the row group, remaining - row groups continue +- [ ] Eager row-drop: failure on column B drops the row across all columns, + independent column C does not process the dropped row +- [ ] Row-drop with in-flight full-column task: completed task may still compute + dropped rows, but writeback is suppressed and row remains dropped +- [ ] Bounded submission: submitted task count never exceeds + `async_scheduler_max_submitted_tasks` +- [ ] Error rate shutdown within sliding window +- [ ] Buffer manager: concurrent row groups, `update_cell`, `checkpoint_row_group` +- [ ] Buffer manager: `drop_records` within row group + +**PR 4 (integration) — integration tests + full validation**: +- [ ] Multi-column config with known dependencies, verify parallel execution +- [ ] Mixed cell-by-cell + full-column generators +- [ ] Checkpoint correctness: row groups written in order, parquet 
valid +- [ ] Out-of-order row group completion produces correctly named parquet files; + final dataset loads in correct row order +- [ ] `allow_resize=True` with async engine raises `DatasetGenerationError` at startup, + naming the column +- [ ] Pre-batch processor failure skips the row group, remaining row groups continue +- [ ] Throttling fairness: 429 on model key A does not stall unrelated model key B + tasks (once PR #344 is available) - [ ] Run `make test` — all existing tests pass - [ ] Run `make test-run-recipes` with `DATA_DESIGNER_ASYNC_ENGINE=1` -- [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency; use `trace=True` and load `scheduler.traces` into a DataFrame to measure per-column dispatch and execution times +- [ ] Benchmark: compare sync vs async on a multi-column recipe with simulated latency; + use `trace=True` and load `scheduler.traces` into a DataFrame to measure per-column + dispatch and execution times + +## PR Breakdown + +The implementation steps map to 4 PRs that can be reviewed and merged independently. +Each PR is self-contained: it adds new modules with full test coverage but does not +change existing behavior until the final integration PR. + +### PR 1: Foundation (Steps 1 + 2 + 3) + +**Scope**: `ExecutionGraph`, `CompletionTracker`, `Task`/`TaskResult`/`TaskTrace` dataclasses. + +All three are pure data structures with no side effects on the existing codebase. +They live in new modules under `engine/dataset_builders/utils/` and are only imported +by code introduced in later PRs. + +- `execution_graph.py` + tests +- `completion_tracker.py` + tests +- `task_model.py` + tests + +**Why grouped**: the three are tightly coupled (the tracker takes the graph to resolve +readiness, the task model is the unit of work for both), small individually, and +have no external dependencies. Splitting them into 3 separate PRs would create +review overhead without meaningful isolation benefit. 
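As an example of the inspectability PR 1 provides, the `to_mermaid` accessor specified in Step 1 could be realised roughly like this. The output format is an assumption; the plan only specifies that nodes are annotated with their strategy. Shown as a standalone function over the backing maps for brevity:

```python
def to_mermaid(upstream: dict[str, set[str]], strategies: dict[str, str]) -> str:
    # Emit a Mermaid flowchart: one labelled node per column, one edge per
    # upstream dependency.
    lines = ["graph TD"]
    for column, strategy in strategies.items():
        lines.append(f'    {column}["{column} ({strategy})"]')
    for column, ups in upstream.items():
        for up in sorted(ups):
            lines.append(f"    {up} --> {column}")
    return "\n".join(lines)
```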
+ +**What works after merge**: you can build an `ExecutionGraph` from any existing config, +inspect it (`topological_order`, `critical_path`, `task_count`, `to_mermaid`), query +cell-level dependencies, and track completion state — all in isolation, with full test +coverage. No runtime behavior changes. + +**Can merge independently**: yes — no existing code imports these modules. + +### PR 2: Generator async migration (Step 5) + +**Scope**: `is_stateful` property on base class, symmetric `generate`/`agenerate` +bridging, async wrappers on all generator subclasses. + +- Changes to `generators/base.py` (add `is_stateful`, symmetric bridging) +- Changes to `generators/samplers.py`, `generators/seed_dataset.py`, + `generators/expression.py`, `generators/image.py`, `generators/embedding.py` +- `CustomColumnGenerator` async branching +- Tests for bridging, statefulness declaration, async wrappers + +**What works after merge**: every generator can be called via `await agenerate()` or +`await agenerate_from_scratch()`. Sync generators auto-bridge to async via +`asyncio.to_thread`; async-first generators auto-bridge to sync via the safe runner +helper. `is_stateful` is queryable on every generator instance. The existing sync +path is completely untouched — `make test` passes with no behavior change. + +**Can merge independently**: yes — `agenerate()` already exists on the base class +from PR #280; this PR extends the pattern to all subclasses and adds `is_stateful`. +Existing sync callers are unaffected. + +**No dependency on PR 1**: generator changes don't reference the graph/tracker/task model. + +### PR 3: Scheduler + buffer manager (Steps 4 + 6) + +**Scope**: `AsyncTaskScheduler`, row group buffer manager. 
+ +- `async_scheduler.py` + tests (uses graph, tracker, and task model from PR 1) +- Buffer manager extension in `dataset_batch_manager.py` + tests +- Retry/salvage logic, progress consolidation, error handling + +**Depends on**: PR 1 (imports `ExecutionGraph`, `CompletionTracker`, `Task`), PR 2 +(calls `agenerate` / `agenerate_from_scratch`, reads `is_stateful`). + +**What works after merge**: the scheduler can be instantiated with mock generators and +driven through its full lifecycle in tests — row group admission, dependency-driven +dispatch, retry/salvage, row drops, checkpoint callbacks. The buffer manager supports +concurrent row groups with cell-level writes. Not yet wired into the real builder. + +**Can merge independently**: yes — the scheduler is a new module, not yet wired into +the builder. The buffer manager extension adds new methods without changing existing ones. + +### PR 4: Builder integration (Steps 7 + 8) + +**Scope**: Wire everything together in `ColumnWiseDatasetBuilder`. + +- `_build_async()` method on `ColumnWiseDatasetBuilder` +- `allow_resize` startup check +- Pre/post-batch processor integration +- Integration tests, recipe tests with `DATA_DESIGNER_ASYNC_ENGINE=1` +- Benchmark setup + +**Depends on**: PRs 1, 2, 3. + +**What works after merge**: `DATA_DESIGNER_ASYNC_ENGINE=1` enables the full async +pipeline end-to-end. Multi-column configs run with dependency-aware parallel +scheduling, row group checkpointing, retry/salvage, and progress reporting. The +sync path (`DATA_DESIGNER_ASYNC_ENGINE=0`, the default) is unchanged. + +**This is the only PR that changes existing behavior** (gated behind +`DATA_DESIGNER_ASYNC_ENGINE=1`). + +### Dependency graph + +``` +PR 1 (foundation) ──┐ + ├──→ PR 3 (scheduler + buffer) ──→ PR 4 (integration) +PR 2 (generators) ──┘ +``` + +PRs 1 and 2 can be developed and reviewed in parallel. 
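To ground PR 1's `ExecutionGraph` ordering and validation, the column-level DAG can be prototyped directly from a `required_columns`-style dependency map with the standard library. A sketch of the idea only — the column names are hypothetical, and the real module adds strategies and side-effect mapping on top:

```python
from graphlib import CycleError, TopologicalSorter

# column → upstream columns, as derived from each config's required_columns
upstream = {
    "seed": set(),
    "summary": {"seed"},
    "judge": {"summary"},
    "validation": {"summary", "judge"},
}

# Predecessors-first ordering, i.e. what topological_order() would return
order = list(TopologicalSorter(upstream).static_order())
assert order.index("seed") < order.index("summary") < order.index("judge")

# A cycle (e.g., a template referencing its own downstream) must fail at setup:
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError:
    pass  # build_execution_graph would surface this as a config error
```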
## Risks & Considerations

diff --git a/plans/346/code-sketches.md b/plans/346/code-sketches.md
new file mode 100644
index 00000000..c894b01e
--- /dev/null
+++ b/plans/346/code-sketches.md
@@ -0,0 +1,369 @@
+# Code Sketches
+
+Structural sketches of the main components. Not runnable — shows how the pieces
+fit together. See [async-generators-and-task-queue.md](async-generators-and-task-queue.md)
+for the full design.
+
+## ExecutionGraph
+
+```python
+@dataclass
+class ExecutionGraph:
+    _upstream: dict[str, set[str]]              # column → upstream columns
+    _downstream: dict[str, set[str]]            # column → downstream columns
+    _strategies: dict[str, GenerationStrategy]  # column → cell-by-cell or full-column
+    _side_effect_map: dict[str, str]            # e.g. "summary__trace" → "summary"
+
+    def upstream(self, column: str) -> set[str]: ...
+    def downstream(self, column: str) -> set[str]: ...
+    def strategy(self, column: str) -> GenerationStrategy: ...
+
+    def cell_dependencies(
+        self,
+        column: str,
+        row_group: int,
+        row_index: int | None,
+        row_group_size: int,
+    ) -> list[tuple[str, int, int | None]]:
+        """Derive cell-level deps on demand from column-level DAG + strategy.
+
+        cell-by-cell upstream, cell task row 2:        [(upstream, rg, 2)]
+        cell-by-cell upstream, batch task (row=None):  [(upstream, rg, 0), (upstream, rg, 1), ...]
+        full-column upstream:                          [(upstream, rg, None)]
+        from-scratch (no upstream):                    []
+        """
+        deps: list[tuple[str, int, int | None]] = []
+        for up_col in self.upstream(column):
+            up_strategy = self.strategy(up_col)
+            if up_strategy == GenerationStrategy.CELL_BY_CELL:
+                if row_index is not None:
+                    deps.append((up_col, row_group, row_index))
+                else:
+                    for ri in range(row_group_size):
+                        deps.append((up_col, row_group, ri))
+            else:
+                deps.append((up_col, row_group, None))
+        return deps
+
+    def topological_order(self) -> list[str]: ...
+    def critical_path(self) -> list[str]: ...
+    def task_count(self, num_records: int, buffer_size: int) -> dict[str, int]: ...
+    def to_mermaid(self) -> str: ...
+``` + +### Building the graph + +```python +def build_execution_graph( + column_configs: list[ColumnConfigT], + strategies: dict[str, GenerationStrategy], +) -> ExecutionGraph: + graph = ExecutionGraph() + for config in column_configs: + name = config.name + graph._strategies[name] = strategies[name] + + required = set(config.required_columns) + resolved = set() + for req in required: + # "summary__trace" → dependency on the "summary" generator + if req in graph._side_effect_map: + resolved.add(graph._side_effect_map[req]) + else: + resolved.add(req) + graph._upstream[name] = resolved + + for dep in resolved: + graph._downstream.setdefault(dep, set()).add(name) + + # Register side-effect outputs (e.g., __trace, __reasoning_content) + # so downstream references resolve correctly + ... + + # Validate: acyclic, all required columns resolve to known producers + ... + return graph +``` + +## CompletionTracker + +Pure state store — does not resolve dependencies or determine readiness. + +```python +class CompletionTracker: + def __init__(self) -> None: + self._completed: dict[int, dict[str, set[int]]] = {} # rg → col → {row indices} + self._dropped: dict[int, set[int]] = {} # rg → {row indices} + + def mark_complete(self, column: str, row_group: int, row_index: int) -> None: + self._completed.setdefault(row_group, {}).setdefault(column, set()).add(row_index) + + def mark_batch_complete(self, column: str, row_group: int, row_group_size: int) -> None: + self._completed.setdefault(row_group, {})[column] = set(range(row_group_size)) + + def is_complete(self, column: str, row_group: int, row_index: int) -> bool: + return row_index in self._completed.get(row_group, {}).get(column, set()) + + def all_complete(self, cells: list[tuple[str, int, int | None]]) -> bool: + """Used by the scheduler: tracker.all_complete(graph.cell_dependencies(...))""" + for col, rg, ri in cells: + if ri is None: + if col not in self._completed.get(rg, {}): + return False + elif not 
self.is_complete(col, rg, ri): + return False + return True + + def drop_row(self, row_group: int, row_index: int) -> None: + self._dropped.setdefault(row_group, set()).add(row_index) + + def is_dropped(self, row_group: int, row_index: int) -> bool: + return row_index in self._dropped.get(row_group, set()) + + def is_row_group_complete( + self, row_group: int, row_group_size: int, all_columns: list[str], + ) -> bool: + dropped = self._dropped.get(row_group, set()) + completed = self._completed.get(row_group, {}) + for ri in range(row_group_size): + if ri in dropped: + continue + for col in all_columns: + if ri not in completed.get(col, set()): + return False + return True +``` + +## Task model + +```python +@dataclass(frozen=True) +class Task: + column: str + row_group: int + row_index: int | None # None for batch/full-column tasks + task_type: Literal["from_scratch", "cell", "batch", "pre_batch_processor", "post_batch_processor"] + +@dataclass +class TaskResult: + task: Task + status: Literal["success", "error"] + output: Any = None + error: Exception | None = None + retryable: bool = False + +@dataclass +class TaskTrace: + task: Task + dispatched_at: float = 0.0 + slot_acquired_at: float = 0.0 + completed_at: float = 0.0 + status: str = "" + error: str | None = None +``` + +## AsyncTaskScheduler + +```python +class AsyncTaskScheduler: + def __init__( + self, + generators: dict[str, ColumnGenerator], + graph: ExecutionGraph, + tracker: CompletionTracker, + row_groups: list[tuple[int, int]], # (rg_id, rg_size) + *, + max_concurrent_row_groups: int = 3, + max_submitted_tasks: int = 256, + salvage_max_rounds: int = 2, + trace: bool = False, + ) -> None: + self._graph = graph + self._tracker = tracker + self._rg_semaphore = asyncio.Semaphore(max_concurrent_row_groups) + self._scheduler_semaphore = asyncio.Semaphore(max_submitted_tasks) + self._dispatched: set[Task] = set() + self._wake_event = asyncio.Event() + self.traces: list[TaskTrace] = [] + ... 
+ + async def run(self) -> None: + # Admit row groups behind semaphore, dispatch seeds + for rg_id, rg_size in self._row_groups: + await self._rg_semaphore.acquire() + await self._dispatch_seeds(rg_id, rg_size) + + # Main loop: find ready tasks, dispatch, repeat + while not self._all_complete(): + self._wake_event.clear() + ready = self._get_ready_tasks() + for task in ready: + await self._scheduler_semaphore.acquire() + asyncio.create_task(self._execute_task(task)) + if not ready: + await self._wake_event.wait() + + # Salvage rounds for deferred retryable failures + for _ in range(self._salvage_max_rounds): + if not self._deferred: + break + await self._run_salvage_round() + + def _get_ready_tasks(self) -> list[Task]: + """The core readiness check — combines graph + tracker + scheduler policy.""" + ready: list[Task] = [] + for rg_id, rg_size in self._active_row_groups(): + for col in self._graph.topological_order(): + strategy = self._graph.strategy(col) + if strategy == GenerationStrategy.CELL_BY_CELL: + for ri in range(rg_size): + task = Task(col, rg_id, ri, "cell") + if task in self._dispatched: + continue + if self._tracker.is_dropped(rg_id, ri): + continue + deps = self._graph.cell_dependencies(col, rg_id, ri, rg_size) + if self._tracker.all_complete(deps): + ready.append(task) + else: + task = Task(col, rg_id, None, "batch") + if task in self._dispatched: + continue + deps = self._graph.cell_dependencies(col, rg_id, None, rg_size) + if self._tracker.all_complete(deps): + ready.append(task) + return ready + + async def _execute_task(self, task: Task) -> None: + self._dispatched.add(task) + try: + self._scheduler_semaphore.release() + # ... optional: await throttle permit (once PR #344 lands) ... + + generator = self._generators[task.column] + if task.task_type == "from_scratch": + result_df = await generator.agenerate_from_scratch(...) + # write all rows to buffer + elif task.task_type == "cell": + row_data = ... 
# read from buffer manager + result = await generator.agenerate(row_data) + if not self._tracker.is_dropped(task.row_group, task.row_index): + ... # buffer_manager.update_cell(...) + else: + batch_df = ... # read from buffer manager + result_df = await generator.agenerate(batch_df.copy()) + # merge result columns back + + self._tracker.mark_complete(task.column, task.row_group, task.row_index or 0) + except Exception as exc: + self._handle_task_failure(task, exc) + finally: + self._wake_event.set() + # check if row group is complete → post-batch → checkpoint +``` + +## Generator base class changes + +Additions to the existing `ColumnGenerator`: + +```python +class ColumnGenerator(ConfigurableTask[TaskConfigT], ABC): + + @property + def is_stateful(self) -> bool: + """Override to True for generators that maintain state across calls.""" + return False + + # --- Symmetric bridging --- + + async def agenerate(self, data: dict) -> dict: + """Default: wrap sync generate in a thread.""" + return await asyncio.to_thread(self.generate, data) + + def generate(self, data: dict) -> dict: + """Default (for async-first subclasses): safe sync bridge.""" + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(self.agenerate(data)) + else: + loop = ... 
# builder's background event loop + future = asyncio.run_coroutine_threadsafe(self.agenerate(data), loop) + return future.result(timeout=300) + + +class FromScratchColumnGenerator(ColumnGenerator[TaskConfigT], ABC): + + async def agenerate_from_scratch(self, num_records: int) -> pd.DataFrame: + return await asyncio.to_thread(self.generate_from_scratch, num_records) + + async def agenerate(self, data: pd.DataFrame) -> pd.DataFrame: + return await asyncio.to_thread(self.generate, data.copy()) + + +class ColumnGeneratorFullColumn(ColumnGenerator[TaskConfigT], ABC): + + async def agenerate(self, data: pd.DataFrame) -> pd.DataFrame: + return await asyncio.to_thread(self.generate, data.copy()) +``` + +## Builder integration + +Additions to `ColumnWiseDatasetBuilder`: + +```python +class ColumnWiseDatasetBuilder: + + def _build_async(self, generators: list[ColumnGenerator], num_records: int, buffer_size: int) -> None: + # 1. Build graph + strategies = {g.task_config.name: g.get_generation_strategy() for g in generators} + graph = build_execution_graph(self._column_configs, strategies) + + # 2. Partition into row groups + row_groups = [] + remaining = num_records + rg_id = 0 + while remaining > 0: + size = min(buffer_size, remaining) + row_groups.append((rg_id, size)) + remaining -= size + rg_id += 1 + + # 3. Create tracker and scheduler + tracker = CompletionTracker() + gen_map = {g.task_config.name: g for g in generators} + scheduler = AsyncTaskScheduler( + generators=gen_map, + graph=graph, + tracker=tracker, + row_groups=row_groups, + ) + + # 4. Run on background event loop + loop = self._ensure_async_engine_loop() + future = asyncio.run_coroutine_threadsafe(scheduler.run(), loop) + future.result() +``` + +## How the three layers interact + +On each task completion: + +``` +1. Task completes + → tracker.mark_complete(col, rg, row) + +2. 
Scheduler wakes up (_wake_event.set()) + → for each candidate task in active row groups: + deps = graph.cell_dependencies(col, rg, row, rg_size) + if tracker.all_complete(deps) and task not dispatched: + dispatch it + +3. If tracker.is_row_group_complete(rg, rg_size, all_columns): + → run post-batch processors + → checkpoint to parquet + → release row group semaphore slot +``` + +The graph owns **what depends on what** (static). +The tracker owns **what's done** (runtime state). +The scheduler owns **what to do next** (runtime policy). From e7d0d504076b2344baa652c4cab96baa1c4cde57 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 16:34:38 -0300 Subject: [PATCH 13/16] refine concurrency model and add multi-column handling - Rename scheduler semaphore to execution semaphore for clarity - Split execution semaphore from submission budget as distinct concerns with separate semaphores - Add reacquire step to dispatch pattern after throttle wait - Add multi-column generator handling via instance dedup on the scheduler (graph stays column-level) --- plans/346/async-generators-and-task-queue.md | 62 +++++++++++--------- plans/346/code-sketches.md | 53 +++++++++++++---- 2 files changed, 76 insertions(+), 39 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 28f7257b..1ba6c6b4 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -75,6 +75,14 @@ scheduler operates at runtime. 
| `CELL_BY_CELL` (LLM text/code/structured/judge, image, embedding, custom) | `(column, row)` | All `required_columns` complete for that row | | `FULL_COLUMN` (expression, validation, sampler-as-transform) | `(column, row_group)` | All `required_columns` complete for ALL rows in row group | +**Multi-column generators**: `MultiColumnConfig` (e.g., a seed dataset producing +`first_name`, `last_name`, `email`) maps multiple output columns to the same +generator instance. The graph has individual nodes for each output column. The +scheduler deduplicates by generator identity — it dispatches one task per unique +instance and marks all output columns complete on completion. This mapping +(`instance_id → output columns`) is built at scheduler init from the generator map, +where multiple column keys point to the same object. + ### 3. Row groups as checkpoint units Rows are partitioned into groups of `buffer_size` (same as current batches). @@ -93,33 +101,31 @@ Full-column generators operate on their entire row group at once, same as today. Three independent layers: -1. **Scheduler semaphore** — a coarse resource guard bounding total in-flight - active execution to limit CPU/memory pressure (e.g., configurable cap, default - ~128). This is **not** the source of truth for API concurrency. +1. **Execution semaphore** — bounds active compute/writeback sections to limit + CPU/memory pressure (e.g., configurable cap, default ~128). This is **not** + the source of truth for API concurrency. -2. **Throttle manager** (from PR #344) — gates every outbound LLM call, keyed - by `provider+model(+domain)`. Dynamically adjusts per-key limits on 429s - via AIMD. This is the real API concurrency control. - **Note**: PR #344 may land after the initial scheduler PRs. The scheduler - is designed to work without it: when no throttle manager is available, tasks - skip the throttle acquire step and only the scheduler semaphore + submission - budget bound concurrency. 
Once PR #344 merges, the throttle acquire call is - wired in — no scheduler changes needed, just a conditional `await` in the - task dispatch path. +2. **Throttle manager** (from PR #344) — gates outbound LLM calls, keyed by + `provider+model(+domain)`. Dynamically adjusts per-key limits on 429s via AIMD. + This is the real API concurrency control. + **Note**: PR #344 may land after the initial scheduler PRs. When no throttle + manager is available, LLM tasks skip the throttle acquire step and only the + execution semaphore + submission budget bound concurrency. 3. **Submission budget** — a hard cap on "submitted but not finished" tasks (running + waiting on throttle/backoff), e.g., `async_scheduler_max_submitted_tasks`. - This prevents unbounded parked coroutines when tasks release scheduler slots - before throttle acquire. -Tasks must **not hold scheduler slots while waiting on throttle backoff**. A task -acquires a scheduler slot, prepares its request, then releases the slot before -awaiting the throttle permit. This ensures a throttled model key doesn't starve -unrelated keys by hogging scheduler capacity. +LLM tasks must **not hold execution slots while waiting on throttle/backoff**. +Dispatch pattern: +1. acquire execution slot — prepare request +2. release execution slot +3. await throttle permit (LLM tasks only; skipped without PR #344) +4. reacquire execution slot — execute generator call + writeback +5. release execution slot -Task admission is therefore bounded by the submission budget, while active -execution is bounded by the scheduler semaphore. This composes cleanly with -PR #344's adaptive throttling without the two systems fighting each other. +Task admission is bounded by the submission budget, while active compute/writeback +is bounded by the execution semaphore. This prevents a throttled key from starving +unrelated work while still keeping total active work bounded. ### 5. 
Generator statefulness and reentrancy

@@ -278,7 +284,7 @@ Simple dataclass representing a unit of work.
 - [ ] `TaskTrace` dataclass (only instantiated when tracing is enabled):
   - `column: str`, `row_group: int`, `row_index: int | None`, `task_type: str`
   - `dispatched_at: float` — `perf_counter()` when `create_task()` fires
-  - `slot_acquired_at: float` — after scheduler semaphore acquired
+  - `slot_acquired_at: float` — after execution semaphore acquired
   - `completed_at: float` — in `finally` block after generator returns
   - `status: str`, `error: str | None`
 - [ ] Hashable so we can track dispatched/pending sets
@@ -301,13 +307,13 @@ The core orchestrator that replaces `_run_batch` for the async path.
      N's seed completes before N+1's seed starts for that generator); stateless
      generators dispatch all admitted row groups concurrently
   2. Loop: compute ready tasks via the scheduler's `_get_ready_tasks()` (graph +
-     tracker) → dispatch each via `asyncio.create_task()` behind scheduler semaphore
+     tracker) → dispatch each via `asyncio.create_task()` behind submission budget
      → on completion, update tracker → repeat until all tasks done or early shutdown
   3. When ready queue drains, run salvage rounds over deferred retryable failures
      (up to `async_salvage_max_rounds` rounds)
   4. After each row group completes: run post-batch processors, checkpoint
-  - Task dispatch: acquire scheduler semaphore slot → prepare request → release
-    slot → await throttle permit (for LLM tasks) → execute → write result
+  - Task dispatch follows the pattern from §4: acquire execution slot → prepare →
+    release → await throttle (LLM only) → reacquire → execute + writeback → release
   - Admission control: never allow more than `async_scheduler_max_submitted_tasks`
     tasks in submitted/running/waiting states; hold remaining ready tasks in the
     scheduler queue until slots free up
@@ -586,7 +592,7 @@ PRs 1 and 2 can be developed and reviewed in parallel.
   controlled by `async_max_concurrent_row_groups` (default 3).
The scheduler only admits a new row group's seed tasks once a slot is available. -- **Unbounded parked coroutines during throttle waits**: Releasing scheduler slots +- **Unbounded parked coroutines during throttle waits**: Releasing execution slots before throttle acquire improves fairness, but can create large numbers of parked tasks if admission is not bounded. Mitigation: enforce `async_scheduler_max_submitted_tasks` as a hard cap on submitted/running/waiting @@ -648,7 +654,7 @@ PRs 1 and 2 can be developed and reviewed in parallel. - **Thread pool sizing**: sync generators wrapped in `asyncio.to_thread` use Python's default thread pool executor (`min(32, cpu_count + 4)`). For v1, keep the default — - the scheduler semaphore and row group cap already bound actual concurrency to levels + the execution semaphore and row group cap already bound actual concurrency to levels where the default pool is unlikely to be a bottleneck (see Follow-ups). - **Silent task hangs**: a sync generator wrapped in `asyncio.to_thread` could hang @@ -721,7 +727,7 @@ the result callback. ### What the data shows -- `slot_acquired_at - dispatched_at`: time waiting on the scheduler semaphore (contention indicator) +- `slot_acquired_at - dispatched_at`: time waiting on the execution semaphore (contention indicator) - `completed_at - slot_acquired_at`: actual generator execution time - Dispatch timestamps across tasks: verify dependency order and parallelism (e.g. 
two independent columns should show overlapping `slot_acquired_at`–`completed_at` ranges) diff --git a/plans/346/code-sketches.md b/plans/346/code-sketches.md index c894b01e..bbee19d3 100644 --- a/plans/346/code-sketches.md +++ b/plans/346/code-sketches.md @@ -167,23 +167,32 @@ class TaskTrace: class AsyncTaskScheduler: def __init__( self, - generators: dict[str, ColumnGenerator], + generators: dict[str, ColumnGenerator], # column name → generator (multi-column: same instance) graph: ExecutionGraph, tracker: CompletionTracker, row_groups: list[tuple[int, int]], # (rg_id, rg_size) *, max_concurrent_row_groups: int = 3, max_submitted_tasks: int = 256, + max_execution_slots: int = 128, salvage_max_rounds: int = 2, trace: bool = False, ) -> None: + self._generators = generators self._graph = graph self._tracker = tracker self._rg_semaphore = asyncio.Semaphore(max_concurrent_row_groups) - self._scheduler_semaphore = asyncio.Semaphore(max_submitted_tasks) + self._submission_semaphore = asyncio.Semaphore(max_submitted_tasks) + self._execution_semaphore = asyncio.Semaphore(max_execution_slots) self._dispatched: set[Task] = set() self._wake_event = asyncio.Event() self.traces: list[TaskTrace] = [] + + # Multi-column dedup: group output columns by generator identity + instance_map: dict[int, list[str]] = {} + for col, gen in generators.items(): + instance_map.setdefault(id(gen), []).append(col) + self._instance_to_columns = instance_map ... 
     async def run(self) -> None:
@@ -197,7 +206,7 @@ class AsyncTaskScheduler:
             self._wake_event.clear()
             ready = self._get_ready_tasks()
             for task in ready:
-                await self._scheduler_semaphore.acquire()
+                await self._submission_semaphore.acquire()
                 asyncio.create_task(self._execute_task(task))
             if not ready:
                 await self._wake_event.wait()
@@ -211,8 +220,12 @@ class AsyncTaskScheduler:
     def _get_ready_tasks(self) -> list[Task]:
         """The core readiness check — combines graph + tracker + scheduler policy."""
         ready: list[Task] = []
         for rg_id, rg_size in self._active_row_groups():
+            seen_instances: set[int] = set()  # reset per row group so later row groups still dispatch
             for col in self._graph.topological_order():
+                gen = self._generators[col]
+                if id(gen) in seen_instances:
+                    continue  # multi-column: already dispatched via sibling
                 strategy = self._graph.strategy(col)
                 if strategy == GenerationStrategy.CELL_BY_CELL:
                     for ri in range(rg_size):
@@ -231,15 +244,24 @@ class AsyncTaskScheduler:
                         deps = self._graph.cell_dependencies(col, rg_id, None, rg_size)
                         if self._tracker.all_complete(deps):
                             ready.append(task)
+                seen_instances.add(id(gen))
         return ready

     async def _execute_task(self, task: Task) -> None:
         self._dispatched.add(task)
         try:
-            self._scheduler_semaphore.release()
-            # ... optional: await throttle permit (once PR #344 lands) ...
-
             generator = self._generators[task.column]
+
+            # 1. acquire execution slot → prepare request
+            await self._execution_semaphore.acquire()
+            # ... prepare request ...
+            self._execution_semaphore.release()
+
+            # 2. await throttle permit (LLM tasks only; skipped without PR #344)
+            # ...
+
+            # 3. reacquire execution slot → execute + writeback
+            await self._execution_semaphore.acquire()
             if task.task_type == "from_scratch":
                 result_df = await generator.agenerate_from_scratch(...)
                 # write all rows to buffer
@@ -252,11 +274,19 @@ class AsyncTaskScheduler:
                batch_df = ...
# read from buffer manager result_df = await generator.agenerate(batch_df.copy()) # merge result columns back + self._execution_semaphore.release() - self._tracker.mark_complete(task.column, task.row_group, task.row_index or 0) + # Mark all output columns complete (handles multi-column generators) + output_cols = self._instance_to_columns[id(generator)] + for col in output_cols: + if task.row_index is None: + self._tracker.mark_batch_complete(col, task.row_group, ...) + else: + self._tracker.mark_complete(col, task.row_group, task.row_index) except Exception as exc: self._handle_task_failure(task, exc) finally: + self._submission_semaphore.release() self._wake_event.set() # check if row group is complete → post-batch → checkpoint ``` @@ -330,9 +360,10 @@ class ColumnWiseDatasetBuilder: # 3. Create tracker and scheduler tracker = CompletionTracker() + # Multi-column generators: multiple column keys → same instance gen_map = {g.task_config.name: g for g in generators} scheduler = AsyncTaskScheduler( - generators=gen_map, + generators=gen_map, # scheduler deduplicates by identity graph=graph, tracker=tracker, row_groups=row_groups, @@ -350,13 +381,13 @@ On each task completion: ``` 1. Task completes - → tracker.mark_complete(col, rg, row) + → tracker.mark_complete(col, rg, row) # all output columns for multi-column generators 2. Scheduler wakes up (_wake_event.set()) - → for each candidate task in active row groups: + → for each candidate task in active row groups (deduped by generator identity): deps = graph.cell_dependencies(col, rg, row, rg_size) if tracker.all_complete(deps) and task not dispatched: - dispatch it + dispatch it (acquire submission slot → execute behind execution semaphore) 3. 
If tracker.is_row_group_complete(rg, rg_size, all_columns): → run post-batch processors From ff31567dd5eb9be8a37017f3cb9bc4eb2f7b9d05 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 17:00:09 -0300 Subject: [PATCH 14/16] add compute-bound generator risk and follow-up GIL contention with CPU-bound custom generators and event loop starvation with native async compute are documented as v1 risks. ProcessPoolExecutor routing via is_cpu_bound noted as follow-up. --- plans/346/async-generators-and-task-queue.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index 1ba6c6b4..c25d6c4d 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -660,6 +660,17 @@ PRs 1 and 2 can be developed and reviewed in parallel. - **Silent task hangs**: a sync generator wrapped in `asyncio.to_thread` could hang indefinitely. For v1, rely on upstream model timeouts (see Follow-ups). +- **Compute-bound generators and GIL contention**: sync generators wrapped in + `asyncio.to_thread` run in Python's thread pool, which protects the event loop + from starvation. However, for CPU-bound Python code, the GIL serializes threads — + many concurrent compute-heavy tasks get threading overhead with no parallelism. + Built-in compute-bound generators (expression eval, samplers) are microsecond-fast, + so GIL contention is invisible. The risk is custom generators doing heavy CPU work. + Native async generators that do CPU work without yielding are worse — they block + the event loop thread entirely, freezing all scheduling. For v1, `asyncio.to_thread` + is sufficient; a future `is_cpu_bound` property could route compute-heavy generators + to a `ProcessPoolExecutor` for true parallelism (see Follow-ups). + ## UX Considerations - **Interleaved log output**: with parallel columns and row groups, log lines will interleave. 
@@ -858,6 +869,9 @@ builder integration — extend that foundation to a deployable implementation. a row is dropped; add telemetry to track compute wasted on dropped rows. - **Thread pool sizing**: if profiling shows saturation of the default executor (`min(32, cpu_count + 4)`), explicitly size it to match the scheduler caps. +- **`ProcessPoolExecutor` for compute-bound generators**: if custom generators doing + heavy CPU work cause GIL contention, add an `is_cpu_bound` property and route those + generators to a `ProcessPoolExecutor` for true parallelism. ### Impact on plugins and custom columns From 1531364836e7cd5146f3007f3216a668a8d013d5 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 17:07:27 -0300 Subject: [PATCH 15/16] clarify compute-bound risk as thread pool starvation Compute-heavy tasks saturating the thread pool starve I/O-bound tasks (LLM calls) from acquiring threads, not just GIL contention. --- plans/346/async-generators-and-task-queue.md | 22 +++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index c25d6c4d..ae933721 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -660,16 +660,18 @@ PRs 1 and 2 can be developed and reviewed in parallel. - **Silent task hangs**: a sync generator wrapped in `asyncio.to_thread` could hang indefinitely. For v1, rely on upstream model timeouts (see Follow-ups). -- **Compute-bound generators and GIL contention**: sync generators wrapped in - `asyncio.to_thread` run in Python's thread pool, which protects the event loop - from starvation. However, for CPU-bound Python code, the GIL serializes threads — - many concurrent compute-heavy tasks get threading overhead with no parallelism. - Built-in compute-bound generators (expression eval, samplers) are microsecond-fast, - so GIL contention is invisible. 
The risk is custom generators doing heavy CPU work. - Native async generators that do CPU work without yielding are worse — they block - the event loop thread entirely, freezing all scheduling. For v1, `asyncio.to_thread` - is sufficient; a future `is_cpu_bound` property could route compute-heavy generators - to a `ProcessPoolExecutor` for true parallelism (see Follow-ups). +- **Compute-bound generators starving I/O-bound tasks**: compute-bound and I/O-bound + tasks share the same thread pool (via `asyncio.to_thread`). If compute-heavy tasks + saturate the pool, I/O-bound tasks (LLM calls, remote validators) can't acquire + threads and stall — even though they'd release the GIL immediately on network I/O. + Additionally, the GIL serializes CPU-bound threads, so compute tasks get threading + overhead with no parallelism. Native async generators that do CPU work without + yielding are worse — they block the event loop thread entirely, freezing all + scheduling. Built-in compute-bound generators (expression eval, samplers) are + microsecond-fast, so this risk is limited to custom generators doing heavy CPU work. + For v1, `asyncio.to_thread` is sufficient; a future `is_cpu_bound` property could + route compute-heavy generators to a separate `ProcessPoolExecutor`, keeping the + thread pool available for I/O-bound work (see Follow-ups). ## UX Considerations From 8333e1ea5c57fc89ba09e8bd14ba63c57318164d Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 25 Feb 2026 17:19:31 -0300 Subject: [PATCH 16/16] add async guidance for plugins and custom columns Compute-bound plugins should implement generate(), not agenerate(), to keep CPU work off the event loop. Same rule for custom columns: only use async def for I/O-bound work. 
--- plans/346/async-generators-and-task-queue.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index ae933721..a9b6b418 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -887,11 +887,19 @@ automatically gets async support. Plugins that want native async performance can optionally override `agenerate()` instead — the symmetric bridging means they don't need to implement `generate()` at all. The `is_stateful` property defaults to `False`, which is correct for most plugins; stateful plugins can override it. +**Important**: only override `agenerate()` if your work is I/O-bound (network calls, +async database queries). Compute-bound plugins should implement `generate()` and let +the framework wrap it in `asyncio.to_thread` — this keeps CPU work off the event loop +thread. An `agenerate()` that does CPU work without yielding blocks the event loop +and freezes all scheduling. **Custom columns** (`@custom_column_generator`): user-provided sync functions are wrapped in `asyncio.to_thread` by the framework. If the user provides an async function, `CustomColumnGenerator` detects this via `asyncio.iscoroutinefunction` and calls it directly as a coroutine — no thread pool overhead. +The same rule applies: only use `async def` for I/O-bound work. A compute-bound +`async def` that never awaits will block the event loop. For data transformations, +string processing, or any CPU-heavy logic, use a regular `def`. **Processor plugins** (`process_before_batch`, `process_after_batch`, `process_after_generation`): processors run at barrier points in the scheduling loop