-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/hooks #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/hooks #47
Conversation
- Updated the webhook endpoint to validate workflows and handle errors more gracefully. - Introduced a retry mechanism for connecting to Kafka, improving resilience in message processing. - Adjusted the server port from 3002 to 3003 for consistency. - Added a new utility function for retry logic to manage transient errors during Kafka operations.
…origins format - Changed error message for workflow not found to be more descriptive. - Updated error response for webhook processing to provide a clearer failure message. - Adjusted allowed origins in CORS configuration to remove trailing slashes for consistency. - Added .gitignore file to exclude build artifacts and dependency folders in the processor directory.
📝 WalkthroughWalkthroughThe pull request refactors the webhook-to-processing pipeline: the hooks service now validates and looks up existing workflows rather than creating new ones, creates workflowExecution records, and publishes workflowExecutionIds to Kafka; the processor consumes these messages with added retry logic and continuous polling, while the database schema adds a Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant Hooks as Hooks Service<br/>(port 3003)
participant DB as Database
participant Kafka as Kafka Broker
participant Processor as Processor Service
Client->>Hooks: POST /webhook<br/>(userId, workflowId, triggerData)
Hooks->>DB: Fetch workflow by id & userId<br/>+ nodes
alt Workflow not found or<br/>unauthorized
DB-->>Hooks: null
Hooks-->>Client: 500 Error
else Valid workflow
DB-->>Hooks: workflow + nodes
Hooks->>DB: Create workflowExecution
DB-->>Hooks: workflowExecutionId
Hooks->>DB: Insert into workflowExecutionTable<br/>(sent: false)
DB-->>Hooks: ✓
Hooks-->>Client: Return workflowExecutionId
end
Note over Processor: Continuous polling<br/>with 1s delay
loop Every iteration
Processor->>DB: Fetch rows where sent=false
DB-->>Processor: pending entries
Processor->>Kafka: Publish workflowExecutionIds<br/>to "First-Client" topic<br/>(with retryLogic)
Kafka-->>Processor: ✓ Published
Processor->>DB: Update rows: sent=true
DB-->>Processor: ✓
Processor->>Processor: Wait 1s
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements a hooks-based workflow execution system with reliable message processing. The changes introduce an outbox pattern for workflow executions, add retry logic for Kafka connections, and refactor the webhook handler to properly validate and process workflow triggers.
Key Changes:
- Added an outbox pattern using a
sentboolean flag in the database to track processed workflow executions - Implemented retry logic with exponential backoff for Kafka producer connection reliability
- Refactored hooks service to validate workflows and properly handle webhook triggers
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/db/prisma/schema.prisma | Added sent field to WorkflowExecutionTable for outbox pattern implementation |
| apps/processor/src/lib/retry.ts | New utility function implementing retry logic with exponential backoff |
| apps/processor/src/index.ts | Updated processor to use retry logic, filter by sent status, and mark records as processed instead of deleting |
| apps/processor/tsconfig.json | Added include configuration for TypeScript compilation |
| apps/processor/.gitignore | Added standard ignore patterns for Node.js projects |
| apps/hooks/src/index.ts | Refactored webhook handler to validate workflows, properly structure trigger data, and improve error handling |
| apps/http-backend/src/routes/userRoutes/userRoutes.ts | Minor formatting changes and debug logging addition |
| apps/http-backend/src/index.ts | Updated CORS allowed origins configuration |
| apps/processor/tsconfig.tsbuildinfo | Build metadata update reflecting new retry module |
| apps/http-backend/tsconfig.tsbuildinfo | Build metadata with formatting changes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .status(statusCodes.UNAUTHORIZED) | ||
| .json({ message: "User is not logged in /not authorized" }); | ||
| const userId = req.user.id; | ||
| const userId = req.user.id ; |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spacing inconsistency: there's an extra space after req.user.id. Remove the trailing space for consistent formatting.
| const userId = req.user.id ; | |
| const userId = req.user.id; |
| userId | ||
| }, | ||
| }); | ||
| console.log(workflows) |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This console.log statement should be removed before merging. Debug logging should either be removed or replaced with a proper logging framework for production code.
| console.log(workflows) |
| while (true) { | ||
| try { | ||
| const pendingRows = await prismaClient.workflowExecutionTable.findMany({ | ||
| // take: 10, |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commented-out code for the take parameter should either be uncommented with a proper value or removed entirely. Leaving commented code in production reduces maintainability.
| // take: 10, |
|
|
||
| router.get("/workflows", | ||
| userMiddleware, | ||
| userMiddleware , |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spacing inconsistency: there's an extra space after userMiddleware. Remove the trailing space for consistent formatting.
| userMiddleware , | |
| userMiddleware, |
| }, | ||
| }); | ||
|
|
||
| const outBox = await tx.workflowExecutionTable.create({ |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unused variable outBox is created but never used. Either remove this variable if it's not needed, or use the result appropriately.
| const outBox = await tx.workflowExecutionTable.create({ | |
| await tx.workflowExecutionTable.create({ |
| const result = await fn(); | ||
| return result; | ||
| } catch (error) { | ||
| if (attempt === maxRetries - 1) |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition attempt === maxRetries - 1 is incorrect. With maxRetries = 3, the loop runs from attempt 0 to 3 (inclusive), so this condition will throw an error at attempt 2, allowing one more retry at attempt 3. The condition should be attempt === maxRetries to throw after all retries are exhausted.
| if (attempt === maxRetries - 1) | |
| if (attempt === maxRetries) |
| for (let attempt = 0; attempt <= maxRetries; attempt++) { | ||
| try { | ||
| const result = await fn(); | ||
| return result; | ||
| } catch (error) { | ||
| if (attempt === maxRetries - 1) | ||
| throw Error("Max Retries Reached. Try again after some time"); | ||
|
|
||
| const delay = Math.pow(2, attempt) * 1000; | ||
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | ||
| await new Promise((res) => setTimeout(res, delay)); | ||
| } | ||
| } | ||
| throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy "); |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error is unreachable code. The loop guarantees that either a successful result is returned or an error is thrown at maxRetries, making this line impossible to execute. Consider removing it or restructuring the function to avoid TypeScript complaints without adding unreachable code.
| for (let attempt = 0; attempt <= maxRetries; attempt++) { | |
| try { | |
| const result = await fn(); | |
| return result; | |
| } catch (error) { | |
| if (attempt === maxRetries - 1) | |
| throw Error("Max Retries Reached. Try again after some time"); | |
| const delay = Math.pow(2, attempt) * 1000; | |
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | |
| await new Promise((res) => setTimeout(res, delay)); | |
| } | |
| } | |
| throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy "); | |
| for (let attempt = 0; attempt < maxRetries; attempt++) { | |
| try { | |
| const result = await fn(); | |
| return result; | |
| } catch (error) { | |
| if (attempt === maxRetries - 1) { | |
| break; | |
| } | |
| const delay = Math.pow(2, attempt) * 1000; | |
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | |
| await new Promise((res) => setTimeout(res, delay)); | |
| } | |
| } | |
| throw Error("Max Retries Reached. Try again after some time"); |
| const { triggerData } = req.body; | ||
|
|
||
| const result = await prismaClient.$transaction(async (tx) => { | ||
| console.log("Request Recieved to hooks backed with", userId, workflowId); |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling error: "Recieved" should be "Received".
| console.log("Request Recieved to hooks backed with", userId, workflowId); | |
| console.log("Request Received to hooks backed with", userId, workflowId); |
| const workflowExecution = await tx.workflowExecution.create({ | ||
| data: { | ||
| workflowId: workflow.id, | ||
| // next time you see this line validate the trigger data thinnnnnnnnnn |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment indicates incomplete implementation. The trigger data validation should be added before merging to production to prevent invalid data from being stored in the workflow execution metadata.
| console.log(workflows) | ||
| return res | ||
| .status(statusCodes.OK) | ||
| .json({ message: "Workflows fetched succesfullu", Data: workflows }); |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling error in the response message: "succesfullu" should be "successfully".
| .json({ message: "Workflows fetched succesfullu", Data: workflows }); | |
| .json({ message: "Workflows fetched successfully", Data: workflows }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (8)
apps/hooks/src/index.ts (4)
27-27: Address the TODO comment or track it as an issue.This inline reminder suggests trigger data validation is needed. Leaving implementation notes in production code can be overlooked. Consider implementing the validation now or creating a tracked issue.
Would you like me to open a new issue to track this validation requirement?
33-38: Unused variableoutBox.The
outBoxvariable is created but never used. If you don't need to reference it, you can omit the assignment.🔎 Proposed fix
- const outBox = await tx.workflowExecutionTable.create({ + await tx.workflowExecutionTable.create({ data: { workflowExecutionId: workflowExecution.id, }, });
14-14: Fix typos in log message."Recieved" → "Received" and "backed" → "backend".
🔎 Proposed fix
- console.log("Request Recieved to hooks backed with", userId, workflowId); + console.log("Request Received to hooks backend with", userId, workflowId);
44-49: Addreturnbefore sending error response.While Express 5 handles this, explicitly returning after sending a response is a best practice to prevent accidental double-sends in future code changes.
🔎 Proposed fix
} catch (error: any) { console.log(error); - res.status(500).json({ + return res.status(500).json({ success: false, error: "Failed to process webhook" }); }apps/processor/src/index.ts (4)
9-12: Externalize Kafka broker configuration.Hardcoding
localhost:9092limits deployability. Consider using environment variables.🔎 Proposed fix
const kafka = new Kafka({ - brokers: ["localhost:9092"], + brokers: (process.env.KAFKA_BROKERS || "localhost:9092").split(","), clientId: "Processing App", });
23-26: Unbounded query could cause memory pressure.The
take: 10limit is commented out. Processing a large backlog could lead to memory issues and long Kafka send operations. Consider re-enabling a batch limit.🔎 Proposed fix
const pendingRows = await prismaClient.workflowExecutionTable.findMany({ - // take: 10, + take: 100, // Process in batches where : {sent : false} });
13-48: Missing graceful shutdown handling.The producer connection is never closed. On process termination (SIGTERM/SIGINT), in-flight messages may be lost and connections left dangling.
🔎 Proposed fix
Add graceful shutdown after the
mainfunction:async function main() { const producer = kafka.producer(); // ... existing code ... const shutdown = async () => { console.log("Shutting down..."); await producer.disconnect(); process.exit(0); }; process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); while (true) { // ... existing loop ... } }
31-33: Consider adding a message key for partition consistency.Messages without a key are round-robin distributed across partitions. If ordering per workflow matters, use
workflowExecutionIdas the key.🔎 Proposed fix
messages: pendingRows.map((r) => ({ + key: r.workflowExecutionId, value: r.workflowExecutionId, })),
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
apps/hooks/src/index.tsapps/http-backend/src/index.tsapps/http-backend/src/routes/userRoutes/userRoutes.tsapps/http-backend/tsconfig.tsbuildinfoapps/processor/.gitignoreapps/processor/src/index.tsapps/processor/src/lib/retry.tsapps/processor/tsconfig.jsonapps/processor/tsconfig.tsbuildinfopackages/db/prisma/schema.prisma
🧰 Additional context used
🧬 Code graph analysis (3)
apps/hooks/src/index.ts (1)
packages/db/src/index.ts (1)
prismaClient(17-18)
apps/http-backend/src/routes/userRoutes/userRoutes.ts (2)
apps/http-backend/src/routes/userRoutes/userMiddleware.ts (2)
userMiddleware(18-42)AuthRequest(8-10)packages/db/src/index.ts (1)
prismaClient(17-18)
apps/processor/src/index.ts (2)
apps/processor/src/lib/retry.ts (1)
retryLogic(1-19)packages/db/src/index.ts (1)
prismaClient(17-18)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Agent
🔇 Additional comments (7)
apps/http-backend/tsconfig.tsbuildinfo (1)
1-1: The.gitignorefile already contains proper entries excludingtsconfig.tsbuildinfofiles (includingapps/http-backend/tsconfig.tsbuildinfospecifically), and this file is not staged for commit. No action is required.Likely an incorrect or invalid review comment.
apps/processor/.gitignore (1)
1-12: LGTM! Standard .gitignore patterns.The .gitignore file includes all essential patterns for a TypeScript/Node.js project, covering dependencies, build artifacts, environment files, logs, and IDE configurations.
apps/http-backend/src/index.ts (1)
16-16: LGTM! Corrected CORS origin configuration.The change fixes what appears to be a typo (port 300 → 3001) and removes the trailing slash for consistency. The allowedOrigins now properly supports both port 3000 and 3001 for local development.
apps/processor/tsconfig.json (1)
6-7: LGTM! Proper TypeScript configuration.Adding the explicit
includedirective is a good practice that makes the TypeScript configuration more explicit about which source files to compile.apps/http-backend/src/routes/userRoutes/userRoutes.ts (1)
283-283: LGTM! Clean code improvements.The formatting adjustments and use of ES6 shorthand property notation (
userIdinstead ofuserId: userId) improve code readability and follow modern JavaScript best practices.Also applies to: 290-290, 294-294
apps/processor/tsconfig.tsbuildinfo (1)
1-1: Auto-generated build metadata.This file is auto-generated by TypeScript and reflects the addition of the new
retry.tssource file. No issues.apps/processor/src/index.ts (1)
28-40: Potential duplicate messages on failure between send and update.If the application crashes after
producer.send()succeeds but beforeupdateMany()completes, messages will be re-sent on restart. This is acceptable for at-least-once semantics, but downstream consumers should be idempotent.Ensure the Kafka consumer handling these messages is idempotent to handle potential duplicates.
| userId | ||
| }, | ||
| }); | ||
| console.log(workflows) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debug console.log statement.
This debug statement should be removed before merging to avoid cluttering production logs.
🔎 Proposed fix
- console.log(workflows)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| console.log(workflows) |
🤖 Prompt for AI Agents
In apps/http-backend/src/routes/userRoutes/userRoutes.ts around line 297 there
is a leftover debug console.log(workflows); remove this debug statement before
merging — either delete the console.log call or replace it with a proper logger
call at an appropriate level (e.g., debug) if the information must be retained,
ensuring no direct console.log remains in production code.
| for (let attempt = 0; attempt <= maxRetries; attempt++) { | ||
| try { | ||
| const result = await fn(); | ||
| return result; | ||
| } catch (error) { | ||
| if (attempt === maxRetries - 1) | ||
| throw Error("Max Retries Reached. Try again after some time"); | ||
|
|
||
| const delay = Math.pow(2, attempt) * 1000; | ||
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | ||
| await new Promise((res) => setTimeout(res, delay)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Off-by-one error in retry logic.
The loop condition and throw check are inconsistent:
- Loop runs
attemptfrom0tomaxRetriesinclusive (4 iterations whenmaxRetries=3) - Error is thrown when
attempt === maxRetries - 1(i.e., on the 3rd iteration,attempt=2) - This means the function throws before the final retry attempt (
attempt=3) is ever tried
🔎 Proposed fix
export async function retryLogic<T>(
fn: () => Promise<T>,
maxRetries = 3
): Promise<T> {
- for (let attempt = 0; attempt <= maxRetries; attempt++) {
+ for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const result = await fn();
return result;
} catch (error) {
- if (attempt === maxRetries - 1)
+ if (attempt === maxRetries - 1) {
throw Error("Max Retries Reached. Try again after some time");
+ }
const delay = Math.pow(2, attempt) * 1000;
console.log(`Retrying again with attempt ${attempt} after ${delay} ms`);
await new Promise((res) => setTimeout(res, delay));
}
}
- throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy ");
+ // Unreachable due to throw in catch block on final attempt
+ throw Error("Max Retries Reached");
}With this fix, for maxRetries=3:
attempt=0: 1st tryattempt=1: 2nd try (1st retry)attempt=2: 3rd try (2nd retry) → throws on failure sinceattempt === maxRetries - 1
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/processor/src/lib/retry.ts around lines 5 to 17, the retry loop runs
off-by-one: change the loop condition from "attempt = 0; attempt <= maxRetries;
attempt++" to "attempt = 0; attempt < maxRetries; attempt++" so attempts iterate
0..maxRetries-1 and the existing throw check (attempt === maxRetries - 1)
correctly throws after the final allowed attempt; keep the delay/logging and
await logic unchanged.
| id String @id @default(uuid()) | ||
| workflowExecutionId String @unique | ||
| workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id]) | ||
| sent Boolean @default(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# List migration files in the prisma migrations directory
ls -la packages/db/prisma/migrations 2>/dev/null | tail -20
# Check if migrations directory exists and list files
fd -t f . packages/db/prisma/migrations 2>/dev/null | head -30Repository: Dev-Pross/BuildFlow
Length of output: 289
🏁 Script executed:
# Get the full context of WorkflowExecutionTable in the schema
rg -A 20 "model WorkflowExecutionTable" packages/db/prisma/schema.prismaRepository: Dev-Pross/BuildFlow
Length of output: 441
🏁 Script executed:
# Search for any index definitions related to the sent field
rg "@@index.*sent|sent.*@@index" packages/db/prisma/schema.prismaRepository: Dev-Pross/BuildFlow
Length of output: 45
🏁 Script executed:
# Check if there are any queries using the sent field
rg -i "sent\s*=" --type ts --type js packages/db -A 2 | head -40Repository: Dev-Pross/BuildFlow
Length of output: 295
A Prisma migration file must be generated and applied for the sent field.
The sent field has been added to the schema but there is no corresponding migration file in packages/db/prisma/migrations/. Generate this migration with:
npx prisma migrate dev --name add_sent_field_to_workflow_execution_table
Additionally, consider adding an index on the sent field to improve query performance if the processor frequently queries for unsent records (WHERE sent = false):
model WorkflowExecutionTable {
id String @id @default(uuid())
workflowExecutionId String @unique
workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id])
sent Boolean @default(false)
@@index([sent])
}Then generate and apply the migration to update the index.
🤖 Prompt for AI Agents
In packages/db/prisma/schema.prisma at line 121 the new "sent" Boolean field was
added but no Prisma migration exists; generate and apply a migration (e.g., run
npx prisma migrate dev --name add_sent_field_to_workflow_execution_table) so the
database is updated, and if queries will frequently filter WHERE sent = false
add an @@index([sent]) to the WorkflowExecutionTable model in the same schema
before generating the migration so the index is created together with the
column.
Summary by CodeRabbit
New Features
Bug Fixes
Improvements
✏️ Tip: You can customize this high-level summary in your review settings.