feat(api): Live-State custom procedures — plan §1–§15 (through timeline update)#248
feat(api): Live-State custom procedures — plan §1–§15 (through timeline update)#248pedroscosta wants to merge 30 commits into
Conversation
Replace deprecated collection insert/update on pipelineIdempotencyKey and pipelineJob with internalApiKey-gated procedures (upsert, invalidate, create, update) and migrate the worker pipeline callers. Made-with: Cursor
There was a problem hiding this comment.
1 issue found across 3 files
Confidence score: 3/5
- There is a concrete concurrency risk in
apps/api/src/live-state/router.ts:upsertuses a non-atomic find-then-insert/update pattern, which can intermittently fail under parallel requests for the same key. - Given the 7/10 severity and high confidence (8/10), this is more than a minor code-quality note and could cause user-visible write failures in production race conditions.
- Pay close attention to
apps/api/src/live-state/router.ts- make the upsert path atomic (or handle unique-key conflicts safely) to avoid intermittent request failures.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="apps/api/src/live-state/router.ts">
<violation number="1" location="apps/api/src/live-state/router.ts:756">
P1: `upsert` is implemented as a non-atomic find-then-insert/update, so concurrent requests for the same key can hit unique-key races and fail intermittently.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Wrap upsert in a short transaction: update when row exists, otherwise insert and recover from Postgres unique violations on concurrent inserts for the same key. Invalidate uses the same transactional read-then-update pattern. Made-with: Cursor
Made-with: Cursor
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis pull request migrates internal pipeline collections from generic Live-State mutations to explicit procedure-based mutations with built-in authorization and transaction handling. The changes include implementing custom procedures for Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
docs/live-state-custom-procedures-plan.md (1)
9-9: Clean up MD037 emphasis spacing to keep markdownlint green.There are many inline emphasis spans with spaces inside marker boundaries (for example around inline code). Normalizing those will remove the repeated MD037 warnings.
Also applies to: 21-23, 32-32, 44-45, 59-59, 74-74, 92-92, 105-105, 124-124, 128-128, 144-145, 157-157, 161-161, 172-172, 176-176, 188-188, 193-193, 206-206, 212-213, 234-234, 247-247, 254-254, 273-274, 278-278, 293-294, 298-298, 314-315, 319-320, 325-325, 335-337, 373-373
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/live-state-custom-procedures-plan.md` at line 9, Normalize inline emphasis spacing across the document by removing spaces between emphasis markers and their content (e.g., change " `**insert: () => false`** " to "`**insert: () => false**`" and similarly for "`**update: false**`"); scan occurrences listed (including the example snippets `insert: () => false` and `update: false`) and update each inline code/emphasis span so there are no spaces inside the markdown emphasis/backtick markers to satisfy MD037.apps/worker/src/pipeline/core/idempotency.ts (1)
169-173: Batch upserts are currently serialized; this can slow larger runs.Line 169 performs one network mutation per key sequentially. If ordering is not required, parallelizing these upserts will reduce latency.
⚡ Suggested refactor
try { - for (const { key, hash } of keyHashPairs) { - await fetchClient.mutate.pipelineIdempotencyKey.upsert({ - key, - hash, - }); - } + await Promise.all( + keyHashPairs.map(({ key, hash }) => + fetchClient.mutate.pipelineIdempotencyKey.upsert({ + key, + hash, + }), + ), + ); return true; } catch (error) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/worker/src/pipeline/core/idempotency.ts` around lines 169 - 173, The current loop in idempotency.ts is performing sequential network upserts via fetchClient.mutate.pipelineIdempotencyKey.upsert which slows large runs; change the logic to fire the upserts in parallel (e.g., map the keys to calls to fetchClient.mutate.pipelineIdempotencyKey.upsert and await Promise.all) when ordering is not required, or use a bounded concurrency helper (p-map or a simple limit) if you need to cap concurrent requests; ensure you preserve existing error handling semantics by collecting/rethrowing/reporting errors from the parallel calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/api/src/live-state/router.ts`:
- Around line 872-879: The insert into pipelineJob using
db.insert(schema.pipelineJob, ...) must be made idempotent: wrap that call in a
try/catch, and on a duplicate-key/unique-constraint error (e.g., Postgres
'23505' or error message containing 'duplicate'/'unique') treat it as a success
path instead of throwing—either return the existing row (by selecting where id =
req.input.id) or proceed as if the insert succeeded; ensure this logic is
applied around the code that constructs/invokes db.insert(schema.pipelineJob,
...) so transient retries with the same id do not produce hard failures.
In `@docs/live-state-custom-procedures-plan.md`:
- Line 5: The document uses "github" in two places; update both occurrences to
use the official product capitalization "GitHub" (specifically change the
instance inside the bolded app list that currently reads `**apps/github**` and
the other occurrence on line 17) so the text consistently uses "GitHub" across
the file.
---
Nitpick comments:
In `@apps/worker/src/pipeline/core/idempotency.ts`:
- Around line 169-173: The current loop in idempotency.ts is performing
sequential network upserts via fetchClient.mutate.pipelineIdempotencyKey.upsert
which slows large runs; change the logic to fire the upserts in parallel (e.g.,
map the keys to calls to fetchClient.mutate.pipelineIdempotencyKey.upsert and
await Promise.all) when ordering is not required, or use a bounded concurrency
helper (p-map or a simple limit) if you need to cap concurrent requests; ensure
you preserve existing error handling semantics by
collecting/rethrowing/reporting errors from the parallel calls.
In `@docs/live-state-custom-procedures-plan.md`:
- Line 9: Normalize inline emphasis spacing across the document by removing
spaces between emphasis markers and their content (e.g., change " `**insert: ()
=> false`** " to "`**insert: () => false**`" and similarly for "`**update:
false**`"); scan occurrences listed (including the example snippets `insert: ()
=> false` and `update: false`) and update each inline code/emphasis span so
there are no spaces inside the markdown emphasis/backtick markers to satisfy
MD037.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b93f5193-6a9f-4804-bd16-8994c49bcba4
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
apps/api/src/live-state/router.tsapps/worker/src/pipeline/core/idempotency.tsapps/worker/src/pipeline/core/persistence.tsdocs/live-state-custom-procedures-plan.mdpackage.json
| await db.insert(schema.pipelineJob, { | ||
| id: req.input.id, | ||
| name: req.input.name, | ||
| status: req.input.status, | ||
| metadataStr: req.input.metadataStr, | ||
| createdAt: req.input.createdAt, | ||
| updatedAt: req.input.updatedAt, | ||
| }); |
There was a problem hiding this comment.
pipelineJob.create should be retry-safe (idempotent) to avoid duplicate-key hard failures.
A transient timeout/retry with the same id can hit a unique constraint and fail even though the job row already exists. Please handle duplicate-key as an idempotent success path.
🛡️ Suggested retry-safe create pattern
).handler(async ({ req, db }) => {
if (!req.context?.internalApiKey) {
throw new Error("UNAUTHORIZED");
}
- await db.insert(schema.pipelineJob, {
- id: req.input.id,
- name: req.input.name,
- status: req.input.status,
- metadataStr: req.input.metadataStr,
- createdAt: req.input.createdAt,
- updatedAt: req.input.updatedAt,
- });
+ try {
+ await db.insert(schema.pipelineJob, {
+ id: req.input.id,
+ name: req.input.name,
+ status: req.input.status,
+ metadataStr: req.input.metadataStr,
+ createdAt: req.input.createdAt,
+ updatedAt: req.input.updatedAt,
+ });
+ } catch (error: unknown) {
+ if (!isPostgresUniqueViolation(error)) {
+ throw error;
+ }
+ const existing = await db.findOne(schema.pipelineJob, req.input.id);
+ if (!existing) {
+ throw error;
+ }
+ }
return { success: true as const };
}),🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/api/src/live-state/router.ts` around lines 872 - 879, The insert into
pipelineJob using db.insert(schema.pipelineJob, ...) must be made idempotent:
wrap that call in a try/catch, and on a duplicate-key/unique-constraint error
(e.g., Postgres '23505' or error message containing 'duplicate'/'unique') treat
it as a success path instead of throwing—either return the existing row (by
selecting where id = req.input.id) or proceed as if the insert succeeded; ensure
this logic is applied around the code that constructs/invokes
db.insert(schema.pipelineJob, ...) so transient retries with the same id do not
produce hard failures.
There was a problem hiding this comment.
0 issues found across 3 files (changes from recent commits).
Requires human review: This is a significant structural refactor that migrates core pipeline idempotency and job logic into API procedures, requiring human review of the new transaction logic and auth checks.
…ates Made-with: Cursor
Made-with: Cursor
Add author.create (idempotent) and author.update procedures, disable generic author insert/update, and migrate web/devtools, Discord, and Slack call sites. Extend web optimistic mutations for standalone author flows. Made-with: Cursor
Add invite.create, cancel, accept, and decline with withProcedures; disable generic invite update. Move bulk invite flow from organizationUser.inviteUser to invite.create. Team settings uses WebSocket mutate for create/cancel with optimistic cancel. Remove unused resend import from router.ts. Made-with: Cursor
… §6) Replace generic collection writes with withProcedures: validate, create, recrawl, and discriminated update (setProgress for the worker, patch/delete for owners). Point web and crawl-documentation call sites at the new API. Made-with: Cursor
…an §7) Use withProcedures on agentChat, authorize for org checks, and typed db.* / trx.* collection entry points instead of schema-level helpers. Made-with: Cursor
…edures (plan §8–§10) Disable generic collection writes; add withProcedures (onboarding create plus step flows; user.update; organizationUser.update). Wire router imports, rename onboarding.initialize to create, use object payloads for user and member updates, and extend optimistic mutations for onboarding, user, and organizationUser. Made-with: Cursor
…ootstrap inserts Plan §10: add create procedure for membership rows (owner/internalApiKey); use db.organization/db.organizationUser.insert in organization create. Made-with: Cursor
Plan §11: extract organization route with withProcedures (create, update,
API key helpers); block generic insert/update; migrate web and digest
worker to mutate.organization.update({ id, ... }); add optimistic org patch.
Made-with: Cursor
Plan §12: disable generic integration insert/update; add create, update, and fetchSlackChannels procedures with authorize; migrate web, Discord, Slack, and GitHub call sites to object-shaped updates. Made-with: Cursor
Plan §13: label/threadLabel create+update procedures with authorize; migrate web label flows off collection insert/update. Plan §14: suggestion create+update procedures (worker + devtools + UI); migrate fetchClient and mutate call sites to object-shaped updates. Plan §15: timeline update create+update procedures; migrate web, GitHub webhooks, Discord, Slack, and thread actions off raw collection writes. Made-with: Cursor
…neric writes Made-with: Cursor
…ic writes Made-with: Cursor
…r to answer Made-with: Cursor
Made-with: Cursor
Made-with: Cursor
Made-with: Cursor
Made-with: Cursor
Made-with: Cursor
Made-with: Cursor
…ues and context type
…orization checks and simplifying query syntax
…s and streamline allowlist route checks
Summary
Rollout for Live-State custom procedures (
docs/live-state-custom-procedures-plan.md):pipelineIdempotencyKeyandpipelineJobuse named procedures; generic collectioninsert/updateare disabled for those tables; worker call sites updated.allowlistusescreate/updateprocedures; genericinsert/updatedisabled;apps/api/src/live-state/router/allowlist.tsadded.subscriptionusescreate/updateprocedures; generic collectionupdatedisabled;apps/api/src/live-state/router/subscription.tsadded; org bootstrap still usesdb.subscription.insertinsideorganization.create; Dodo webhooks remain onstorageoutside Live-State.authorusescreate/updateprocedures (idempotent get-or-create byuserId+organizationId, ormetaId+organizationId); generic collectioninsert/updatedisabled;apps/api/src/live-state/router/author.tsadded; web devtools, deprecated thread input, and Discord/Slack useauthor.create; optimisticauthor.create/author.updateinapps/web/src/lib/live-state.ts.inviteusescreate,cancel,accept, anddeclineprocedures; generic collectionupdatedisabled;organizationUser.inviteUserremoved in favor ofinvite.create;apps/api/src/live-state/router/invite.tsadded;team.tsxusesmutate.invite.createandmutate.invite.cancel; optimisticinvite.cancelinapps/web/src/lib/live-state.ts.documentationSourceuseswithProcedures(validate,create,recrawl, discriminatedupdate); genericinsert/updatedisabled; worker crawl progress usesupdatewithaction: "setProgress"; settings UI updated.agentChatuseswithProcedures(same RPC names); reads/writes in handlers use typeddb.*/trx.*collection APIs;authorizefor org membership.onboardinggenericinsert/updatedisabled;initializerenamed tocreatewithwithProcedures;completeStep/skip/completeunchanged by name; optimistic handlers inlive-state.ts.useruseswithProceduresupdateonly; generic collectionupdatedisabled;apps/api/src/live-state/router/user.ts; profile settings use object payload; optimisticuser.update.organizationUseruseswithProcedurescreateandupdate; generic collectioninsert/updatedisabled;apps/api/src/live-state/router/organization-user.ts; org bootstrap usesdb.organizationUser.insert; team member role/remove use object payloads; optimisticorganizationUser.update.organizationextracted toapps/api/src/live-state/router/organization.tswithwithProcedures(create,update,createPublicApiKey,revokePublicApiKey,listApiKeys); generic collectioninsert/updatedisabled; web org settings and digest worker usemutate.organization.update({ id, ... }); optimisticorganization.updateinlive-state.ts.integrationusesapps/api/src/live-state/router/integration.tswithcreate,update, andfetchSlackChannels; generic collectioninsert/updatedisabled; web OAuth flows,activate.ts, Discord/Slack/GitHub apps use object-shapedupdateandcreate.labelandthreadLabelusewithProcedurescreate/updateinapps/api/src/live-state/router/labels.ts; generic collectioninsert/updatedisabled; web labels UI, thread labels, and quick actions migrated.suggestionuseswithProcedurescreate/updateinapps/api/src/live-state/router/suggestions.ts; generic collectioninsert/updatedisabled; worker processors, devtools, signal, and thread toolbars use object-shaped updates andcreate.updatecollection useswithProcedurescreate/updateinapps/api/src/live-state/router/update.ts; generic collectioninsert/updatedisabled; web thread actions, GitHub webhooks, Discord, and Slack migrated tomutate.update.create/mutate.update.update({ id, ... }).Plan checklist (files cited in the plan)
Cross-cutting / router:
apps/api/src/live-state/router.tsapps/api/src/live-state/router/allowlist.tsapps/api/src/live-state/router/subscription.tsapps/api/src/live-state/router/author.tsapps/api/src/live-state/router/invite.tsapps/api/src/live-state/router/documentation-sources.tsapps/api/src/live-state/router/user.tsapps/api/src/live-state/router/organization-user.tsapps/api/src/live-state/router/organization.tsapps/api/src/live-state/router/integration.tsapps/api/src/live-state/router/labels.tsapps/api/src/live-state/router/suggestions.tsapps/api/src/live-state/router/update.tsapps/web/src/lib/live-state.tsapps/api/src/lib/authorize.tsWorker — pipeline & docs & suggestions:
apps/worker/src/pipeline/core/idempotency.tsapps/worker/src/pipeline/core/persistence.tsapps/worker/src/pipeline/processors/suggest-status.tsapps/worker/src/pipeline/processors/suggest-labels.tsapps/worker/src/pipeline/processors/suggest-duplicates.tsapps/worker/src/lib/database/client.tsapps/worker/src/handlers/match-pr-threads.tsapps/worker/src/handlers/digest-scan.tsapps/worker/src/handlers/digest-deliver.tsapps/worker/src/handlers/crawl-documentation.tsWeb — threads, signal, actions, toolbar:
apps/web/src/actions/threads.tsapps/web/src/components/threads/properties.tsxapps/web/src/components/threads/issues.tsxapps/web/src/components/threads/pull-requests.tsxapps/web/src/components/threads/linked-pr-suggestions-section.tsxapps/web/src/components/threads/thread-toolbar/quick-actions.tsxapps/web/src/components/threads/thread-toolbar/reply-editor.tsxapps/web/src/routes/app/_workspace/_main/signal/index.tsxapps/web/src/routes/app/_workspace/_main/threads/$id.tsxapps/web/src/routes/app/_workspace/_main/threads/archive/$id.tsxapps/web/src/routes/support/$slug/threads/$id.tsxWeb — labels & org & team & user & integrations:
apps/web/src/components/threads/labels.tsxapps/web/src/routes/app/_workspace/settings/organization/labels.tsxapps/web/src/routes/app/_workspace/settings/organization/support-intelligence.tsxapps/web/src/components/threads/thread-input-area-deprecated/support-intelligence.tsxapps/web/src/components/devtools/devtools-menu/add-pr-suggestion-command.tsxapps/web/src/components/devtools/devtools-menu/signals-submenu.tsxapps/web/src/lib/integrations/activate.tsapps/web/src/routes/app/_workspace/settings/organization/index.tsxapps/web/src/routes/app/_workspace/settings/organization/team.tsxapps/web/src/routes/app/_workspace/settings/user/index.tsxapps/web/src/routes/app/_workspace/settings/organization/documentation.tsxapps/web/src/routes/app/_workspace/settings/organization/api-keys.tsxapps/web/src/lib/server-funcs/payment.tsapps/web/src/lib/server-funcs/invitations.tsxWeb — devtools / deprecated / onboarding / playground:
apps/web/src/components/devtools/devtools-menu/create-thread-button.tsxapps/web/src/components/devtools/devtools-menu/create-thread-dialog.tsxapps/web/src/components/threads/create-thread-dialog.tsxapps/web/src/components/devtools/devtools-menu/duplicate-thread-command.tsxapps/web/src/components/threads/thread-input-area-deprecated/index.tsxapps/web/src/lib/onboarding/use-onboarding.tsapps/web/src/routes/app/_workspace/_main/playground/index.tsxDiscord & Slack:
apps/discord/src/index.tsapps/slack/src/index.tsapps/discord/src/lib/utils.tsapps/slack/src/lib/utils.tsapps/slack/src/lib/installation-store.tsGitHub app:
apps/github/src/webhooks/index.tsapps/github/src/routes/setup.tsWeb — integration settings (OAuth / UI; not every file was in the original checklist):
apps/web/src/routes/app/_workspace/settings/organization/integration/discord/index.tsxapps/web/src/routes/app/_workspace/settings/organization/integration/discord/redirect.tsapps/web/src/routes/app/_workspace/settings/organization/integration/slack/index.tsxapps/web/src/routes/app/_workspace/settings/organization/integration/slack/redirect.tsapps/web/src/routes/app/_workspace/settings/organization/integration/github/index.tsxSummary by CodeRabbit
Bug Fixes
Infrastructure