diff --git a/apps/http-backend/tsconfig.tsbuildinfo b/apps/http-backend/tsconfig.tsbuildinfo index 82bdad7..cd93c21 100644 --- a/apps/http-backend/tsconfig.tsbuildinfo +++ b/apps/http-backend/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userRoutes/userMiddleware.ts","./src/routes/userRoutes/userRoutes.ts"],"version":"5.7.3"} \ No newline at end of file +{"root":["./src/index.ts","./src/routes/google_callback.ts","./src/routes/nodes.routes.ts","./src/routes/userroutes/usermiddleware.ts","./src/routes/userroutes/userroutes.ts","./src/scheduler/token-scheduler.ts","./src/services/token-refresh.service.ts"],"version":"5.7.3"} \ No newline at end of file diff --git a/apps/web/next.config.mjs b/apps/web/next.config.mjs index bd167d9..f1c82ff 100644 --- a/apps/web/next.config.mjs +++ b/apps/web/next.config.mjs @@ -1,6 +1,7 @@ /** @type {import('next').NextConfig} */ const nextConfig = { - transpilePackages: ["@workspace/ui"], + transpilePackages: ["@workspace/ui", "@repo/db"], + serverExternalPackages: ["@prisma/client", "prisma"], } export default nextConfig diff --git a/apps/worker/src/engine/executor.ts b/apps/worker/src/engine/executor.ts index e058e71..a12a5c9 100644 --- a/apps/worker/src/engine/executor.ts +++ b/apps/worker/src/engine/executor.ts @@ -34,32 +34,36 @@ export async function executeWorkflow( status: "InProgress", }, }); + if(!update.error) console.log('updated the workflow execution') const nodes = data?.workflow.nodes; + console.log(`Total nodes - ${nodes.length}`) for (const node of nodes) { + console.log(`${node.name}, ${node.position}th - started Execution`) const nodeType = node.AvailableNode.type; const context = { - nodeId: node.id, + // nodeId: node.id, userId: data.workflow.userId, - credentialId: node.credentials[0]?.id, + credId: node.credentials[0]?.id, config: node.config as Record, inputData: currentInputData, }; - + console.log(`Executing with context: ${context}`) const execute = await register.execute(nodeType, context); - if (!execute.success) { - await prismaClient.workflowExecution.update({ - where: { id: workflowExecutionId }, - data: { - status: "Failed", - error: execute.error, - completedAt: new Date(), - }, - }); - return; - } + // if (!execute.success) { + // await prismaClient.workflowExecution.update({ + // where: { id: workflowExecutionId }, + // data: { + // status: "Failed", + // error: execute.error, + // completedAt: new Date(), + // }, + // }); + // return; + // } currentInputData = execute.output; + console.log("output: ", JSON.stringify(execute)) } const updatedStatus = await prismaClient.workflowExecution.update({ where: { id: workflowExecutionId }, diff --git a/apps/worker/src/engine/registory.ts b/apps/worker/src/engine/registory.ts index 16954bb..d4ba2a5 100644 --- a/apps/worker/src/engine/registory.ts +++ b/apps/worker/src/engine/registory.ts @@ -1,5 +1,6 @@ import { ExecutionContext, ExecutionResult, NodeExecutor } from "../types.js"; import { GmailExecutor } from "@repo/nodes/nodeClient"; +import {GoogleSheetsNodeExecutor} from "@repo/nodes/nodeClient"; class ExecutionRegistry { private executors = new Map(); @@ -19,7 +20,7 @@ class ExecutionRegistry { }; } try { - const result = await executor.execute(context); + const result = await executor.execute(context) console.log("Execute result:", result); return result; @@ -38,6 +39,7 @@ class ExecutionRegistry { //wehen visits this next time make sure chang gmail executor implements NodeExecutor this.register("gmail", new GmailExecutor() as unknown as NodeExecutor); + this.register("google_sheet", new GoogleSheetsNodeExecutor() as unknown as NodeExecutor) console.log(`The current Executors are ${this.executors.size}`); } } diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index de8e826..b752a45 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -1,5 +1,6 @@ import { Kafka } from "kafkajs"; import { executeWorkflow } from "./engine/executor.js"; +import { register } from "./engine/registory.js"; const kafka = new Kafka({ // clientId: "Processing App", clientId: "BuildFlow-Worker", @@ -7,6 +8,8 @@ const kafka = new Kafka({ }); const TOPIC_NAME = "First-Client"; +register.initialize() + async function main() { const consumer = kafka.consumer({ groupId: "test-group" }); await consumer.connect(); diff --git a/apps/worker/src/types.ts b/apps/worker/src/types.ts index dfb0927..34ab9bc 100644 --- a/apps/worker/src/types.ts +++ b/apps/worker/src/types.ts @@ -1,5 +1,5 @@ export interface ExecutionContext { - nodeId: string; + nodeId?: string; userId: string; credentialsId?: string; config: Record; diff --git a/apps/worker/tsconfig.tsbuildinfo b/apps/worker/tsconfig.tsbuildinfo index bab5e89..dd1e64e 100644 --- a/apps/worker/tsconfig.tsbuildinfo +++ b/apps/worker/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts","./src/test.ts","./src/types.ts","./src/engine/executor.ts","./src/engine/registory.ts"],"version":"5.7.3"} \ No newline at end of file +{"root":["./src/index.ts","./src/types.ts","./src/engine/executor.ts","./src/engine/registory.ts","./src/tests/create-execution.ts","./src/tests/setup-test.ts","./src/tests/test.ts","./src/tests/update-node.ts"],"version":"5.7.3"} \ No newline at end of file diff --git a/package.json b/package.json index 9bfde54..550f954 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "turbo": "^2.7.2", "typescript": "5.7.3" }, - "packageManager": "pnpm@10.26.2", + "packageManager": "pnpm@10.27.0", "engines": { "node": ">=20" } diff --git a/packages/db/package.json b/packages/db/package.json index ebb4e49..e17c123 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -24,4 +24,4 @@ "dotenv": "^17.2.3", "prisma": "^6.19.0" } -} +} \ No newline at end of file diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 7e21468..602b1bf 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -1,5 +1,6 @@ generator client { provider = "prisma-client-js" + // output = "../node_modules/.prisma/client" } datasource db { @@ -117,8 +118,8 @@ model NodeExecution { model WorkflowExecutionTable { id String @id @default(uuid()) workflowExecutionId String @unique + sent Boolean @default(false) workflowExecution WorkflowExecution @relation(fields: [workflowExecutionId], references: [id]) - sent Boolean @default(false) } enum WorkflowStatus { diff --git a/packages/db/tsconfig.tsbuildinfo b/packages/db/tsconfig.tsbuildinfo index 968d9e9..6ea85aa 100644 --- a/packages/db/tsconfig.tsbuildinfo +++ b/packages/db/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts","./src/seed.ts"],"version":"5.9.2"} \ No newline at end of file +{"root":["./src/index.ts","./src/seed.ts"],"version":"5.7.3"} \ No newline at end of file diff --git a/packages/nodes/src/common/google-oauth-service.ts b/packages/nodes/src/common/google-oauth-service.ts index ccd1916..c3b721b 100644 --- a/packages/nodes/src/common/google-oauth-service.ts +++ b/packages/nodes/src/common/google-oauth-service.ts @@ -124,7 +124,7 @@ class GoogleOAuthService { `Failed to get all credentials: ${err instanceof Error ? err.message : "unknown error"}` ); } - + } async updateCredentials(credentialId: string, tokens: Partial): Promise { try{ const existing = await this.prisma.credential.findUnique({ @@ -164,7 +164,7 @@ class GoogleOAuthService { } } } -} + export { GoogleOAuthService }; export type { OAuthTokens }; diff --git a/packages/nodes/src/gmail/gmail.executor.ts b/packages/nodes/src/gmail/gmail.executor.ts index 7241e6a..9efc990 100644 --- a/packages/nodes/src/gmail/gmail.executor.ts +++ b/packages/nodes/src/gmail/gmail.executor.ts @@ -71,6 +71,6 @@ class GmailExecutor { } } -export default GmailExecutor; +export { GmailExecutor } ; // export { default as GmailExecutor } from "./gmail.executor.js"; // export { GmailService } from "./gmail.service.js"; diff --git a/packages/nodes/src/gmail/gmail.node.ts b/packages/nodes/src/gmail/gmail.node.ts new file mode 100644 index 0000000..ffc79fa --- /dev/null +++ b/packages/nodes/src/gmail/gmail.node.ts @@ -0,0 +1,43 @@ +import NodeRegistry from "../registry/node-registry.js"; +import { GmailExecutor } from "./gmail.executor.js"; + +export class GmailNode { + static definition = { + name: "Gmail", + type: "gmail", + description: 'Communicate with Gmail', + config: { + fields:[ + { + name: "From", + type: "email", + require: true + }, + { + name: "To", + type: "email", + require: true + }, + { + name:"Body", + type: "textArea", + require: true + } + ] + }, + requireAuth: true, + authType: 'google_oauth' + + }; + + static async register(){ + await NodeRegistry.register(this.definition) + // console.log(`✅ Registered node: ${this.definition.name}`); + await NodeRegistry.registerTrigger(this.definition) + // console.log(`✅ Registered Trigger: ${this.definition.name}`); + } + + static getExecutor(){ + return new GmailExecutor(); + } +} \ No newline at end of file diff --git a/packages/nodes/src/google-sheets/google-sheets.executor.ts b/packages/nodes/src/google-sheets/google-sheets.executor.ts index db360bf..f416dd2 100644 --- a/packages/nodes/src/google-sheets/google-sheets.executor.ts +++ b/packages/nodes/src/google-sheets/google-sheets.executor.ts @@ -139,6 +139,7 @@ class GoogleSheetsNodeExecutor{ } const operation = context.config.operation; + console.log("operation from sheet executor: ",operation) switch(operation){ case 'read_rows': return await this.executeReadRows(sheetService,context); diff --git a/packages/nodes/src/google-sheets/google-sheets.service.ts b/packages/nodes/src/google-sheets/google-sheets.service.ts index 1a16357..b70c437 100644 --- a/packages/nodes/src/google-sheets/google-sheets.service.ts +++ b/packages/nodes/src/google-sheets/google-sheets.service.ts @@ -1,103 +1,112 @@ -import { google, sheets_v4, drive_v3 } from "googleapis"; -import { OAuth2Client } from "google-auth-library"; -// import { OAuthTokens } from "../common/google-oauth-service.js"; - -interface GoogleSheetsCredentials { - access_token: string; - refresh_token: string; - token_type: string; - expiry_date: number; +import { google, sheets_v4, drive_v3 } from 'googleapis'; +import { OAuth2Client } from 'google-auth-library'; +import { OAuthTokens } from '../common/google-oauth-service.js'; + +interface GoogleSheetsCredentials{ + access_token: string, + refresh_token: string, + token_type: string, + expiry_date: number } -interface ReadRowsParams { - spreadsheetId: string; - range: string; +interface ReadRowsParams{ + spreadsheetId: string, + range: string } -class GoogleSheetsService { - private sheets: sheets_v4.Sheets; - private auth: OAuth2Client; - private drive: drive_v3.Drive; - constructor(credentials: GoogleSheetsCredentials) { - this.auth = new google.auth.OAuth2( - process.env.GOOGLE_CLIENT_ID, - process.env.GOOGLE_CLIENT_SECRET, - process.env.GOOGLE_REDIRECT_URI - ); - - this.auth.setCredentials({ - access_token: credentials.access_token, - refresh_token: credentials.refresh_token, - token_type: credentials.token_type, - expiry_date: credentials.expiry_date, - }); - - this.sheets = google.sheets({ - version: "v4", - auth: this.auth, - }); - - this.drive = google.drive({ - version: "v3", - auth: this.auth, - }); - } - - async getSheets(): Promise { - const files = await this.drive.files.list({ - q: "mimeType='application/vnd.google-apps.spreadsheet'", - spaces: "drive", - pageSize: 10, - fields: "files(id, name, createdTime)", - }); - - if (files) { - return { - success: true, - data: files, - }; +class GoogleSheetsService{ + private sheets : sheets_v4.Sheets; + private auth: OAuth2Client; + private drive: drive_v3.Drive; + constructor(credentials: GoogleSheetsCredentials){ + this.auth = new google.auth.OAuth2( + process.env.GOOGLE_CLIENT_ID, + process.env.GOOGLE_CLIENT_SECRET, + process.env.GOOGLE_REDIRECT_URI + ); + + this.auth.setCredentials({ + access_token: credentials.access_token, + refresh_token: credentials.refresh_token, + token_type: credentials.token_type, + expiry_date: credentials.expiry_date + }); + + this.sheets = google.sheets({ + version: 'v4', + auth: this.auth + }); + + this.drive = google.drive({ + version: 'v3', + auth: this.auth + }) + + } + + async getSheets(): Promise{ + const files = await this.drive.files.list({ + q: "mimeType='application/vnd.google-apps.spreadsheet'", + spaces: 'drive', + pageSize: 10, + fields: 'files(id, name, createdTime)', + }) + + if(files){ + return { + success: true, + data: files + } + } + return { + success: false, + data: null + } + } + + async getSheetTabs(spreadsheetId: string): Promise{ + try{ + const response = await this.sheets.spreadsheets.get({ + spreadsheetId: spreadsheetId, + fields: 'sheets.properties' + }); + + const tabs = response.data.sheets?.map(sheet => ({ + id: sheet.properties?.sheetId, + name: sheet.properties?.title + })) || []; + + return { + success: true, + data: tabs + }; + } + catch(error){ + return { + success: false, + error: error instanceof Error ? error.message : 'Failed to fetch sheet tabs' + }; + } } - return { - success: false, - data: null, - }; - } - - async getSheetTabs(spreadsheetId: string): Promise { - try { - const response = await this.sheets.spreadsheets.get({ - spreadsheetId: spreadsheetId, - fields: "sheets.properties", - }); - - const tabs = - response.data.sheets?.map((sheet) => ({ - id: sheet.properties?.sheetId, - name: sheet.properties?.title, - })) || []; - - return { - success: true, - data: tabs, - }; - } catch (error) { - return { - success: false, - error: - error instanceof Error ? error.message : "Failed to fetch sheet tabs", - }; + + async readRows(params: ReadRowsParams): Promise{ + try{ + const response = await this.sheets.spreadsheets.values.get({ + spreadsheetId: params.spreadsheetId, + range: params.range + }); + return response.data.values || [] + } + catch(error){ + throw new Error(`Failed to fetch the rows: ${error}`) + } } - } - - async readRows(params: ReadRowsParams): Promise { - try { - const response = await this.sheets.spreadsheets.values.get({ - spreadsheetId: params.spreadsheetId, - range: params.range, - }); - return response.data.values || []; - } catch (error) { - throw new Error(`Failed to fetch the rows: ${error}`); + + isTokenExpired():boolean { + const credentials = this.auth.credentials; + if( !credentials.expiry_date) return false; + + return Date.now() >= credentials.expiry_date - (5 *60 * 1000); } async refreshAccessToken(): Promise { @@ -124,8 +133,7 @@ class GoogleSheetsService { throw new Error(`Failed to refresh token: ${error}`) } } - } } -export { GoogleSheetsService }; -export type { GoogleSheetsCredentials, ReadRowsParams }; +export { GoogleSheetsService } +export type { GoogleSheetsCredentials, ReadRowsParams } \ No newline at end of file diff --git a/packages/nodes/src/index.ts b/packages/nodes/src/index.ts index 5b229fe..20f9caf 100644 --- a/packages/nodes/src/index.ts +++ b/packages/nodes/src/index.ts @@ -1,6 +1,8 @@ // Central export for all major modules // export { default as NodeRegistry } from './registry/node-registry.js'; +export { GmailExecutor } from './gmail/gmail.executor.js'; + // Fixed lint error: Use correct export name 'GoogleSheetNode' export { GoogleSheetNode } from './google-sheets/google-sheets.node.js'; diff --git a/packages/nodes/src/registry/node-registry.ts b/packages/nodes/src/registry/node-registry.ts index 187f3af..9362317 100644 --- a/packages/nodes/src/registry/node-registry.ts +++ b/packages/nodes/src/registry/node-registry.ts @@ -1,5 +1,7 @@ import { prismaClient } from "@repo/db"; import { GoogleSheetNode } from "../google-sheets/google-sheets.node.js"; +import { GmailService } from "../gmail/gmail.service.js"; +import { GmailNode } from "../gmail/gmail.node.js"; interface NodeDefinition { name: string; @@ -68,6 +70,7 @@ class NodeRegistry { } static async registerAll() { await GoogleSheetNode.register(); + await GmailNode.register(); } } diff --git a/packages/nodes/tsconfig.tsbuildinfo b/packages/nodes/tsconfig.tsbuildinfo index 57f96b1..716ded6 100644 --- a/packages/nodes/tsconfig.tsbuildinfo +++ b/packages/nodes/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/index.ts","./src/common/google-oauth-service.ts","./src/gmail/gmail.executor.ts","./src/gmail/gmail.service.ts","./src/google-sheets/google-sheets.executor.ts","./src/google-sheets/google-sheets.node.ts","./src/google-sheets/google-sheets.service.ts","./src/google-sheets/test.ts","./src/registry/node-registry.ts"],"version":"5.7.3"} \ No newline at end of file +{"root":["./src/index.ts","./src/common/google-oauth-service.ts","./src/gmail/gmail.executor.ts","./src/gmail/gmail.node.ts","./src/gmail/gmail.service.ts","./src/google-sheets/google-sheets.executor.ts","./src/google-sheets/google-sheets.node.ts","./src/google-sheets/google-sheets.service.ts","./src/google-sheets/test.ts","./src/registry/node-registry.ts"],"version":"5.7.3"} \ No newline at end of file