feat: concurrency per label with head-of-line skip (#1304)#1314
Conversation
Why: a single global concurrency limit cannot model per-API rate concerns (e.g. limiting OpenAI calls to 3 in flight while keeping unrelated nodes running freely). Adds optional per-label limits that compose with the existing global limit. - type.ts: add ComputedNodeData.label and ConcurrencyConfig union for GraphData.concurrency. Number form remains valid. - task_manager.ts: constructor accepts number | ConcurrencyConfig. dequeueTaskIfPossible walks the priority-sorted queue and dispatches the first task whose label still has capacity (HoL skip), so a saturated label cannot block other work. onComplete drains as many newly-eligible tasks as the global limit allows. - node.ts / validators: read and validate the new fields. Tests: - New tests/units/test_task_manager.ts (19 tests) - locks current TaskManager behavior (priority order, FIFO, nesting, reset) and covers the new label semantics (HoL skip, mixed labeled/unlabeled, slot release). - tests/validators/test_validator_graph_data.ts gains 10 tests for the object-form concurrency validator. Backward compatibility: concurrency: number and unlabeled nodes behave identically to before. prepareForNesting still bumps the global limit only. Follow-ups split out as separate issues: - #1311 aging (queued-task priority bump) - #1312 weight (weighted concurrency) - #1313 token-bucket RPM/TPM (with rationale for why concurrency-only is insufficient for TPM and burst control) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Warning Rate limit exceeded
To keep reviews running without waiting, you can enable usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
📝 WalkthroughWalkthroughAdds optional per-node ChangesPer-Label Concurrency Control
Sequence DiagramsequenceDiagram
participant Client as Task Producer
participant Manager as TaskManager
participant Queue as Priority Queue
participant Running as Running Tracker
participant Worker as Task Executor
Client->>Manager: addTask(task { label? }, priority)
Manager->>Manager: normalizeConcurrencyConfig / canRun(task)
alt eligible (global && label OK)
Manager->>Running: increment global & label counters
Manager->>Worker: dispatch task callback
else ineligible
Manager->>Queue: enqueue task (priority-sorted)
end
Worker->>Manager: onComplete(task)
Manager->>Running: decrement global & label counters
Manager->>Queue: scan queue (HoL skip) for first eligible task
loop dispatch while eligible tasks exist
Manager->>Running: increment counters
Manager->>Worker: dispatch next eligible task
Manager->>Queue: remove dispatched entry
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 6 minutes and 35 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/graphai/src/validators/common.ts`:
- Line 16: computedNodeValidator currently accepts a `label` key without
checking its type, allowing non-string values (e.g., numbers/booleans) to pass
and bypass TaskManager's per-label limits; update the validator (the
computedNodeValidator in common.ts) to validate that node.label is a string
before accepting the key—reject or throw for non-string label values so only
string labels are permitted and TaskManager label-based caps work correctly.
In `@packages/graphai/src/validators/graph_data_validator.ts`:
- Around line 18-32: The concurrency object validation must reject unknown keys:
after verifying concurrency is an object and that "global" exists, ensure the
object only contains the allowed keys "global" and optionally "labels" by
inspecting Object.keys(concurrency) and throwing a ValidationError if any other
key is present; keep using validateConcurrencyValue(concurrency.global,
"Concurrency.global") and the existing labels branch (typeof concurrency.labels
checks and validateConcurrencyValue for each label) but add the extra-key check
before validating values so typos like "label" fail fast.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ba501093-d1c6-4063-9234-68a8b93bcbc3
📒 Files selected for processing (8)
packages/graphai/src/node.tspackages/graphai/src/task_manager.tspackages/graphai/src/type.tspackages/graphai/src/validators/common.tspackages/graphai/src/validators/graph_data_validator.tspackages/graphai/tests/units/test_task_manager.tspackages/graphai/tests/validators/test_validator_graph_data.tsplans/feat-concurrency-per-label.md
Findings from Codex (#1314 review): 1. prepareForNesting/restoreAfterNesting were label-unaware, allowing a deadlock when a parent node holding the only slot for label X invokes a nested graph whose children also use label X. Fix: both methods now accept an optional label argument; node.ts passes its own label so the parent's label-bucket gets a temporary +1 slot for the nested children, mirroring the existing global-slot escape hatch. 2. ComputedNode.label was copied through without runtime validation, so a malformed (non-string) label from YAML/JSON would silently bypass the per-label limit (Map<string, ...>.get(<non-string>) -> undefined, treated as untracked). Fix: defensive coercion in ComputedNode + a schema check in computed_node_validator. Also addresses user feedback to lean on existing utility predicates: - task_manager.normalizeConcurrencyConfig now uses isObject for the labels guard. - graph_data_validator now uses isObject + isNull instead of inlined null/typeof/Array.isArray combinations. Tests added: - TaskManager: nested-graph deadlock prevention, untouched behavior when label is undefined or unconfigured. - computed_node_validator: rejects non-string label, accepts string label. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex cross-review iteration 1 — addressedCodex flagged two issues (network-isolated, so verdict was returned to local CLI rather than posted here). Both addressed in 96ba009: 1. Nested-graph deadlock with per-label limitsCodex finding: Fix: 2. Runtime label validationCodex finding: Fix:
Also (per user feedback)Refactored Tests added (graphai package now: 229 → 235)
All 19 workspace test suites pass ( |
Findings from Codex iteration-2: 1. Exception safety of nested-graph slot bookkeeping. prepareForNesting was paired with a happy-path-only restoreAfterNesting, so a throw inside the agent body leaked both the global concurrency bump and (after iteration-1) the label bump. This was a pre-existing bug for the global slot but became more impactful with the label addition. Fix: wrap the nested execution block in try/finally so restoreAfterNesting always runs. 2. validateConcurrencyConfig was accepting concurrency.labels: null after the iteration-1 isNull refactor relaxed the check. The declared type is Record<string, number>; null is malformed. Fix: compare against undefined directly so only the absent sentinel skips validation. 3. Iteration-1 tests did not actually exercise the label decrement path -- the only post-restore assertion was the global concurrency value. Strengthened test_task_manager.ts so the nested-graph test schedules two openai tasks after restoreAfterNesting and confirms one queues (which would not happen if the label limit were left bumped). Plus, per user feedback to cover negative patterns more thoroughly, broadened the validator suites: - test_validator_computed_node.ts: label rejected for null, array, object, boolean (in addition to number); valid for omitted and string. - test_validator_graph_data.ts: top-level concurrency rejected for null, boolean, array. concurrency.global rejected for string, null, negative, float. concurrency.labels.<key> rejected for string, null, negative, boolean, object. Empty-key edge case still validates. graphai package tests: 235 -> 254. All 19 workspace test suites pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex cross-review iteration 2 — addressed (f804289)1. Exception safety of nested-graph slot bookkeepingCodex finding: Fix: 2.
|
Findings from Codex iteration-3:
1. The iteration-2 try/finally block did not protect the
forNestedGraph construction. resultOf() can throw when nestedGraph
is a DataSource, leaving prepareForNesting's bump unrestored.
Fix: move both prepareForNesting and the forNestedGraph build
inside the try block, and gate restoreAfterNesting on a
`nestingPrepared` flag that is set only after prepareForNesting
actually returned. This way:
- throw inside the build -> finally runs, restore is called.
- throw inside prepareForNesting itself -> flag stays false,
restore is NOT called (no bump to undo).
2. validateConcurrencyConfig accepted any non-array object for
labels, so Map/Date/class instances would pass typeof === "object"
and silently disable enforcement (Object.entries yields nothing).
It also ignored unexpected extra keys on the concurrency object,
so a typo like `labls` would be silently dropped.
Fix: introduce isPlainObject (proto === Object.prototype || null)
and require it for both the concurrency object and labels. Reject
any keys other than "global"/"labels". Tightened
validateConcurrencyValue to also require typeof === "number" before
the Number.isInteger check.
Tests:
- Reject extra keys on concurrency: { global, foo }.
- Reject Map / Date as concurrency value (call validateGraphData
directly to bypass anonymization's JSON round-trip, which would
otherwise mask the class identity).
- Reject Map / class instance as labels value.
graphai package tests: 254 -> 261. All 19 workspace suites pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex cross-review iteration 3 — addressed (5d7314c)1. try/finally still left a gap before agent invocationCodex finding: Fix: pulled both 2. Validator accepted non-plain-object shapesCodex finding: Fix:
Tests
Test countsgraphai package: 254 → 261 tests (all pass). 19 workspace suites: fail count 0. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/graphai/src/task_manager.ts`:
- Around line 127-138: prepareForNesting currently increments the global
labelLimits (mutating labelLimits) which temporarily widens the shared cap and
lets unrelated tasks with the same label consume that slot; instead, stop
mutating labelLimits and implement a per-nesting reservation tracked on the
TaskManager instance (e.g., a new Map like nestingReservations keyed by the
nesting path or a per-run id) that increments on prepareForNesting(label) and
decrements on restoreAfterNesting(label), and change throttling/availability
checks to consult labelLimits minus any global usage plus the current run's
nestingReservations (so only the current nested path sees the reserved
capacity); update prepareForNesting and restoreAfterNesting to operate on this
per-nesting reservation map and add a regression test that runs a labeled parent
that nests without using the label while a queued sibling with the same label is
waiting and assert the sibling does not start during the parent’s nesting
window.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0f890848-88fa-470f-a785-d9decc27ce65
📒 Files selected for processing (7)
packages/graphai/src/node.tspackages/graphai/src/task_manager.tspackages/graphai/src/validators/computed_node_validator.tspackages/graphai/src/validators/graph_data_validator.tspackages/graphai/tests/units/test_task_manager.tspackages/graphai/tests/validators/test_validator_computed_node.tspackages/graphai/tests/validators/test_validator_graph_data.ts
Codex cross-review iteration 4 — LGTMCodex returned LGTM with one nit: Decision: defer. GraphAI's graph data is sourced from JSON.parse or in-process programmatic construction; cross-realm objects are not a realistic path for this codebase. The constraint matches lodash's Final status
Ready for human review and CI green-light. |
CodeRabbit raised that prior iterations widened the shared per-label
cap during nesting (labelLimits++), which meant unrelated tasks
sharing that label could opportunistically take the bumped slot --
defeating the throttle even when the nested graph never used the label.
Fix: introduce a per-parent-graphId bypass map.
nestingBypassByLabel: Map<label, Map<parentGraphId, count>>
prepareForNesting(label, parentGraphId) records a bypass for that
parent without touching labelLimits. canRun() consults the bypass
ONLY for tasks whose graphId differs from the parent's. So:
- parent on graphId X with label openai (limit 1) is running
- sibling on graphId X with label openai is queued
- parent invokes nested graph -> bypass recorded for parent X
- canRun(sibling): same graphId as parent -> bypass NOT applied
-> sibling stays queued (correct: per-label throttle preserved)
- canRun(child on nested graphId Y): different from parent -> +1 cap
-> child runs (correct: deadlock avoided)
Also: prepareForNesting now drains the queue (its global slot bump
or label bypass can immediately enable a queued task).
The Map<Map<>> shape is necessary: a single graph can host two
labeled nodes simultaneously invoking nested graphs, so refcounting
per parentGraphId is required (a Set would deduplicate). Kept as
the minimum data shape that handles refcount correctly.
Documented residual edge case in test file: two completely independent
top-level graphs sharing a TaskManager would still let an "unrelated"
queued node consume the bypass, because graphId comparison alone cannot
distinguish "my actual nested child" from "an unrelated independent graph
sharing this manager." That scenario is not part of GraphAI's documented
hierarchical usage and would require threading parent-child graphId
binding through the GraphAI constructor (out of scope).
Tests:
- New: child on nested graphId runs, sibling on parent's graphId blocks.
- Updated existing nesting tests to use the (label, parentGraphId)
signature and to model real nested-vs-sibling graphId topology.
graphai package tests: 261 -> 264. All 19 workspace suites pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CodeRabbit finding addressed (77ec2cf)CodeRabbit pointed out a subtle but real bug not caught by Codex: prior iterations widened the shared per-label cap during nesting ( Fix
nestingBypassByLabel: Map<label, Map<parentGraphId, count>>
The Map-of-Maps shape is the minimum needed for correctness: a single outer graph can host two labeled nodes invoking nested graphs simultaneously, so refcount per Other adjustments
Documented residual edge caseTwo completely independent top-level graphs sharing a single TaskManager would still let an "unrelated" queued node consume the bypass, because graphId comparison alone cannot distinguish "my actual nested child" from "an independent graph that happens to share this manager." That is not part of GraphAI's documented hierarchical usage; closing it would require threading parent→child graphId binding through the GraphAI constructor, which is out of scope here. Documented in Test countsgraphai package: 261 → 264 (all pass). 19 workspace suites: fail count 0. |
Cross-review evaluation of CodeRabbit + Codex findingsI reviewed every reviewer comment on this PR and evaluated whether each finding is valid and whether the fix is correct. Summary first, details below.
All actionable findings are valid and have been correctly addressed. No reviewer comment was dismissed without a defensible reason. CodeRabbit comment 1 —
|
Followups from snakajima's audit on #1314: 1. plan doc had a stale name `globalLimit` that did not match the actual field `concurrency` in TaskManager. Fixed. 2. TaskManager constructor previously silently coerced a malformed ConcurrencyConfig (e.g. labels as Array, Map, or class instance) into something benign, so direct callers bypassing validateGraphData could end up with broken label enforcement without any error. Fix: - Promoted `isPlainObject` to packages/graphai/src/utils/utils.ts (single source of truth, used by both task_manager and the graph_data_validator). - Hardened `normalizeConcurrencyConfig` to fail fast: rejects non-plain-object configs, non-integer / non-positive values, non-plain-object labels, and bad label values. - Reordered GraphAI constructor to run validateGraphData / validateAgent BEFORE constructing TaskManager, so user-facing ValidationError keeps wrapping malformed graph data instead of TaskManager's internal Error. New tests in test_task_manager.ts cover: zero/negative/non-integer concurrency, null/array/Date as config, Map/class-instance/Array as labels, and non-integer label values. Out of scope for this PR (filed as #1315): - reset() does not restore the original concurrency value, so prepare/restore left dangling across reset() can drift the global cap. Pre-existing bug, separate issue. graphai package tests: 264 -> 270. All 19 workspace suites pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Followups from @snakajima's audit (0e8e395)1. Plan doc naming inconsistency — fixed
2. Direct TaskManager construction not guarded — fixedThe previous
7 new tests in 3. Out of scope — filed as separate issue#1315 — Test countsgraphai package: 264 → 270 (all pass). 19 workspace suites: fail count 0. CI will rerun automatically. |
Closes #1304
Summary
GraphData.concurrencyis extended to accept either a number (legacy) or{ global: number, labels?: Record<string, number> }.ComputedNodeDatagains an optionallabel: string. When set on a node and the label has a configured limit, that node counts against the per-label cap as well as the global cap.TaskManagerwalks the priority-sorted queue and dispatches the first task whose label still has capacity ("head-of-line skip"), so a saturated label cannot stall unrelated work.concurrency: numberand unlabeled nodes behave exactly as before.Items to Confirm / Review
onCompletedrains aggressively — when a label slot opens,onCompletekeeps dispatching while the global limit allows. This is what makes mixed labeled/free-label workloads behave like the user expects, but please sanity-check the loop condition intask_manager.ts:108-114.prepareForNesting()/restoreAfterNesting()still touch only the global limit. Per-label limits are not bumped when nested graphs run. If a nested graph's tasks share a label with the parent, they share the cap. Flag if a different policy is wanted.Concurrency.global must be ...,Concurrency.labels.<key> must be ...) follow the existingConcurrency must be ...style. Check the tone matches your preference.User Prompt
Implementation Approach
Order of work (TDD-ish)
TaskManagerhad zero direct unit tests. Validator only had 4 negative tests forconcurrency.yarn build,yarn test(all 19 workspaces, fail 0),yarn format.Files changed
packages/graphai/src/type.tsComputedNodeData.label,ConcurrencyConfig, unionconcurrencypackages/graphai/src/task_manager.tspackages/graphai/src/node.tsdata.labelintoComputedNode.labelpackages/graphai/src/validators/common.tslabeladded to allowed computed-node attribute keyspackages/graphai/src/validators/graph_data_validator.tspackages/graphai/tests/units/test_task_manager.tspackages/graphai/tests/validators/test_validator_graph_data.tsplans/feat-concurrency-per-label.mdAlgorithm sketch (TaskManager.dequeueTaskIfPossible)
onCompletethen loopsdequeueTaskIfPossiblewhile progress is being made, so a single completion can dispatch multiple newly-eligible queued tasks (bounded by the global limit).Validation
yarn build— succeeds across all workspacesyarn test— 19 workspaces, 0 failures (graphai package: 212 → 229 tests after this PR)yarn format— cleanyarn pack --dry-runnot applicable (no package.json change)yarn eslint— preexisting errors only in untracked scratch files (tests/ensemble.ts,tests/runner.ts,tests/units/test_aaa.ts,tests/units/test_graph_aaa.ts); no errors introduced by this PRTest plan
concurrency: { global: 10, labels: { openai: 2 } }, send 6 nodes labeledopenaiand confirm only 2 run concurrently while other labels run freelyconcurrency: 5graphs continue to work without changesOut of scope (followups)
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Documentation