diff --git a/apps/hooks/src/index.ts b/apps/hooks/src/index.ts index 16d06bb..6d09c48 100644 --- a/apps/hooks/src/index.ts +++ b/apps/hooks/src/index.ts @@ -1,56 +1,55 @@ + import { prismaClient } from "@repo/db/client"; -import express from "express"; +import express from "express"; const app = express(); app.use(express.json()); app.post("/hooks/catch/:userId/:workflowId", async (req, res) => { - console.log("REcieved Here in the initla error") - - const userId = req.params.userId; - const Data = req.body try { - const result = await prismaClient.$transaction( - async (tx) => { - console.log("REcieved Here") - const workflow = await tx.workflow.create({ - data: { - userId, - name: "Sample Workflow", - description: "Auto-generated workflow", - status: "Start", - config: {}, - }, - }); - console.log("REcieved Here 2222") - - const workflowExecution = await tx.workflowExecution.create({ - data: { - workflowId: workflow.id, - status: "Start", - metadata: Data - }, - }); - console.log("REcieved Here 22222") - - // Assuming that workflowExecutionTable requires a valid workflowExecution connection - const datInWork = await tx.workflowExecutionTable.create({ - data : { - workflowExecution : { - connect : {id : workflowExecution.id} - } - } - }); - return { workflow, workflowExecution , datInWork }; + const { userId, workflowId } = req.params; + const { triggerData } = req.body; + + const result = await prismaClient.$transaction(async (tx) => { + console.log("Request Recieved to hooks backed with", userId, workflowId); + + const workflow = await tx.workflow.findFirst({ + where: { id: workflowId, userId }, + include: { nodes: { orderBy: { position: "asc" } } }, + }); + if (!workflow) { + throw new Error("Workflow not found or access denied"); } - ); - res.status(200).json({ success: true, data: result }); - } catch (err) { - console.error(err); - res.status(500).json({ success: false, error: "Internal Server Error" }); + const workflowExecution = await tx.workflowExecution.create({ + data: { + workflowId: workflow.id, +// next time you see this line validate the trigger data thinnnnnnnnnn + status: "Pending", + metadata: triggerData, + }, + }); + + const outBox = await tx.workflowExecutionTable.create({ + data: { + workflowExecutionId: workflowExecution.id, + }, + }); + return { workflowExecution }; + }); + return res.status(200).json({ + success: true, + workflowExecutionId: result.workflowExecution.id, + }); + } catch (error: any) { + console.log(error); + res.status(500).json({ + success: false, + error: "Failed to process webhook" + }); } }); -app.listen(3002, () => console.log("Server listening on port 3002")); - +app.listen(3003, () => { + console.log("Server running on 3003"); +}); diff --git a/apps/http-backend/src/index.ts b/apps/http-backend/src/index.ts index 407fbd2..f1920ae 100644 --- a/apps/http-backend/src/index.ts +++ b/apps/http-backend/src/index.ts @@ -13,7 +13,7 @@ import { googleAuth } from "./routes/google_callback.js"; const app = express() -const allowedOrigins = ['http://localhost:3000' , 'http://localhost:300/' ]; +const allowedOrigins = ['http://localhost:3000', 'http://localhost:3001']; app.use(cors({ origin: allowedOrigins, credentials: true, diff --git a/apps/http-backend/src/routes/userRoutes/userRoutes.ts b/apps/http-backend/src/routes/userRoutes/userRoutes.ts index ab1d441..498eaa1 100644 --- a/apps/http-backend/src/routes/userRoutes/userRoutes.ts +++ b/apps/http-backend/src/routes/userRoutes/userRoutes.ts @@ -280,20 +280,21 @@ router.post("/create/workflow", // ------------------------------------ FETCHING WORKFLOWS ----------------------------------- router.get("/workflows", - userMiddleware, + userMiddleware , async (req: AuthRequest, res: Response) => { try { if (!req.user) return res .status(statusCodes.UNAUTHORIZED) .json({ message: "User is not logged in /not authorized" }); - const userId = req.user.id; + const userId = req.user.id ; const workflows = await prismaClient.workflow.findMany({ where: { - userId: userId, + userId }, }); + console.log(workflows) return res .status(statusCodes.OK) .json({ message: "Workflows fetched succesfullu", Data: workflows }); diff --git a/apps/http-backend/tsconfig.tsbuildinfo b/apps/http-backend/tsconfig.tsbuildinfo index 6d7fa0f..82bdad7 100644 --- a/apps/http-backend/tsconfig.tsbuildinfo +++ b/apps/http-backend/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userRoutes/userMiddleware.ts","./src/routes/userRoutes/userRoutes.ts"],"version":"5.7.3"} +{"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userRoutes/userMiddleware.ts","./src/routes/userRoutes/userRoutes.ts"],"version":"5.7.3"} \ No newline at end of file diff --git a/apps/processor/.gitignore b/apps/processor/.gitignore new file mode 100644 index 0000000..df095ef --- /dev/null +++ b/apps/processor/.gitignore @@ -0,0 +1,12 @@ +# Ignore build artifacts and dependency folders for production +node_modules/ +dist/ +build/ +*.log +*.tsbuildinfo +.env +.DS_Store +coverage/ +.tmp/ +.idea/ +.vscode/ \ No newline at end of file diff --git a/apps/processor/src/index.ts b/apps/processor/src/index.ts index 89f138d..2bdf1e4 100644 --- a/apps/processor/src/index.ts +++ b/apps/processor/src/index.ts @@ -1,37 +1,51 @@ -import {prismaClient} from "@repo/db/client" +import { prismaClient } from "@repo/db/client"; import { Kafka } from "kafkajs"; +import { retryLogic } from "./lib/retry.js"; -const kafka = new Kafka({ - clientId: 'Processing App', - brokers: ['localhost:9092'] - }) +const TOPIC_NAME = "First-Client"; - const TOPIC_NAME = "First-Client" -async function main () { - const producer = kafka.producer(); - await producer.connect(); +const kafka = new Kafka({ + brokers: ["localhost:9092"], + clientId: "Processing App", +}); +async function main() { + const producer = kafka.producer(); + + await retryLogic(async () => { + await producer.connect(); + console.log("Producer connected to kafka successfully"); + }, 3); - while (1) { - const pedningRows = await prismaClient.workflowExecutionTable.findMany({ - where : {}, - take : 10 - }) + while (true) { + try { + const pendingRows = await prismaClient.workflowExecutionTable.findMany({ + // take: 10, + where : {sent : false} + }); + if (pendingRows.length > 0) { await producer.send({ - topic: TOPIC_NAME, - messages: pedningRows.map(r => ({ value: String(r.workflowExecutionId) })) + topic: TOPIC_NAME, + + messages: pendingRows.map((r) => ({ + value: r.workflowExecutionId, + })), }); - await prismaClient.workflowExecutionTable.deleteMany({ - where : { - id : { - in : pedningRows.map(r => r.id) - } - } - }) + await prismaClient.workflowExecutionTable.updateMany({ + where: { id: { in: pendingRows.map((r) => r.id) } }, + data : {sent : true} + }); + console.log(`Published to kafka with ${pendingRows.length} to kafka `); + } + + await new Promise((resolve) => setTimeout(resolve, 1000)); + } catch (error) { + console.log("Processing Error", error); + // Continue loop even if there's an error } + } } -main() -console.log("Hello world"); +main(); diff --git a/apps/processor/src/lib/retry.ts b/apps/processor/src/lib/retry.ts new file mode 100644 index 0000000..bb0797c --- /dev/null +++ b/apps/processor/src/lib/retry.ts @@ -0,0 +1,19 @@ +export async function retryLogic( + fn: () => Promise, + maxRetries = 3 +): Promise { + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + const result = await fn(); + return result; + } catch (error) { + if (attempt === maxRetries - 1) + throw Error("Max Retries Reached. Try again after some time"); + + const delay = Math.pow(2, attempt) * 1000; + console.log(`Retrying again with attempt ${attempt} after ${delay} ms`); + await new Promise((res) => setTimeout(res, delay)); + } + } + throw Error("Max Retries Reached. Try again after some time Technically this won't get executed but putting for the sake of making typescript happy "); +} diff --git a/apps/processor/tsconfig.json b/apps/processor/tsconfig.json index e1035ba..d787082 100644 --- a/apps/processor/tsconfig.json +++ b/apps/processor/tsconfig.json @@ -3,5 +3,7 @@ "compilerOptions" : { "rootDir" : "./src", "outDir" : "./dist" - } + }, + "include": ["src"] + } \ No newline at end of file diff --git a/apps/processor/tsconfig.tsbuildinfo b/apps/processor/tsconfig.tsbuildinfo index 44940f3..003e3ae 100644 --- a/apps/processor/tsconfig.tsbuildinfo +++ b/apps/processor/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts"],"version":"5.7.3"} \ No newline at end of file +{"root":["./src/index.ts","./src/lib/retry.ts"],"version":"5.7.3"} \ No newline at end of file diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index edddb57..7e21468 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -118,6 +118,7 @@ model WorkflowExecutionTable { id String @id @default(uuid()) workflowExecutionId String @unique workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id]) + sent Boolean @default(false) } enum WorkflowStatus {