-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/hooks #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/hooks #47
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,56 +1,55 @@ | ||||||
|
|
||||||
| import { prismaClient } from "@repo/db/client"; | ||||||
| import express from "express"; | ||||||
|
|
||||||
| import express from "express"; | ||||||
| const app = express(); | ||||||
| app.use(express.json()); | ||||||
|
|
||||||
| app.post("/hooks/catch/:userId/:workflowId", async (req, res) => { | ||||||
| console.log("REcieved Here in the initla error") | ||||||
|
|
||||||
| const userId = req.params.userId; | ||||||
| const Data = req.body | ||||||
| try { | ||||||
| const result = await prismaClient.$transaction( | ||||||
| async (tx) => { | ||||||
| console.log("REcieved Here") | ||||||
| const workflow = await tx.workflow.create({ | ||||||
| data: { | ||||||
| userId, | ||||||
| name: "Sample Workflow", | ||||||
| description: "Auto-generated workflow", | ||||||
| status: "Start", | ||||||
| config: {}, | ||||||
| }, | ||||||
| }); | ||||||
| console.log("REcieved Here 2222") | ||||||
|
|
||||||
| const workflowExecution = await tx.workflowExecution.create({ | ||||||
| data: { | ||||||
| workflowId: workflow.id, | ||||||
| status: "Start", | ||||||
| metadata: Data | ||||||
| }, | ||||||
| }); | ||||||
| console.log("REcieved Here 22222") | ||||||
|
|
||||||
| // Assuming that workflowExecutionTable requires a valid workflowExecution connection | ||||||
| const datInWork = await tx.workflowExecutionTable.create({ | ||||||
| data : { | ||||||
| workflowExecution : { | ||||||
| connect : {id : workflowExecution.id} | ||||||
| } | ||||||
| } | ||||||
| }); | ||||||
| return { workflow, workflowExecution , datInWork }; | ||||||
| const { userId, workflowId } = req.params; | ||||||
| const { triggerData } = req.body; | ||||||
|
|
||||||
| const result = await prismaClient.$transaction(async (tx) => { | ||||||
| console.log("Request Recieved to hooks backed with", userId, workflowId); | ||||||
|
|
||||||
| const workflow = await tx.workflow.findFirst({ | ||||||
| where: { id: workflowId, userId }, | ||||||
| include: { nodes: { orderBy: { position: "asc" } } }, | ||||||
| }); | ||||||
| if (!workflow) { | ||||||
| throw new Error("Workflow not found or access denied"); | ||||||
| } | ||||||
| ); | ||||||
|
|
||||||
| res.status(200).json({ success: true, data: result }); | ||||||
| } catch (err) { | ||||||
| console.error(err); | ||||||
| res.status(500).json({ success: false, error: "Internal Server Error" }); | ||||||
| const workflowExecution = await tx.workflowExecution.create({ | ||||||
| data: { | ||||||
| workflowId: workflow.id, | ||||||
| // next time you see this line validate the trigger data thinnnnnnnnnn | ||||||
|
||||||
| status: "Pending", | ||||||
| metadata: triggerData, | ||||||
| }, | ||||||
| }); | ||||||
|
|
||||||
| const outBox = await tx.workflowExecutionTable.create({ | ||||||
|
||||||
| const outBox = await tx.workflowExecutionTable.create({ | |
| await tx.workflowExecutionTable.create({ |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -280,20 +280,21 @@ router.post("/create/workflow", | |||||
| // ------------------------------------ FETCHING WORKFLOWS ----------------------------------- | ||||||
|
|
||||||
| router.get("/workflows", | ||||||
| userMiddleware, | ||||||
| userMiddleware , | ||||||
|
||||||
| userMiddleware , | |
| userMiddleware, |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spacing inconsistency: there's an extra space after req.user.id. Remove the trailing space for consistent formatting.
| const userId = req.user.id ; | |
| const userId = req.user.id; |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This console.log statement should be removed before merging. Debug logging should either be removed or replaced with a proper logging framework for production code.
| console.log(workflows) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debug console.log statement.
This debug statement should be removed before merging to avoid cluttering production logs.
🔎 Proposed fix
- console.log(workflows)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| console.log(workflows) |
🤖 Prompt for AI Agents
In apps/http-backend/src/routes/userRoutes/userRoutes.ts around line 297 there
is a leftover debug console.log(workflows); remove this debug statement before
merging — either delete the console.log call or replace it with a proper logger
call at an appropriate level (e.g., debug) if the information must be retained,
ensuring no direct console.log remains in production code.
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling error in the response message: "succesfullu" should be "successfully".
| .json({ message: "Workflows fetched succesfullu", Data: workflows }); | |
| .json({ message: "Workflows fetched successfully", Data: workflows }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| {"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userRoutes/userMiddleware.ts","./src/routes/userRoutes/userRoutes.ts"],"version":"5.7.3"} | ||
| {"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userRoutes/userMiddleware.ts","./src/routes/userRoutes/userRoutes.ts"],"version":"5.7.3"} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| # Ignore build artifacts and dependency folders for production | ||
| node_modules/ | ||
| dist/ | ||
| build/ | ||
| *.log | ||
| *.tsbuildinfo | ||
| .env | ||
| .DS_Store | ||
| coverage/ | ||
| .tmp/ | ||
| .idea/ | ||
| .vscode/ |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,37 +1,51 @@ | ||||||
| import {prismaClient} from "@repo/db/client" | ||||||
| import { prismaClient } from "@repo/db/client"; | ||||||
|
|
||||||
| import { Kafka } from "kafkajs"; | ||||||
| import { retryLogic } from "./lib/retry.js"; | ||||||
|
|
||||||
| const kafka = new Kafka({ | ||||||
| clientId: 'Processing App', | ||||||
| brokers: ['localhost:9092'] | ||||||
| }) | ||||||
| const TOPIC_NAME = "First-Client"; | ||||||
|
|
||||||
| const TOPIC_NAME = "First-Client" | ||||||
| async function main () { | ||||||
| const producer = kafka.producer(); | ||||||
| await producer.connect(); | ||||||
|
|
||||||
| const kafka = new Kafka({ | ||||||
| brokers: ["localhost:9092"], | ||||||
| clientId: "Processing App", | ||||||
| }); | ||||||
| async function main() { | ||||||
| const producer = kafka.producer(); | ||||||
|
|
||||||
| await retryLogic(async () => { | ||||||
| await producer.connect(); | ||||||
| console.log("Producer connected to kafka successfully"); | ||||||
| }, 3); | ||||||
|
|
||||||
| while (1) { | ||||||
| const pedningRows = await prismaClient.workflowExecutionTable.findMany({ | ||||||
| where : {}, | ||||||
| take : 10 | ||||||
| }) | ||||||
| while (true) { | ||||||
| try { | ||||||
| const pendingRows = await prismaClient.workflowExecutionTable.findMany({ | ||||||
| // take: 10, | ||||||
|
||||||
| // take: 10, |
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in console.log message: there's a duplicate word "to kafka" at the end. The message should read "Published X messages to kafka" without the redundant "to kafka" at the end.
| console.log(`Published to kafka with ${pendingRows.length} to kafka `); | |
| console.log(`Published ${pendingRows.length} messages to kafka`); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,19 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function retryLogic<T>( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| fn: () => Promise<T>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxRetries = 3 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<T> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (let attempt = 0; attempt <= maxRetries; attempt++) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const result = await fn(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (attempt === maxRetries - 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (attempt === maxRetries - 1) | |
| if (attempt === maxRetries) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Off-by-one error in retry logic.
The loop condition and throw check are inconsistent:
- Loop runs
attemptfrom0tomaxRetriesinclusive (4 iterations whenmaxRetries=3) - Error is thrown when
attempt === maxRetries - 1(i.e., on the 3rd iteration,attempt=2) - This means the function throws before the final retry attempt (
attempt=3) is ever tried
🔎 Proposed fix
export async function retryLogic<T>(
fn: () => Promise<T>,
maxRetries = 3
): Promise<T> {
- for (let attempt = 0; attempt <= maxRetries; attempt++) {
+ for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const result = await fn();
return result;
} catch (error) {
- if (attempt === maxRetries - 1)
+ if (attempt === maxRetries - 1) {
throw Error("Max Retries Reached. Try again after some time");
+ }
const delay = Math.pow(2, attempt) * 1000;
console.log(`Retrying again with attempt ${attempt} after ${delay} ms`);
await new Promise((res) => setTimeout(res, delay));
}
}
- throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy ");
+ // Unreachable due to throw in catch block on final attempt
+ throw Error("Max Retries Reached");
}With this fix, for maxRetries=3:
attempt=0: 1st tryattempt=1: 2nd try (1st retry)attempt=2: 3rd try (2nd retry) → throws on failure sinceattempt === maxRetries - 1
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/processor/src/lib/retry.ts around lines 5 to 17, the retry loop runs
off-by-one: change the loop condition from "attempt = 0; attempt <= maxRetries;
attempt++" to "attempt = 0; attempt < maxRetries; attempt++" so attempts iterate
0..maxRetries-1 and the existing throw check (attempt === maxRetries - 1)
correctly throws after the final allowed attempt; keep the delay/logging and
await logic unchanged.
Copilot
AI
Jan 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error is unreachable code. The loop guarantees that either a successful result is returned or an error is thrown at maxRetries, making this line impossible to execute. Consider removing it or restructuring the function to avoid TypeScript complaints without adding unreachable code.
| for (let attempt = 0; attempt <= maxRetries; attempt++) { | |
| try { | |
| const result = await fn(); | |
| return result; | |
| } catch (error) { | |
| if (attempt === maxRetries - 1) | |
| throw Error("Max Retries Reached. Try again after some time"); | |
| const delay = Math.pow(2, attempt) * 1000; | |
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | |
| await new Promise((res) => setTimeout(res, delay)); | |
| } | |
| } | |
| throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy "); | |
| for (let attempt = 0; attempt < maxRetries; attempt++) { | |
| try { | |
| const result = await fn(); | |
| return result; | |
| } catch (error) { | |
| if (attempt === maxRetries - 1) { | |
| break; | |
| } | |
| const delay = Math.pow(2, attempt) * 1000; | |
| console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); | |
| await new Promise((res) => setTimeout(res, delay)); | |
| } | |
| } | |
| throw Error("Max Retries Reached. Try again after some time"); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,5 +3,7 @@ | |
| "compilerOptions" : { | ||
| "rootDir" : "./src", | ||
| "outDir" : "./dist" | ||
| } | ||
| }, | ||
| "include": ["src"] | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| {"root":["./src/index.ts"],"version":"5.7.3"} | ||
| {"root":["./src/index.ts","./src/lib/retry.ts"],"version":"5.7.3"} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,6 +118,7 @@ model WorkflowExecutionTable { | |
| id String @id @default(uuid()) | ||
| workflowExecutionId String @unique | ||
| workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id]) | ||
| sent Boolean @default(false) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # List migration files in the prisma migrations directory
ls -la packages/db/prisma/migrations 2>/dev/null | tail -20
# Check if migrations directory exists and list files
fd -t f . packages/db/prisma/migrations 2>/dev/null | head -30Repository: Dev-Pross/BuildFlow Length of output: 289 🏁 Script executed: # Get the full context of WorkflowExecutionTable in the schema
rg -A 20 "model WorkflowExecutionTable" packages/db/prisma/schema.prismaRepository: Dev-Pross/BuildFlow Length of output: 441 🏁 Script executed: # Search for any index definitions related to the sent field
rg "@@index.*sent|sent.*@@index" packages/db/prisma/schema.prismaRepository: Dev-Pross/BuildFlow Length of output: 45 🏁 Script executed: # Check if there are any queries using the sent field
rg -i "sent\s*=" --type ts --type js packages/db -A 2 | head -40Repository: Dev-Pross/BuildFlow Length of output: 295 A Prisma migration file must be generated and applied for the The Additionally, consider adding an index on the model WorkflowExecutionTable {
id String @id @default(uuid())
workflowExecutionId String @unique
workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id])
sent Boolean @default(false)
@@index([sent])
}Then generate and apply the migration to update the index. 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| enum WorkflowStatus { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling error: "Recieved" should be "Received".