
chore: plans for async generators and task-queue dataset builder #347

Merged

andreatgretel merged 17 commits into main from andreatgretel/chore/async-generators-plan on Feb 25, 2026
Conversation

andreatgretel (Contributor) commented Feb 20, 2026

Summary

Adds a comprehensive plan for transforming DataDesigner's dataset builder from sequential column-by-column processing into an async task queue with dependency-aware scheduling. Plan only — implementation will follow in subsequent PRs.

Part of #346

Changes

Added

  • plans/346/async-generators-and-task-queue.md — full plan covering:
    • Column-level static execution graph with on-demand cell-level dependency queries
    • Task granularity (from-scratch, cell-by-cell, full-column) with multi-column generator dedup
    • Three-layer concurrency control: execution semaphore, the throttle manager from PR #344 (optional), submission budget
    • Two-phase dispatch pattern (release execution slot during throttle wait, reacquire for execution)
    • Generator statefulness via is_stateful property for reentrancy control
    • Retry & salvage policy for transient failures
    • allow_resize scoped out of async v1 (raises DatasetGenerationError at startup)
    • Risks including compute-bound generators/GIL contention, eager row-drop, pre-batch mutations
    • 8 implementation steps, PR breakdown (4 independent PRs), success criteria, and test plan
  • plans/346/code-sketches.md — structural sketches of ExecutionGraph, CompletionTracker, Task, AsyncTaskScheduler, generator base class changes, and builder integration
  • plans/346/diagrams.md — reference Mermaid diagrams for scheduler flow, task lifecycle, and component architecture


andreatgretel requested a review from a team as a code owner on February 20, 2026 20:26
greptile-apps bot (Contributor) commented Feb 20, 2026

Greptile Summary

This PR adds comprehensive planning documentation for transforming DataDesigner's dataset builder from sequential column-by-column processing to an async task queue with dependency-aware scheduling. The plan is highly detailed and well-structured:

Main plan document (async-generators-and-task-queue.md):

  • Clearly articulates the transformation from sequential to async parallel execution
  • Defines 8 key design decisions with strong technical rationale
  • Provides 8 implementation steps with detailed acceptance criteria
  • Maps implementation to 4 independent PRs with clear dependency graph
  • Thoroughly documents risks, UX considerations, profiling strategy, and backward compatibility
  • Includes a thoughtful comparison with PR #269 (execution graph builder plan with reference implementation), explaining what was adopted and what was changed

Code sketches (code-sketches.md):

  • Shows structural implementation patterns for ExecutionGraph, CompletionTracker, Task models
  • Demonstrates how components interact with concrete code examples
  • Illustrates multi-column generator deduplication and symmetric async/sync bridging

Diagrams (diagrams.md):

  • Provides clear Mermaid visualizations of task lifecycle, scheduler flow, and concurrency layers
  • Shows concrete dependency resolution example with timeline
  • Illustrates row group pipelining and failure mode avoidance

The plan demonstrates deep understanding of async Python, concurrency control, and the existing codebase architecture. The PR breakdown is particularly well-designed, allowing independent development and review of 4 PRs (foundation, generators, scheduler, integration) with clear dependencies.

Confidence Score: 5/5

  • This PR is safe to merge with no risk: it only adds planning documentation
  • Documentation-only PR adding comprehensive design plans with no code changes. The plan is thorough, well-structured, and demonstrates strong technical understanding. All three files are markdown documentation with no runtime impact.
  • No files require special attention - all are well-written planning documents

Important Files Changed

| Filename | Overview |
| --- | --- |
| plans/346/async-generators-and-task-queue.md | Comprehensive async task queue plan with detailed design decisions, implementation steps, risk analysis, and PR breakdown |
| plans/346/code-sketches.md | Structural code sketches showing ExecutionGraph, CompletionTracker, Task models, scheduler, and generator integration patterns |
| plans/346/diagrams.md | Mermaid diagrams visualizing task lifecycle, scheduler flow, dependency resolution, concurrency layers, and row group pipelining |

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    START[Issue #346: Async Task Queue] --> PLAN[Plan Document]

    PLAN --> KEY[8 Key Design Decisions]
    KEY --> K1[Column-level static ExecutionGraph]
    KEY --> K2[Task granularity by strategy]
    KEY --> K3[Row groups as checkpoints]
    KEY --> K4[3-layer concurrency control]
    KEY --> K5[Generator statefulness]
    KEY --> K6[Pre/post-batch processors]
    KEY --> K7[Retry & salvage policy]
    KEY --> K8[allow_resize scoped out]

    PLAN --> IMPL[8 Implementation Steps]
    IMPL --> S1[Step 1: ExecutionGraph]
    IMPL --> S2[Step 2: CompletionTracker]
    IMPL --> S3[Step 3: Task Model]
    IMPL --> S4[Step 4: AsyncTaskScheduler]
    IMPL --> S5[Step 5: Generator async migration]
    IMPL --> S6[Step 6: Buffer manager]
    IMPL --> S7[Step 7: Builder integration]
    IMPL --> S8[Step 8: Tests & validation]

    IMPL --> PR[4 Independent PRs]
    PR --> PR1[PR1: Foundation<br/>Steps 1+2+3]
    PR --> PR2[PR2: Generators<br/>Step 5]
    PR --> PR3[PR3: Scheduler<br/>Steps 4+6]
    PR --> PR4[PR4: Integration<br/>Steps 7+8]

    PR1 --> PR3
    PR2 --> PR3
    PR3 --> PR4

    PLAN --> DOCS[Supporting Docs]
    DOCS --> CODE[code-sketches.md<br/>Structural patterns]
    DOCS --> DIAG[diagrams.md<br/>Visual flows]

    style PLAN fill:#e1f5ff
    style PR fill:#d4edda
    style DOCS fill:#fff3cd
```

Last reviewed commit: 8333e1e

nabinchha (Contributor) commented:

Thanks for this plan. One cross-plan concern to address before implementation:

PR #344 (model facade overhaul) defines adaptive, key-based auto-throttling for model calls (provider+model(+domain)), with dynamic limit changes on 429s. This plan currently proposes a single global scheduler semaphore sized from summed model limits. Those can conflict.

Why this matters:

  • A hot/throttled key can consume global scheduler slots while waiting/retrying, reducing throughput for unrelated keys.
  • A static global budget can oversubscribe a key that has already been reduced by adaptive throttling.

Suggested contract:

  1. Keep scheduler semaphore as a coarse CPU/memory guard only (not the source of truth for API concurrency).
  2. Gate every LLM outbound call through the shared throttle manager (keyed by provider/model/domain).
  3. Ensure tasks waiting on throttle/backoff do not hold scheduler worker slots (see the sketch after this list).
  4. Add tests for isolation/fairness: 429 on key A should not stall key B.
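
A minimal sketch of how points 1-3 could compose. Hypothetical names throughout: the throttle manager's acquire/release permit API, `prepare_request`, and `call_model` are stand-ins, not the actual DataDesigner or PR #344 interfaces.

```python
import asyncio

async def run_llm_task(task, execution_slots: asyncio.Semaphore, throttle):
    # Point 1: the semaphore is only a coarse CPU/memory guard.
    async with execution_slots:
        request = task.prepare_request()  # cheap, local preparation

    # Point 3: wait for the per-key permit *without* holding an execution
    # slot, so a backed-off key cannot starve unrelated keys.
    permit = await throttle.acquire(key=(task.provider, task.model))
    try:
        # Point 2: every outbound LLM call is gated by the throttle manager;
        # reacquire an execution slot only for the actual call.
        async with execution_slots:
            return await task.call_model(request)
    finally:
        permit.release()
```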

Intended interaction

```mermaid
flowchart TB
    S["Async Scheduler"] --> Q["Ready Task Queue"]
    Q --> W["Worker Slot (coarse cap)"]
    W --> T{"LLM-bound task?"}

    T -->|No| N["Run non-LLM task"]
    N --> C["Completion Tracker"]

    T -->|Yes| A["Acquire permit from Throttle Manager (provider+model+domain)"]
    A --> R["ModelClient adapter call"]
    R --> P["Provider API"]
    P --> X{"429?"}
    X -->|Yes| D["AIMD decrease + cooldown + retry"]
    D --> A
    X -->|No| U["AIMD increase"]
    U --> C
```

Failure mode to avoid

```mermaid
flowchart TB
    G["Single global scheduler semaphore"] --> H["Many tasks for throttled key A"]
    H --> I["Key A hits 429 and backs off"]
    I --> J["Tasks wait/retry while holding scheduler slots"]
    J --> K["Unrelated key B tasks are delayed/starved"]
```

If we align on this, the async task queue and adaptive throttling should compose cleanly without fighting each other.

nabinchha (Contributor) commented:

Thanks for putting this plan together. The direction looks very good overall. I have a few suggestions to consider and address before implementation, mainly around failure modes and runtime safety.

Suggestions to Consider and Address

  1. required_columns alone may not be enough for dependency resolution.
    The plan currently builds dependencies from required_columns only (L53-L55, L108-L113).
    It may be helpful to also include side-effect dependency mapping (for example __trace, __reasoning_content) to avoid edge cases where valid configs are delayed or never become ready.

  2. Whole-record replacement might cause update clobbering under parallel execution.
    The “maybe keep update_record” option (L191-L193) could overwrite prior column results when two independent columns update the same row concurrently.
    A cell-level merge update (or row-level merge protection) would likely be safer.

  3. Row-index completion tracking could conflict with resize behavior.
    The tracker assumes stable row indices (L123-L127), while current behavior supports allow_resize.
    It may help to either scope async v1 to fixed row counts or define row identity/versioning for resize paths.

  4. Seeding all row groups at startup may assume generator reentrancy.
    Step 4 suggests seeding all row groups up front (L155).
    For stateful from-scratch generators, serialization per generator instance may be safer unless reentrancy is guaranteed.

  5. Global scheduler semaphore and adaptive throttling would benefit from an explicit contract.
    The global cap proposal (L81-L83) may conflict with adaptive per-key throttling behavior in the model-facade plan.
    It may be good to define scheduler semaphore as a coarse resource guard only, with outbound LLM calls always gated by shared throttle manager.

  6. Small wording note on asyncio.to_thread.
    In Risks, the wording implies copy semantics (L245).
    to_thread passes object references, so if full-column tasks share the same DataFrame, serialization or defensive copies would help avoid concurrent-mutation issues.

Suggestion for failed-cell recovery (retry rounds)

Context: In deep SDG workflows (many dependent columns), a failure near the end currently drops the entire row, which means we lose all upstream generation work already completed for that row. In practice, this can inflate drop counts and push the pipeline toward error-threshold shutdowns, especially when LLM endpoints are transiently unstable. Re-queuing failed cells for controlled retry rounds should improve the chance of producing complete rows and reduce avoidable row loss.

I like the idea of adding end-of-round retry attempts for failed cells. A possible pattern (sketched in code after this list):

  1. Classify failures as retryable vs non-retryable.
  2. Put retryable failures into a deferred queue with attempt, next_eligible_at, and backoff+jitter.
  3. Continue normal scheduling first.
  4. When ready queue drains, run N salvage retry rounds over deferred tasks.
  5. Ensure retries re-enter throttle acquire path.
  6. After retry budget is exhausted, mark final failure/drop and continue row-group completion checks over remaining rows.
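
A minimal sketch of steps 1-6 under assumed names: `DeferredTask`, `TransientError`, and the backoff constants are illustrative, not the plan's final API.

```python
import asyncio
import heapq
import random
import time
from dataclasses import dataclass, field

@dataclass(order=True)
class DeferredTask:
    next_eligible_at: float          # only field used for ordering
    attempt: int = field(compare=False)
    task: object = field(compare=False)

def defer(queue: list, task, attempt: int, base_delay: float = 1.0) -> None:
    # Step 2: exponential backoff with full jitter.
    delay = random.uniform(0, base_delay * 2 ** attempt)
    heapq.heappush(queue, DeferredTask(time.monotonic() + delay, attempt, task))

async def salvage_rounds(queue: list, run_task, max_rounds: int = 2) -> list:
    # Step 4: runs only after the ready queue has drained.
    for _ in range(max_rounds):
        current, queue[:] = sorted(queue), []
        for item in current:
            await asyncio.sleep(max(0.0, item.next_eligible_at - time.monotonic()))
            try:
                # Step 5: run_task re-enters the throttle acquire path.
                await run_task(item.task)
            except TransientError:  # step 1: only retryable failures re-queue
                defer(queue, item.task, item.attempt + 1)
        if not queue:
            break
    # Step 6: whatever remains after the retry budget is a final failure/drop.
    return [item.task for item in queue]
```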

Optional plan updates

  1. Add a short “Retry & salvage policy” section in Key Design Decisions.
  2. Add run-config knobs for async scheduler retry rounds/backoff.
  3. Clarify async-v1 scope around allow_resize and row-mutating pre-batch behavior.
  4. Add tests for side-effect dependencies, retry salvage success/failure, and key-isolated throttling behavior.

- Decouple scheduler semaphore (coarse resource guard) from PR #344's
  adaptive throttle manager (per-key API concurrency)
- Add side-effect output column mapping to dependency resolution
- Mandate cell-level merge writes, remove unsafe update_record option
- Add is_stateful generator property for reentrancy control
- Add retry & salvage policy for transient failures
- Scope allow_resize out of async v1 (falls back to sync)
- Fix asyncio.to_thread reference semantics, require defensive copies
- Add new test cases for all above
andreatgretel (Contributor, Author) commented:

Thanks for flagging this — the interaction between the scheduler and adaptive throttling is something that needed to be made explicit.

I've updated the plan to adopt essentially what you're describing. The concurrency control section (now Section 4) defines two independent layers:

  1. Scheduler semaphore — coarse resource guard only (configurable, default ~128). Bounds total in-flight coroutines to limit CPU/memory pressure. Not the source of truth for API concurrency.
  2. Throttle manager (from PR #344, model facade overhaul) — gates every outbound LLM call, keyed by provider+model(+domain). This is the real API concurrency control with AIMD.

The key behavioral contract: tasks release their scheduler slot before awaiting the throttle permit. So a task acquires a slot, prepares its request, releases the slot, then waits for the throttle manager. This means tasks blocked on a throttled key don't consume scheduler capacity, and unrelated keys flow freely.

I've also added a test case in Step 8 specifically for isolation/fairness: "429 on model key A does not stall unrelated model key B tasks."

The mermaid diagram you included maps cleanly to this — the scheduler is the "Worker Slot (coarse cap)" node, and the throttle manager acquire is a separate gate before the provider API call.

andreatgretel (Contributor, Author) commented:

Great feedback across the board. I've updated the plan to address each point:

1. Side-effect dependency mapping — Agreed. The dependency map now also registers side-effect output columns (__trace, __reasoning_content, etc.) and maps them back to their producer generator. So if a downstream column references summary__trace, it resolves to a dependency on the summary generator. Updated in Section 1 and Step 1, with a dedicated test case in Step 8.
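
A minimal sketch of that mapping, assuming a fixed set of side-effect suffixes; `SIDE_EFFECT_SUFFIXES` and `build_producer_map` are hypothetical names, not the plan's API.

```python
SIDE_EFFECT_SUFFIXES = ("__trace", "__reasoning_content")

def build_producer_map(columns: dict[str, object]) -> dict[str, str]:
    """Map every output column, including side effects, to its producer column."""
    producers = {name: name for name in columns}
    for name in columns:
        for suffix in SIDE_EFFECT_SUFFIXES:
            producers[f"{name}{suffix}"] = name
    return producers

# A downstream column requiring "summary__trace" therefore resolves to a
# dependency on the "summary" generator:
#   producers["summary__trace"]  ->  "summary"
```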

2. Update clobbering — Agreed. The plan now mandates update_cell(row_group, row_index, column, value) as the only write path for the async builder. Whole-record replacement (update_record) is explicitly called out as unsafe under parallel execution. Updated in Step 6.
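
A minimal sketch of why the cell-level write path is safe and whole-record replacement is not; the buffer shape and signatures are illustrative, and the row_group dimension is collapsed for brevity.

```python
import asyncio

class RowGroupBuffer:
    def __init__(self) -> None:
        self._rows: dict[int, dict[str, object]] = {}
        self._lock = asyncio.Lock()

    async def update_cell(self, row_index: int, column: str, value) -> None:
        # Cell-level merge: touches exactly one (row, column) pair, so two
        # independent columns writing the same row cannot clobber each other.
        async with self._lock:
            self._rows.setdefault(row_index, {})[column] = value

    async def update_record(self, row_index: int, record: dict) -> None:
        # Whole-record replacement: unsafe under parallel execution, since a
        # stale snapshot of `record` silently overwrites concurrent writes.
        async with self._lock:
            self._rows[row_index] = record
```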

3. allow_resize scoping — After looking into this more carefully, allow_resize lets any generator (not just pre-batch processors) change the row count mid-pipeline. This would invalidate per-row completion state for all downstream columns and require dynamic rescheduling + row identity tracking. Rather than partially solving this, async v1 will fall back to the sync path when any column config has allow_resize=True. This is safe since resize is opt-in and the sync path handles it naturally. Added as a new design decision section (Section 8), a risk entry, a builder integration check in Step 7, and listed under "What we're NOT doing."

4. Generator reentrancy — Good catch. Rather than hardcoding behavior per generator type, I've added a new design decision (Section 5) introducing an is_stateful property on the base ColumnGenerator class (default False). The scheduler serializes tasks per-instance for stateful generators — row group N must complete before N+1 starts for that generator. Stateless generators dispatch all row groups concurrently. For example, SeedDatasetColumnGenerator sets is_stateful = True (it maintains a DuckDB batch reader cursor and leftover-row buffer), while SamplerColumnGenerator stays False. This is extensible to custom generators too. Updated in Steps 4 and 5, with a test case in Step 8.
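
A minimal sketch of the contract; class and function names are simplified stand-ins for the real generator base class and scheduler.

```python
import asyncio

class ColumnGenerator:
    is_stateful: bool = False  # default: safe for concurrent row-group dispatch

class SeedDatasetColumnGenerator(ColumnGenerator):
    is_stateful = True  # maintains a DuckDB batch reader cursor + leftover-row buffer

async def dispatch_row_groups(generator, row_groups, run_group):
    if generator.is_stateful:
        # Serialize per instance: row group N must complete before N+1 starts.
        for group in row_groups:
            await run_group(generator, group)
    else:
        # Stateless generators dispatch all row groups concurrently.
        await asyncio.gather(*(run_group(generator, g) for g in row_groups))
```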

5. Scheduler semaphore vs throttling — Addressed in the response to your other comment (same solution: two independent layers, tasks release slots during throttle wait).

6. asyncio.to_thread semantics — Fixed. The Risks section now correctly states that to_thread passes object references, not copies. Full-column generators sharing a DataFrame must receive a defensive df.copy() before being dispatched to a thread, with results merged back by column name. Also reflected in Step 5 where each generator's async wrapper is defined.
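
A minimal sketch of the defensive-copy rule; `generate` here stands in for a sync full-column generator's entry point.

```python
import asyncio
import pandas as pd

async def run_full_column(generator, df: pd.DataFrame, column: str) -> None:
    # asyncio.to_thread passes object references, not copies, so hand the
    # thread its own copy to prevent concurrent mutation of the shared frame.
    result = await asyncio.to_thread(generator.generate, df.copy())
    # Merge results back into the shared frame by column name.
    df[column] = result[column]
```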

Retry & salvage — Great suggestion, added as a new design decision section (Section 7). The plan now includes:

  • Failure classification (transient → retryable, permanent → immediate drop)
  • Deferred queue with attempt count, next-eligible timestamp, exponential backoff + jitter
  • Salvage rounds after ready queue drains (configurable via async_scheduler_max_retries, default 2)
  • Retries go through the throttle manager acquire path
  • Final drop after budget exhaustion

Test cases added in Step 8 for retry success, non-retryable immediate drop, and budget exhaustion.

Let me know if anything needs further refinement.

- Only one of generate/agenerate needs to be implemented; base class
  bridges in both directions (to_thread for sync→async, asyncio.run
  for async→sync)
- Document impact on column generator plugins, custom columns, and
  processor plugins (all backward-compatible, no changes required)
- Add plans/346/diagrams.md with 5 Mermaid diagrams: task lifecycle,
  scheduler main loop, dependency resolution example, concurrency
  layers, and row group pipelining
- Clarify in plan that statefulness (concurrency safety) and sync/async
  (I/O model) are orthogonal concerns
- Eager row-drop propagation: failed rows are dropped across all columns
  to avoid wasting compute on incomplete rows
- Out-of-order row group checkpointing via index-based file naming
- Pre-batch processor failure skips the entire row group
- Salvage rounds get separate error threshold and config knobs
- Undersized last row group note
- Open questions: thread pool sizing, silent task hangs
- add submission budget controls to prevent unbounded parked tasks
- clarify DAG validation, safe async-to-sync bridging, and row-scoped drop policy
- align diagrams and unresolved risk wording with latest design decisions
- add UX considerations section: progress display strategy, peak memory
  cap, new config knobs, async custom columns and plugin upgrade path,
  what stays the same
- replace allow_resize silent fallback with explicit DatasetGenerationError
  at startup; move to Follow-ups section
- consolidate all deferred work into Out of scope / Follow-ups subsections
- fix five internal inconsistencies: progress tracking in Step 4, missing
  async_max_concurrent_row_groups in scheduler constructor, annotate
  _ensure_async_engine_loop as existing, SamplerColumnGenerator dual-wrapper
  scope (applies to all FromScratchColumnGenerator subclasses),
  stateful serialization vs row group admission clarification
- resolve previously open decisions: asyncio.Event over Condition,
  task_model as own module, async_max_concurrent_row_groups default 3,
  async_salvage_max_attempts_per_task dropped in favour of max_rounds+1
  semantics, thread pool keep default for v1
- fix CustomColumnGenerator FULL_COLUMN async path (needs own agenerate
  branching on strategy); note ValidationColumnGenerator internal threading
- add "Relation to PR #269" section explaining what we adopted
  (dependency source, trait inference, completion tracker design,
  statefulness separation) and what we changed (row-group tasks
  instead of cell-level nodes, ROW_STREAMABLE omitted)
- fix scheduler main loop diagram: add async_max_concurrent_row_groups
  admission step, pre-batch failure path (skip row group + release
  slot), and loop back to ADMIT after row group completion
- TaskTrace dataclass spec in Step 3 (opt-in, zero overhead when disabled)
- trace=True param on AsyncTaskScheduler constructor in Step 4
- Step 8 benchmark references trace for timing measurements
- New Profiling section: instrumentation points, example output table, usage snippet
- Replace dependency map with static ExecutionGraph class (upstream,
  downstream, strategy, topological_order, critical_path, task_count,
  to_mermaid accessors)
- Use row-group-local indices in CompletionTracker instead of global
- Clarify from-scratch columns are FULL_COLUMN with empty upstream
  deps, not a separate strategy enum value
- Remove reference to non-existent ColumnGeneratorCellByCell
- Expand PR #269 comparison: ExecutionTraits → GenerationStrategy
nabinchha (Contributor) previously approved these changes on Feb 25, 2026 and left a comment:

LGTM @andreatgretel. The only thing is that "allow_resize scoped out of async v1 (falls back to sync)" in the PR description is out of sync with what's in the plan.

nabinchha (Contributor) commented Feb 25, 2026:

The only other suggestion I have is to break the proposed work down, in the plan itself, into several smaller PRs that can land in main.

- Correct "N columns, N edges" to "O(C) nodes, O(C²) edges worst-case"
- Add dispatched set param to get_ready_tasks to prevent double-dispatch
- Clarify is_row_group_complete drop_row interaction
andreatgretel and others added 6 commits February 25, 2026 15:18
- Add PR breakdown section with 4 PRs, dependency graph, and
  "what works after merge" for each
- Add code-sketches.md with structural sketches of main components
- Reorganize test plan by PR (unit tests per PR, integration in PR 4)
- Note that throttle manager (PR #344) is optional; scheduler works
  without it initially
- Rename scheduler semaphore to execution semaphore for clarity
- Split execution semaphore from submission budget as distinct
  concerns with separate semaphores
- Add reacquire step to dispatch pattern after throttle wait
- Add multi-column generator handling via instance dedup on the
  scheduler (graph stays column-level)
GIL contention with CPU-bound custom generators and event loop
starvation with native async compute are documented as v1 risks.
ProcessPoolExecutor routing via is_cpu_bound noted as follow-up.
Compute-heavy tasks saturating the thread pool starve I/O-bound
tasks (LLM calls) from acquiring threads, not just GIL contention.
Compute-bound plugins should implement generate(), not agenerate(),
to keep CPU work off the event loop. Same rule for custom columns:
only use async def for I/O-bound work.
johnnygreco (Contributor) left a comment:

thanks @andreatgretel! this is a great example of a spec-driven PR 🙏

andreatgretel merged commit 47e52e5 into main on Feb 25, 2026; 47 checks passed.