diff --git a/apps/api/src/controllers/Workflows.ts b/apps/api/src/controllers/Workflows.ts index 64afe64b..eed3e2ee 100644 --- a/apps/api/src/controllers/Workflows.ts +++ b/apps/api/src/controllers/Workflows.ts @@ -305,6 +305,26 @@ export class Workflows { return res.status(201).json(execution); } + /** + * GET /workflows/:id/step-execution-counts + * Get active execution counts per step for a workflow + */ + @Get(':id/step-execution-counts') + @Middleware([requireAuth, requireEmailVerified]) + @CatchAsync + public async getStepExecutionCounts(req: Request, res: Response, _next: NextFunction) { + const auth = res.locals.auth; + const workflowId = req.params.id; + + if (!workflowId) { + return res.status(400).json({error: 'Workflow ID is required'}); + } + + const result = await WorkflowService.getStepExecutionCounts(auth.projectId!, workflowId); + + return res.status(200).json(result); + } + /** * GET /workflows/:id/executions * List executions for a workflow diff --git a/apps/api/src/services/ActivityService.ts b/apps/api/src/services/ActivityService.ts index 9a66e383..e6455c35 100644 --- a/apps/api/src/services/ActivityService.ts +++ b/apps/api/src/services/ActivityService.ts @@ -744,7 +744,7 @@ export class ActivityService { }); return stepExecutions - .filter(stepExec => stepExec.step.type === 'SEND_EMAIL' && stepExec.scheduledFor) + .filter(stepExec => stepExec.step?.type === 'SEND_EMAIL' && stepExec.scheduledFor) .map(stepExec => ({ id: `workflow_step_${stepExec.id}_scheduled`, type: ActivityType.WORKFLOW_EMAIL_SCHEDULED, @@ -753,9 +753,9 @@ export class ActivityService { contactId: stepExec.execution.contactId, metadata: { workflowName: stepExec.execution.workflow.name, - stepName: stepExec.step.name, + stepName: stepExec.step?.name, subject: - stepExec.step.config && + stepExec.step?.config && typeof stepExec.step.config === 'object' && stepExec.step.config !== null && 'subject' in stepExec.step.config && diff --git a/apps/api/src/services/EventService.ts b/apps/api/src/services/EventService.ts index af678fc7..48a02400 100644 --- a/apps/api/src/services/EventService.ts +++ b/apps/api/src/services/EventService.ts @@ -1,7 +1,7 @@ import type {Event} from '@plunk/db'; import {Prisma} from '@plunk/db'; import type {FilterCondition, FilterGroup} from '@plunk/types'; -import {toPrismaJson} from '@plunk/types'; +import {buildWorkflowSnapshot, toPrismaJson} from '@plunk/types'; import signale from 'signale'; import {prisma} from '../database/prisma.js'; @@ -466,6 +466,22 @@ export class EventService { return; } + // Build workflow snapshot — freeze the graph so in-flight executions are immune to live edits + const fullWorkflow = await prisma.workflow.findUniqueOrThrow({ + where: {id: workflowId}, + include: { + steps: { + include: { + template: { + select: {id: true, subject: true, body: true, from: true, fromName: true, replyTo: true}, + }, + outgoingTransitions: true, + }, + }, + }, + }); + const snapshot = buildWorkflowSnapshot(fullWorkflow); + // Create workflow execution const execution = await prisma.workflowExecution.create({ data: { @@ -474,6 +490,7 @@ export class EventService { status: 'RUNNING', currentStepId: triggerStep.id, context: context ? toPrismaJson(context) : undefined, + workflowSnapshot: toPrismaJson(snapshot), }, }); diff --git a/apps/api/src/services/WorkflowExecutionService.ts b/apps/api/src/services/WorkflowExecutionService.ts index 926eddd9..00ed517d 100644 --- a/apps/api/src/services/WorkflowExecutionService.ts +++ b/apps/api/src/services/WorkflowExecutionService.ts @@ -1,14 +1,13 @@ import type { Contact, Prisma, - Template, Workflow, WorkflowExecution, - WorkflowStep, WorkflowStepExecution, } from '@plunk/db'; import {StepExecutionStatus, WorkflowExecutionStatus} from '@plunk/db'; -import {toPrismaJson} from '@plunk/types'; +import {resolveWorkflowGraph, toPrismaJson} from '@plunk/types'; +import type {ResolvedWorkflowGraph, WorkflowSnapshotStep} from '@plunk/types'; import {renderTemplate, WorkflowStepConfigSchemas} from '@plunk/shared'; import signale from 'signale'; @@ -23,15 +22,6 @@ import {QueueService} from './QueueService.js'; type StepConfig = Prisma.JsonValue; type StepResult = Record; type WorkflowExecutionWithRelations = WorkflowExecution & {contact: Contact; workflow: Workflow}; -type WorkflowStepWithTemplate = WorkflowStep & {template?: Template | null}; -type WorkflowStepWithTransitions = WorkflowStep & { - outgoingTransitions?: Array<{ - id: string; - condition: Prisma.JsonValue; - priority: number; - toStep: WorkflowStep; - }>; -}; /** * Core Workflow Execution Engine @@ -113,7 +103,9 @@ export class WorkflowExecutionService { // Allow execution to continue - no action needed } - const step = initialExecution.workflow.steps.find(s => s.id === stepId); + const graph = resolveWorkflowGraph(initialExecution.workflowSnapshot, initialExecution.workflow); + + const step = graph.steps.find(s => s.id === stepId); if (!step) { signale.error(`[WORKFLOW] Step ${stepId} not found in workflow ${initialExecution.workflow.id}`); throw new HttpException(404, 'Step not found in workflow'); @@ -209,7 +201,7 @@ export class WorkflowExecutionService { try { // Execute the step based on its type signale.info(`[WORKFLOW] Executing step ${stepId} of type ${step.type}`); - const result = await this.executeStep(step, execution, stepExecution); + const result = await this.executeStep(step, execution, stepExecution, graph); signale.info(`[WORKFLOW] Step ${stepId} executed successfully`); // Check if step is in a waiting state (WAIT_FOR_EVENT steps only) @@ -248,7 +240,7 @@ export class WorkflowExecutionService { }); // Determine next step(s) based on transitions and conditions - await this.processNextSteps(execution, step, result); + await this.processNextSteps(execution, step, result, graph); } catch (error) { signale.error(`[WORKFLOW] Error executing step ${step.id}:`, error); // Mark step as failed @@ -301,19 +293,28 @@ export class WorkflowExecutionService { * Called by BullMQ worker when timeout job executes */ public static async processTimeout(executionId: string, stepId: string, stepExecutionId: string): Promise { - // Fetch the step execution + // Fetch the step execution along with the workflow execution snapshot and live step data const stepExecution = await prisma.workflowStepExecution.findUnique({ where: {id: stepExecutionId}, include: { - execution: true, - step: { + execution: { include: { - outgoingTransitions: { - include: {toStep: true}, - orderBy: {priority: 'asc'}, + workflow: { + include: { + steps: { + include: { + template: true, + outgoingTransitions: { + orderBy: {priority: 'asc'}, + include: {toStep: true}, + }, + }, + }, + }, }, }, }, + step: true, }, }); @@ -326,6 +327,16 @@ export class WorkflowExecutionService { return; } + // Resolve graph from snapshot (or fall back to live workflow data) + const graph = resolveWorkflowGraph( + stepExecution.execution.workflowSnapshot, + stepExecution.execution.workflow, + ); + + // Use snapshot step config for eventName if available, otherwise live step config + const snapshotStep = graph.steps.find(s => s.id === stepExecution.stepId); + const stepConfig = snapshotStep?.config ?? stepExecution.step?.config ?? null; + // Mark step as completed with timeout await prisma.workflowStepExecution.update({ where: {id: stepExecution.id}, @@ -335,17 +346,20 @@ export class WorkflowExecutionService { output: { timedOut: true, eventName: - stepExecution.step.config && - typeof stepExecution.step.config === 'object' && - 'eventName' in stepExecution.step.config - ? stepExecution.step.config.eventName + stepConfig && + typeof stepConfig === 'object' && + 'eventName' in stepConfig + ? stepConfig.eventName : undefined, }, }, }); - // Continue workflow - find transitions with timeout/fallback logic - const transitions = stepExecution.step.outgoingTransitions || []; + // Continue workflow - find transitions with timeout/fallback logic using graph + const transitions = graph.transitions + .filter(t => t.fromStepId === stepExecution.stepId) + .sort((a, b) => a.priority - b.priority); + const fallbackTransition = transitions.find( t => (t.condition && @@ -357,20 +371,8 @@ export class WorkflowExecutionService { if (fallbackTransition) { // Follow timeout branch - await prisma.workflowExecution.update({ - where: {id: stepExecution.executionId}, - data: { - status: WorkflowExecutionStatus.RUNNING, - currentStepId: fallbackTransition.toStep.id, - }, - }); - - await this.processStepExecution(stepExecution.executionId, fallbackTransition.toStep.id); - } else if (transitions.length > 0) { - // No timeout branch, follow first transition - const firstTransition = transitions[0]; - if (firstTransition?.toStep) { - const nextStep = firstTransition.toStep; + const nextStep = graph.steps.find(s => s.id === fallbackTransition.toStepId); + if (nextStep) { await prisma.workflowExecution.update({ where: {id: stepExecution.executionId}, data: { @@ -381,6 +383,23 @@ export class WorkflowExecutionService { await this.processStepExecution(stepExecution.executionId, nextStep.id); } + } else if (transitions.length > 0) { + // No timeout branch, follow first transition + const firstTransition = transitions[0]; + if (firstTransition) { + const nextStep = graph.steps.find(s => s.id === firstTransition.toStepId); + if (nextStep) { + await prisma.workflowExecution.update({ + where: {id: stepExecution.executionId}, + data: { + status: WorkflowExecutionStatus.RUNNING, + currentStepId: nextStep.id, + }, + }); + + await this.processStepExecution(stepExecution.executionId, nextStep.id); + } + } } else { // No transitions, complete workflow await prisma.workflowExecution.update({ @@ -418,24 +437,35 @@ export class WorkflowExecutionService { execution: { include: { contact: true, - workflow: true, - }, - }, - step: { - include: { - outgoingTransitions: { - orderBy: {priority: 'asc'}, - include: {toStep: true}, + workflow: { + include: { + steps: { + include: { + template: true, + outgoingTransitions: { + orderBy: {priority: 'asc'}, + include: {toStep: true}, + }, + }, + }, + }, }, }, }, + step: true, }, }); for (const stepExecution of waitingExecutions) { - const config = stepExecution.step.config; + // Use snapshot config if available, otherwise fall back to live step config + const graph = resolveWorkflowGraph( + stepExecution.execution.workflowSnapshot, + stepExecution.execution.workflow, + ); + const snapshotStep = graph.steps.find(s => s.id === stepExecution.stepId); + const stepConfig = snapshotStep?.config ?? stepExecution.step?.config ?? null; - if (config && typeof config === 'object' && 'eventName' in config && config.eventName === eventName) { + if (stepConfig && typeof stepConfig === 'object' && 'eventName' in stepConfig && stepConfig.eventName === eventName) { // Event matches, resume execution await prisma.workflowStepExecution.update({ where: {id: stepExecution.id}, @@ -453,8 +483,10 @@ export class WorkflowExecutionService { // Cancel any pending timeout job await QueueService.cancelWorkflowTimeout(stepExecution.id); - // Continue workflow - await this.processNextSteps(stepExecution.execution, stepExecution.step, {eventReceived: true}); + // Continue workflow (snapshotStep is always found since graph is built from snapshot or live data) + if (snapshotStep) { + await this.processNextSteps(stepExecution.execution, snapshotStep, {eventReceived: true}, graph); + } } } } @@ -463,9 +495,10 @@ export class WorkflowExecutionService { * Execute a specific step based on its type */ private static async executeStep( - step: WorkflowStepWithTemplate, + step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, stepExecution: WorkflowStepExecution, + graph: ResolvedWorkflowGraph, ): Promise { const config = step.config; @@ -477,7 +510,7 @@ export class WorkflowExecutionService { return await this.executeSendEmail(step, execution, stepExecution, config); case 'DELAY': - return await this.executeDelay(step, execution, stepExecution, config); + return await this.executeDelay(step, execution, stepExecution, config, graph); case 'WAIT_FOR_EVENT': return await this.executeWaitForEvent(step, execution, stepExecution, config); @@ -503,7 +536,7 @@ export class WorkflowExecutionService { * TRIGGER step - Entry point of workflow */ private static async executeTrigger( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, _execution: WorkflowExecutionWithRelations, _stepExecution: WorkflowStepExecution, config: StepConfig, @@ -522,7 +555,7 @@ export class WorkflowExecutionService { * SEND_EMAIL step - Send an email to the contact or a custom recipient */ private static async executeSendEmail( - step: WorkflowStepWithTemplate, + step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, stepExecution: WorkflowStepExecution, config: StepConfig, @@ -594,10 +627,11 @@ export class WorkflowExecutionService { * DELAY step - Wait for a specified duration */ private static async executeDelay( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, _execution: WorkflowExecutionWithRelations, stepExecution: WorkflowStepExecution, config: StepConfig, + graph: ResolvedWorkflowGraph, ): Promise { const {amount, unit} = WorkflowStepConfigSchemas.delay.parse(config); @@ -642,18 +676,18 @@ export class WorkflowExecutionService { }, }); - // Find next steps to queue - const transitions = await prisma.workflowTransition.findMany({ - where: {fromStepId: _step.id}, - include: {toStep: true}, - orderBy: {priority: 'asc'}, - }); + // Find next steps to queue using graph instead of live DB query + const transitions = graph.transitions + .filter(t => t.fromStepId === _step.id) + .sort((a, b) => a.priority - b.priority); if (transitions.length > 0) { const firstTransition = transitions[0]; - if (firstTransition?.toStep) { - const nextStep = firstTransition.toStep; - await QueueService.queueWorkflowStep(_execution.id, nextStep.id, Math.max(0, delayMs)); + if (firstTransition) { + const nextStep = graph.steps.find(s => s.id === firstTransition.toStepId); + if (nextStep) { + await QueueService.queueWorkflowStep(_execution.id, nextStep.id, Math.max(0, delayMs)); + } } } @@ -669,7 +703,7 @@ export class WorkflowExecutionService { * WAIT_FOR_EVENT step - Wait for a specific event to occur */ private static async executeWaitForEvent( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, _execution: WorkflowExecutionWithRelations, stepExecution: WorkflowStepExecution, config: StepConfig, @@ -713,7 +747,7 @@ export class WorkflowExecutionService { * CONDITION step - Evaluate a condition and determine branching */ private static async executeCondition( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, _stepExecution: WorkflowStepExecution, config: StepConfig, @@ -792,7 +826,7 @@ export class WorkflowExecutionService { * EXIT step - Terminate the workflow */ private static async executeExit( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, _stepExecution: WorkflowStepExecution, config: StepConfig, @@ -826,7 +860,7 @@ export class WorkflowExecutionService { * WEBHOOK step - Call an external webhook */ private static async executeWebhook( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, _stepExecution: WorkflowStepExecution, config: StepConfig, @@ -889,7 +923,7 @@ export class WorkflowExecutionService { * UPDATE_CONTACT step - Update contact data */ private static async executeUpdateContact( - _step: WorkflowStep, + _step: WorkflowSnapshotStep, execution: WorkflowExecutionWithRelations, _stepExecution: WorkflowStepExecution, config: StepConfig, @@ -928,10 +962,13 @@ export class WorkflowExecutionService { */ private static async processNextSteps( execution: WorkflowExecutionWithRelations, - currentStep: WorkflowStepWithTransitions, + currentStep: WorkflowSnapshotStep, stepResult: StepResult, + graph: ResolvedWorkflowGraph, ): Promise { - const transitions = currentStep.outgoingTransitions || []; + const transitions = graph.transitions + .filter(t => t.fromStepId === currentStep.id) + .sort((a, b) => a.priority - b.priority); if (transitions.length === 0) { // No more steps, complete the workflow @@ -954,7 +991,7 @@ export class WorkflowExecutionService { // If no condition, always follow if (!condition) { - nextStep = transition.toStep; + nextStep = graph.steps.find(s => s.id === transition.toStepId) ?? null; break; } @@ -968,13 +1005,13 @@ export class WorkflowExecutionService { 'branch' in condition && condition.branch === stepResult.branch ) { - nextStep = transition.toStep; + nextStep = graph.steps.find(s => s.id === transition.toStepId) ?? null; break; } // For other conditional logic if (this.evaluateTransitionCondition(condition, stepResult, execution)) { - nextStep = transition.toStep; + nextStep = graph.steps.find(s => s.id === transition.toStepId) ?? null; break; } } diff --git a/apps/api/src/services/WorkflowService.ts b/apps/api/src/services/WorkflowService.ts index 5253312e..9123bc34 100644 --- a/apps/api/src/services/WorkflowService.ts +++ b/apps/api/src/services/WorkflowService.ts @@ -1,7 +1,7 @@ import type {Workflow, WorkflowExecution, WorkflowStep, WorkflowTransition} from '@plunk/db'; import {Prisma, WorkflowExecutionStatus} from '@plunk/db'; import type {PaginatedResponse, WorkflowExecutionWithDetails, WorkflowWithDetails} from '@plunk/types'; -import {toPrismaJson} from '@plunk/types'; +import {buildWorkflowSnapshot, toPrismaJson} from '@plunk/types'; import signale from 'signale'; import {prisma} from '../database/prisma.js'; @@ -74,6 +74,7 @@ export class WorkflowService { }, include: { steps: { + where: {deletedAt: null}, include: { template: { select: { @@ -175,25 +176,6 @@ export class WorkflowService { // Verify workflow exists and belongs to project const workflow = await this.get(projectId, workflowId); - // Check if workflow is enabled and has active executions - if (workflow.enabled) { - const activeExecutions = await this.hasActiveExecutions(workflowId); - - if (activeExecutions > 0) { - // Block changes to trigger configuration while executions are running - const hasCriticalChanges = data.triggerType !== undefined || data.triggerConfig !== undefined; - - if (hasCriticalChanges) { - throw new HttpException( - 409, - `Cannot modify workflow trigger while workflow has ${activeExecutions} active execution(s). ` + - 'Please disable the workflow first or wait for executions to complete. ' + - 'You can still update name, description, and re-entry settings.', - ); - } - } - } - // Use transaction to update workflow and TRIGGER step atomically const updated = await prisma.$transaction(async tx => { const updateData: Prisma.WorkflowUpdateInput = {}; @@ -402,25 +384,6 @@ export class WorkflowService { throw new HttpException(404, 'Workflow step not found'); } - // Check if workflow is enabled and has active executions - if (workflow.enabled) { - const activeExecutions = await this.hasActiveExecutions(workflowId); - - if (activeExecutions > 0) { - // Only allow safe changes: name and position updates - const hasCriticalChanges = data.config !== undefined || data.templateId !== undefined; - - if (hasCriticalChanges) { - throw new HttpException( - 409, - `Cannot modify step configuration while workflow has ${activeExecutions} active execution(s). ` + - 'Please disable the workflow first or wait for executions to complete. ' + - 'You can still update the step name and position.', - ); - } - } - } - const updateData: Prisma.WorkflowStepUpdateInput = {}; if (data.name !== undefined) updateData.name = data.name; @@ -462,83 +425,10 @@ export class WorkflowService { throw new HttpException(400, 'Cannot delete the trigger step. Every workflow must have a trigger.'); } - // Check if workflow is enabled and has active executions on this step or downstream - if (workflow.enabled) { - // Check if any active executions are currently on this step - const executionsOnStep = await prisma.workflowExecution.count({ - where: { - workflowId, - currentStepId: stepId, - status: { - in: [WorkflowExecutionStatus.RUNNING, WorkflowExecutionStatus.WAITING], - }, - }, - }); - - if (executionsOnStep > 0) { - throw new HttpException( - 409, - `Cannot delete step "${step.name}" while ${executionsOnStep} execution(s) are currently on this step. ` + - 'Please disable the workflow first or wait for executions to complete.', - ); - } - - // Also check downstream steps for active executions - const allSteps = await prisma.workflowStep.findMany({ - where: {workflowId}, - include: {outgoingTransitions: true}, - }); - - // Build adjacency map - const adjacencyMap = new Map(); - for (const s of allSteps) { - adjacencyMap.set( - s.id, - s.outgoingTransitions.map(t => t.toStepId), - ); - } - - // Find all downstream steps - const downstreamSteps = new Set([stepId]); - const queue = [stepId]; - - while (queue.length > 0) { - const currentStepId = queue.shift()!; - const outgoingStepIds = adjacencyMap.get(currentStepId) || []; - - for (const nextStepId of outgoingStepIds) { - if (!downstreamSteps.has(nextStepId)) { - downstreamSteps.add(nextStepId); - queue.push(nextStepId); - } - } - } - - // Check if any active executions are on downstream steps - const executionsOnDownstream = await prisma.workflowExecution.count({ - where: { - workflowId, - currentStepId: {in: Array.from(downstreamSteps)}, - status: { - in: [WorkflowExecutionStatus.RUNNING, WorkflowExecutionStatus.WAITING], - }, - }, - }); - - if (executionsOnDownstream > 0) { - throw new HttpException( - 409, - `Cannot delete step "${step.name}" while ${executionsOnDownstream} execution(s) are on downstream steps. ` + - 'Deleting this step would orphan those executions. ' + - 'Please disable the workflow first or wait for executions to complete.', - ); - } - } - - // Find all downstream steps that need to be deleted (cascade) - // First, get all steps and transitions for this workflow to build a graph + // Find all downstream steps that need to be soft-deleted (cascade) + // Get all active steps and transitions for this workflow to build a graph const allSteps = await prisma.workflowStep.findMany({ - where: {workflowId}, + where: {workflowId, deletedAt: null}, include: {outgoingTransitions: true}, }); @@ -567,12 +457,25 @@ export class WorkflowService { } } - // Delete all affected steps (Prisma will cascade delete the transitions) - await prisma.workflowStep.deleteMany({ + const stepIds = Array.from(stepsToDelete); + + // Soft-delete all affected steps + await prisma.workflowStep.updateMany({ where: { - id: {in: Array.from(stepsToDelete)}, - workflowId, // Safety check to ensure we only delete steps from this workflow + id: {in: stepIds}, + workflowId, // Safety check to ensure we only affect steps from this workflow }, + data: {deletedAt: new Date()}, + }); + + // Hard-delete transitions FROM the soft-deleted steps (no longer relevant to the live graph) + await prisma.workflowTransition.deleteMany({ + where: {fromStepId: {in: stepIds}}, + }); + + // Hard-delete transitions TO the initially deleted step (disconnect from upstream) + await prisma.workflowTransition.deleteMany({ + where: {toStepId: stepId}, }); } @@ -669,60 +572,6 @@ export class WorkflowService { throw new HttpException(404, 'Transition not found'); } - // Check if workflow is enabled and has active executions that could be affected - if (workflow.enabled) { - // Get all steps that would become orphaned by removing this transition - const allSteps = await prisma.workflowStep.findMany({ - where: {workflowId}, - include: {outgoingTransitions: true, incomingTransitions: true}, - }); - - // Build adjacency map without this transition - const adjacencyMap = new Map(); - for (const s of allSteps) { - adjacencyMap.set( - s.id, - s.outgoingTransitions.filter(t => t.id !== transitionId).map(t => t.toStepId), - ); - } - - // Find all steps reachable from the toStep (downstream) - const downstreamSteps = new Set([transition.toStepId]); - const queue = [transition.toStepId]; - - while (queue.length > 0) { - const currentStepId = queue.shift()!; - const outgoingStepIds = adjacencyMap.get(currentStepId) || []; - - for (const nextStepId of outgoingStepIds) { - if (!downstreamSteps.has(nextStepId)) { - downstreamSteps.add(nextStepId); - queue.push(nextStepId); - } - } - } - - // Check if any active executions are on the toStep or downstream steps - const executionsAffected = await prisma.workflowExecution.count({ - where: { - workflowId, - currentStepId: {in: Array.from(downstreamSteps)}, - status: { - in: [WorkflowExecutionStatus.RUNNING, WorkflowExecutionStatus.WAITING], - }, - }, - }); - - if (executionsAffected > 0) { - throw new HttpException( - 409, - `Cannot delete transition from "${transition.fromStep.name}" to "${transition.toStep.name}" ` + - `while ${executionsAffected} execution(s) are on affected steps. ` + - 'Please disable the workflow first or wait for executions to complete.', - ); - } - } - await prisma.workflowTransition.delete({ where: {id: transitionId}, }); @@ -794,6 +643,22 @@ export class WorkflowService { throw new HttpException(400, 'Workflow has no trigger step'); } + // Build workflow snapshot — freeze the graph so in-flight executions are immune to live edits + const fullWorkflow = await prisma.workflow.findUniqueOrThrow({ + where: {id: workflowId}, + include: { + steps: { + include: { + template: { + select: {id: true, subject: true, body: true, from: true, fromName: true, replyTo: true}, + }, + outgoingTransitions: true, + }, + }, + }, + }); + const snapshot = buildWorkflowSnapshot(fullWorkflow); + // Create workflow execution const execution = await prisma.workflowExecution.create({ data: { @@ -802,6 +667,7 @@ export class WorkflowService { status: WorkflowExecutionStatus.RUNNING, currentStepId: triggerStep.id, context: context ?? Prisma.JsonNull, + workflowSnapshot: toPrismaJson(snapshot), }, }); @@ -995,6 +861,36 @@ export class WorkflowService { }; } + /** + * Get active execution counts per step for a workflow + */ + public static async getStepExecutionCounts( + projectId: string, + workflowId: string, + ): Promise> { + // Verify workflow belongs to project + await this.get(projectId, workflowId); + + const counts = await prisma.workflowExecution.groupBy({ + by: ['currentStepId'], + where: { + workflowId, + status: {in: [WorkflowExecutionStatus.RUNNING, WorkflowExecutionStatus.WAITING]}, + currentStepId: {not: null}, + }, + _count: {id: true}, + }); + + const result: Record = {}; + for (const row of counts) { + if (row.currentStepId) { + result[row.currentStepId] = row._count.id; + } + } + + return result; + } + /** * Check if a workflow has active executions */ diff --git a/apps/api/src/services/__tests__/WorkflowSnapshot.test.ts b/apps/api/src/services/__tests__/WorkflowSnapshot.test.ts new file mode 100644 index 00000000..898c3338 --- /dev/null +++ b/apps/api/src/services/__tests__/WorkflowSnapshot.test.ts @@ -0,0 +1,403 @@ +import {beforeEach, describe, expect, it, vi} from 'vitest'; +import { + StepExecutionStatus, + WorkflowExecutionStatus, + WorkflowStepType, +} from '@plunk/db'; +import {buildWorkflowSnapshot, toPrismaJson} from '@plunk/types'; +import {WorkflowExecutionService} from '../WorkflowExecutionService'; +import {WorkflowService} from '../WorkflowService'; +import {factories, getPrismaClient} from '../../../../../test/helpers'; + +// Mock QueueService to prevent actual BullMQ operations +vi.mock('../QueueService', () => ({ + QueueService: { + queueWorkflowStep: vi.fn(), + queueWorkflowTimeout: vi.fn(), + cancelWorkflowTimeout: vi.fn(), + }, +})); + +// Mock EmailService to prevent actual email sending +vi.mock('../EmailService', () => ({ + EmailService: { + sendWorkflowEmail: vi.fn().mockResolvedValue({id: 'mock-email-id', createdAt: new Date()}), + }, +})); + +// Mock NtfyService to prevent notification sending +vi.mock('../NtfyService', () => ({ + NtfyService: { + notifyWorkflowCreated: vi.fn(), + notifyWorkflowEnabled: vi.fn(), + notifyWorkflowDisabled: vi.fn(), + notifyWorkflowDeleted: vi.fn(), + notifyWorkflowExecutionFailed: vi.fn(), + }, +})); + +// Mock Redis +vi.mock('../../database/redis', () => { + const store = new Map(); + return { + redis: { + get: vi.fn(async (key: string) => { + const item = store.get(key); + if (!item) return null; + if (item.expiry && Date.now() > item.expiry) { + store.delete(key); + return null; + } + return item.value; + }), + set: vi.fn(async (key: string, value: string) => { + store.set(key, {value}); + return 'OK'; + }), + setex: vi.fn(async (key: string, seconds: number, value: string) => { + store.set(key, {value, expiry: Date.now() + seconds * 1000}); + return 'OK'; + }), + del: vi.fn(async (key: string) => { + store.delete(key); + return 1; + }), + incr: vi.fn(async () => 1), + expire: vi.fn(async () => 1), + clear: () => store.clear(), + }, + }; +}); + +const prisma = getPrismaClient(); + +/** + * Helper: create a workflow with steps and transitions, build a snapshot, and create an execution. + * This simulates what WorkflowService.startExecution() does but gives us full control. + */ +async function createWorkflowWithSnapshot(projectId: string, opts: { + steps: Array<{ + type: WorkflowStepType; + name?: string; + config?: unknown; + templateId?: string; + }>; + enabled?: boolean; +}) { + // Create workflow with trigger step + const workflow = await factories.createWorkflow({projectId, enabled: opts.enabled ?? true}); + + // Get the trigger step created by the factory + const triggerStep = await prisma.workflowStep.findFirst({ + where: {workflowId: workflow.id, type: 'TRIGGER'}, + }); + + // Create additional steps + const createdSteps = []; + for (let i = 0; i < opts.steps.length; i++) { + const s = opts.steps[i]; + const step = await factories.createWorkflowStep({ + workflowId: workflow.id, + type: s.type, + name: s.name || `Step ${i + 1}`, + config: s.config, + templateId: s.templateId, + }); + createdSteps.push(step); + } + + // Create transitions: trigger -> step1 -> step2 -> ... + const allSteps = [triggerStep!, ...createdSteps]; + for (let i = 0; i < allSteps.length - 1; i++) { + await prisma.workflowTransition.create({ + data: { + fromStepId: allSteps[i].id, + toStepId: allSteps[i + 1].id, + priority: 0, + }, + }); + } + + // Build snapshot by fetching the full workflow + const fullWorkflow = await prisma.workflow.findUniqueOrThrow({ + where: {id: workflow.id}, + include: { + steps: { + include: { + template: { + select: {id: true, subject: true, body: true, from: true, fromName: true, replyTo: true}, + }, + outgoingTransitions: true, + }, + }, + }, + }); + const snapshot = buildWorkflowSnapshot(fullWorkflow); + + return {workflow, triggerStep: triggerStep!, steps: createdSteps, allSteps, snapshot, fullWorkflow}; +} + +describe('Workflow Snapshot', () => { + let projectId: string; + + beforeEach(async () => { + const {project} = await factories.createUserWithProject(); + projectId = project.id; + }); + + describe('snapshot creation', () => { + it('should build a snapshot with all steps and transitions', async () => { + const template = await factories.createTemplate({projectId}); + const {snapshot, allSteps} = await createWorkflowWithSnapshot(projectId, { + steps: [ + {type: WorkflowStepType.SEND_EMAIL, templateId: template.id}, + {type: WorkflowStepType.DELAY, config: {amount: 1, unit: 'hours'}}, + ], + }); + + expect(snapshot.version).toBe(1); + expect(snapshot.steps).toHaveLength(allSteps.length); + expect(snapshot.transitions).toHaveLength(allSteps.length - 1); + + // Verify template content is denormalized + const emailStep = snapshot.steps.find(s => s.type === 'SEND_EMAIL'); + expect(emailStep?.template).toBeDefined(); + expect(emailStep?.template?.subject).toBe('Test Subject'); + expect(emailStep?.template?.body).toContain('Hello'); + }); + + it('should store snapshot on execution via WorkflowService.startExecution', async () => { + const template = await factories.createTemplate({projectId}); + const {workflow, steps} = await createWorkflowWithSnapshot(projectId, { + steps: [{type: WorkflowStepType.SEND_EMAIL, templateId: template.id}], + enabled: true, + }); + + const contact = await factories.createContact({projectId}); + const execution = await WorkflowService.startExecution(projectId, workflow.id, contact.id); + + // Fetch execution with snapshot + const fullExecution = await prisma.workflowExecution.findUniqueOrThrow({ + where: {id: execution.id}, + }); + + expect(fullExecution.workflowSnapshot).not.toBeNull(); + const snapshot = fullExecution.workflowSnapshot as Record; + expect(snapshot.version).toBe(1); + expect((snapshot.steps as unknown[]).length).toBeGreaterThan(0); + }); + }); + + describe('snapshot isolation', () => { + it('should use snapshot config even after step config is modified', async () => { + const template = await factories.createTemplate({projectId}); + const {workflow, triggerStep, steps, snapshot} = await createWorkflowWithSnapshot(projectId, { + steps: [{type: WorkflowStepType.SEND_EMAIL, templateId: template.id}], + }); + + const contact = await factories.createContact({projectId}); + + // Create execution WITH snapshot + const execution = await prisma.workflowExecution.create({ + data: { + workflowId: workflow.id, + contactId: contact.id, + status: WorkflowExecutionStatus.RUNNING, + currentStepId: triggerStep.id, + workflowSnapshot: toPrismaJson(snapshot), + }, + }); + + // NOW modify the live step config (simulating a user editing the workflow) + await prisma.workflowStep.update({ + where: {id: steps[0].id}, + data: { + config: {templateId: 'completely-different-template'}, + name: 'Modified Step Name', + }, + }); + + // Process the trigger step — engine should use snapshot, not live data + await WorkflowExecutionService.processStepExecution(execution.id, triggerStep.id); + + // The execution should have advanced past the trigger + const updatedExecution = await prisma.workflowExecution.findUniqueOrThrow({ + where: {id: execution.id}, + }); + + // Execution should still be running or completed (not failed) + expect([ + WorkflowExecutionStatus.RUNNING, + WorkflowExecutionStatus.COMPLETED, + ]).toContain(updatedExecution.status); + }); + + it('should use snapshot transitions even after transition is deleted', async () => { + const {workflow, triggerStep, steps, snapshot} = await createWorkflowWithSnapshot(projectId, { + steps: [{type: WorkflowStepType.EXIT}], + }); + + const contact = await factories.createContact({projectId}); + + // Create execution WITH snapshot + const execution = await prisma.workflowExecution.create({ + data: { + workflowId: workflow.id, + contactId: contact.id, + status: WorkflowExecutionStatus.RUNNING, + currentStepId: triggerStep.id, + workflowSnapshot: toPrismaJson(snapshot), + }, + }); + + // Delete the live transition (simulating user editing the workflow) + await prisma.workflowTransition.deleteMany({ + where: {fromStepId: triggerStep.id}, + }); + + // Process trigger — should still find the EXIT step via snapshot transitions + await WorkflowExecutionService.processStepExecution(execution.id, triggerStep.id); + + const updatedExecution = await prisma.workflowExecution.findUniqueOrThrow({ + where: {id: execution.id}, + }); + + // Should have reached EXIT and completed + // EXIT step sets EXITED, then processNextSteps (finding no transitions) overwrites to COMPLETED + expect(updatedExecution.status).toBe(WorkflowExecutionStatus.COMPLETED); + }); + + it('should use snapshot template content even after template is edited', async () => { + const template = await factories.createTemplate({ + projectId, + subject: 'Original Subject', + body: '

Original body

', + }); + + const {workflow, triggerStep, steps, snapshot} = await createWorkflowWithSnapshot(projectId, { + steps: [{type: WorkflowStepType.SEND_EMAIL, templateId: template.id}], + }); + + // Verify the snapshot captured original template content + const snapshotEmailStep = snapshot.steps.find(s => s.type === 'SEND_EMAIL'); + expect(snapshotEmailStep?.template?.subject).toBe('Original Subject'); + + // Edit the live template + await prisma.template.update({ + where: {id: template.id}, + data: { + subject: 'Modified Subject', + body: '

Modified body

', + }, + }); + + // The snapshot should still have the original content + expect(snapshotEmailStep?.template?.subject).toBe('Original Subject'); + expect(snapshotEmailStep?.template?.body).toBe('

Original body

'); + }); + }); + + describe('legacy fallback', () => { + it('should work without snapshot (legacy execution)', async () => { + const {workflow, triggerStep, steps} = await createWorkflowWithSnapshot(projectId, { + steps: [{type: WorkflowStepType.EXIT}], + }); + + const contact = await factories.createContact({projectId}); + + // Create execution WITHOUT snapshot (legacy behavior) + const execution = await prisma.workflowExecution.create({ + data: { + workflowId: workflow.id, + contactId: contact.id, + status: WorkflowExecutionStatus.RUNNING, + currentStepId: triggerStep.id, + // No workflowSnapshot — should fall back to live DB + }, + }); + + // Process trigger — should work via legacy fallback + await WorkflowExecutionService.processStepExecution(execution.id, triggerStep.id); + + const updatedExecution = await prisma.workflowExecution.findUniqueOrThrow({ + where: {id: execution.id}, + }); + + // Should have reached EXIT and completed + // EXIT step sets EXITED, then processNextSteps (finding no transitions) overwrites to COMPLETED + expect(updatedExecution.status).toBe(WorkflowExecutionStatus.COMPLETED); + }); + }); + + describe('condition step with snapshot', () => { + it('should evaluate conditions using snapshot config', async () => { + const {workflow, triggerStep, steps, snapshot} = await createWorkflowWithSnapshot(projectId, { + steps: [ + { + type: WorkflowStepType.CONDITION, + config: {field: 'data.plan', operator: 'equals', value: 'pro'}, + }, + {type: WorkflowStepType.EXIT, name: 'Yes Exit'}, + {type: WorkflowStepType.EXIT, name: 'No Exit'}, + ], + }); + + // Rewire transitions: condition -> yes exit (branch: yes), condition -> no exit (branch: no) + // First remove auto-created linear transitions + await prisma.workflowTransition.deleteMany({ + where: {fromStepId: steps[0].id}, + }); + await prisma.workflowTransition.create({ + data: {fromStepId: steps[0].id, toStepId: steps[1].id, condition: {branch: 'yes'}, priority: 0}, + }); + await prisma.workflowTransition.create({ + data: {fromStepId: steps[0].id, toStepId: steps[2].id, condition: {branch: 'no'}, priority: 1}, + }); + + // Rebuild snapshot with the new transitions + const fullWorkflow = await prisma.workflow.findUniqueOrThrow({ + where: {id: workflow.id}, + include: { + steps: { + include: { + template: { + select: {id: true, subject: true, body: true, from: true, fromName: true, replyTo: true}, + }, + outgoingTransitions: true, + }, + }, + }, + }); + const updatedSnapshot = buildWorkflowSnapshot(fullWorkflow); + + const contact = await factories.createContact({projectId, data: {plan: 'pro'}}); + + const execution = await prisma.workflowExecution.create({ + data: { + workflowId: workflow.id, + contactId: contact.id, + status: WorkflowExecutionStatus.RUNNING, + currentStepId: triggerStep.id, + workflowSnapshot: toPrismaJson(updatedSnapshot), + }, + }); + + // Now change the condition on the live step (user edits workflow) + await prisma.workflowStep.update({ + where: {id: steps[0].id}, + data: {config: {field: 'data.plan', operator: 'equals', value: 'enterprise'}}, + }); + + // Process from trigger + await WorkflowExecutionService.processStepExecution(execution.id, triggerStep.id); + + const updatedExecution = await prisma.workflowExecution.findUniqueOrThrow({ + where: {id: execution.id}, + }); + + // Contact has plan=pro, snapshot condition checks for plan=pro -> should take YES branch -> EXIT + // EXIT step sets EXITED, then processNextSteps (finding no transitions) overwrites to COMPLETED + expect(updatedExecution.status).toBe(WorkflowExecutionStatus.COMPLETED); + }); + }); +}); diff --git a/apps/web/src/components/WorkflowBuilder.tsx b/apps/web/src/components/WorkflowBuilder.tsx index be2ba176..eb9d2566 100644 --- a/apps/web/src/components/WorkflowBuilder.tsx +++ b/apps/web/src/components/WorkflowBuilder.tsx @@ -55,6 +55,7 @@ interface WorkflowBuilderProps { priority: number; }>; })[]; + stepExecutionCounts?: Record; onUpdate: () => void; } @@ -147,12 +148,21 @@ function getLayoutedElements(nodes: Node[], edges: Edge[]) { const layoutedNodes = nodes.map(node => { const nodeWithPosition = dagreGraph.node(node.id); + const dagrePosition = { + x: nodeWithPosition.x - nodeWidth / 2, + y: nodeWithPosition.y - nodeHeight / 2, + }; + + // Preserve user-saved positions for real step nodes (not addStep nodes). + // A non-default position means the user has dragged this node before. + const hasSavedPosition = + node.type !== 'addStep' && + node.position && + (node.position.x !== 0 || node.position.y !== 0); + return { ...node, - position: { - x: nodeWithPosition.x - nodeWidth / 2, - y: nodeWithPosition.y - nodeHeight / 2, - }, + position: hasSavedPosition ? node.position : dagrePosition, }; }); @@ -233,6 +243,7 @@ function CustomNode({ onDelete?: () => void; template?: {name: string}; config?: any; + executionCount?: number; }; }) { const Icon = data.icon; @@ -264,6 +275,17 @@ function CustomNode({ onMouseEnter={() => setShowActions(true)} onMouseLeave={() => setShowActions(false)} > + {/* Execution count badge */} + {data.executionCount && data.executionCount > 0 ? ( +
+ {data.executionCount} +
+ ) : null} + {/* Action buttons - shown on hover */} {showActions && data.type === 'TRIGGER' && (
@@ -435,7 +457,7 @@ const STEP_TYPE_OPTIONS = [ {value: 'EXIT', label: 'Exit', icon: LogOut, color: STEP_TYPE_COLORS.EXIT}, ]; -export function WorkflowBuilder({workflowId, steps, onUpdate}: WorkflowBuilderProps) { +export function WorkflowBuilder({workflowId, steps, stepExecutionCounts, onUpdate}: WorkflowBuilderProps) { const reactFlowInstance = useReactFlow(); const [addStepContext, setAddStepContext] = useState<{ fromStepId: string | null; @@ -490,6 +512,7 @@ export function WorkflowBuilder({workflowId, steps, onUpdate}: WorkflowBuilderPr bgColor, template: step.template, config: step.config, + executionCount: stepExecutionCounts?.[step.id] ?? 0, onEdit: () => handleEditStep(step.id), onDelete: () => handleDeleteStepClick(step.id), }, @@ -835,6 +858,23 @@ export function WorkflowBuilder({workflowId, steps, onUpdate}: WorkflowBuilderPr [steps], ); + // Save node position to DB when user finishes dragging + const handleNodeDragStop = useCallback( + async (_event: React.MouseEvent, node: Node) => { + // Only save positions for real step nodes, not addStep nodes + if (node.type === 'addStep') return; + + try { + await network.fetch('PATCH', `/workflows/${workflowId}/steps/${node.id}`, { + position: {x: Math.round(node.position.x), y: Math.round(node.position.y)}, + } as any); + } catch { + // Silently fail — position save is non-critical + } + }, + [workflowId], + ); + const handleDeleteStep = async () => { if (!stepToDelete) return; @@ -884,6 +924,7 @@ export function WorkflowBuilder({workflowId, steps, onUpdate}: WorkflowBuilderPr edges={edges} onNodesChange={handleNodesChange} onEdgesChange={onEdgesChange} + onNodeDragStop={handleNodeDragStop} nodeTypes={nodeTypes} fitView fitViewOptions={{ diff --git a/apps/web/src/pages/workflows/[id].tsx b/apps/web/src/pages/workflows/[id].tsx index 14b2f957..7c262e02 100644 --- a/apps/web/src/pages/workflows/[id].tsx +++ b/apps/web/src/pages/workflows/[id].tsx @@ -147,6 +147,12 @@ export default function WorkflowEditorPage() { // Check for active executions const activeExecutionsCount = (activeExecutionsData?.total || 0) + (waitingExecutionsData?.total || 0); + // Fetch per-step execution counts for badges (only when there are active executions) + const {data: stepExecutionCounts} = useSWR>( + id && activeExecutionsCount > 0 ? `/workflows/${id}/step-execution-counts` : null, + {revalidateOnFocus: false, refreshInterval: 10000}, + ); + // Handler for cancelling a single execution const handleCancelExecution = async (executionId: string) => { setIsCancelling(true); @@ -485,28 +491,17 @@ export default function WorkflowEditorPage() {
- {/* Active Executions Warning Banner */} + {/* Active Executions Info Banner */} {activeExecutionsCount > 0 && ( - {workflow.enabled ? 'Workflow is active with running executions' : 'Workflow has active executions'} + {activeExecutionsCount} contact{activeExecutionsCount !== 1 ? 's' : ''} currently in this workflow

- This workflow has {activeExecutionsCount} active execution - {activeExecutionsCount !== 1 ? 's' : ''}.{' '} - {!workflow.enabled && 'Even though the workflow is disabled, existing executions will continue. '} - To protect running workflows, you cannot: -

-
    -
  • Delete steps or transitions
  • -
  • Modify step configurations (email templates, conditions, etc.)
  • -
  • Change the workflow trigger
  • -
-

- You can still rename steps and adjust their position. To make configuration changes, wait for executions - to complete or cancel them from the Executions tab. + Changes you make will only affect new executions. Contacts already in the workflow will + continue on the version they started with.

@@ -572,7 +567,7 @@ export default function WorkflowEditorPage() { - mutate()} /> + mutate()} /> diff --git a/packages/db/prisma/migrations/20260402000000_add_workflow_snapshot/migration.sql b/packages/db/prisma/migrations/20260402000000_add_workflow_snapshot/migration.sql new file mode 100644 index 00000000..f83ab1dc --- /dev/null +++ b/packages/db/prisma/migrations/20260402000000_add_workflow_snapshot/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "workflow_executions" ADD COLUMN "workflowSnapshot" JSONB; diff --git a/packages/db/prisma/migrations/20260402000001_soft_delete_steps/migration.sql b/packages/db/prisma/migrations/20260402000001_soft_delete_steps/migration.sql new file mode 100644 index 00000000..4bec7937 --- /dev/null +++ b/packages/db/prisma/migrations/20260402000001_soft_delete_steps/migration.sql @@ -0,0 +1,15 @@ +-- AlterTable: Add soft-delete to WorkflowStep +ALTER TABLE "workflow_steps" ADD COLUMN "deletedAt" TIMESTAMP(3); + +-- AlterTable: Make WorkflowStepExecution.stepId nullable with SetNull +ALTER TABLE "workflow_step_executions" ALTER COLUMN "stepId" DROP NOT NULL; + +-- DropForeignKey: Change WorkflowStepExecution.stepId from Cascade to SetNull +ALTER TABLE "workflow_step_executions" DROP CONSTRAINT "workflow_step_executions_stepId_fkey"; +ALTER TABLE "workflow_step_executions" ADD CONSTRAINT "workflow_step_executions_stepId_fkey" + FOREIGN KEY ("stepId") REFERENCES "workflow_steps"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- DropForeignKey: Change WorkflowExecution.currentStepId to SetNull +ALTER TABLE "workflow_executions" DROP CONSTRAINT "workflow_executions_currentStepId_fkey"; +ALTER TABLE "workflow_executions" ADD CONSTRAINT "workflow_executions_currentStepId_fkey" + FOREIGN KEY ("currentStepId") REFERENCES "workflow_steps"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 72152bff..b0d2a853 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -395,6 +395,9 @@ model WorkflowStep { executions WorkflowStepExecution[] currentWorkflowExecutions WorkflowExecution[] // Executions currently on this step + // Soft delete — null means active, set to hide from UI while preserving FK integrity + deletedAt DateTime? + // Timestamps createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -441,7 +444,7 @@ model WorkflowExecution { status WorkflowExecutionStatus @default(RUNNING) // Current position - currentStep WorkflowStep? @relation(fields: [currentStepId], references: [id]) + currentStep WorkflowStep? @relation(fields: [currentStepId], references: [id], onDelete: SetNull) currentStepId String? // Exit information @@ -450,6 +453,10 @@ model WorkflowExecution { // Context data (merged with contact.data when executing) context Json? // Additional variables for this execution + // Frozen workflow graph at execution start (steps, transitions, template content) + // Executions run from this snapshot, isolating them from live edits + workflowSnapshot Json? + // Relations stepExecutions WorkflowStepExecution[] emails Email[] @@ -475,8 +482,8 @@ model WorkflowStepExecution { execution WorkflowExecution @relation(fields: [executionId], references: [id], onDelete: Cascade) executionId String - step WorkflowStep @relation(fields: [stepId], references: [id], onDelete: Cascade) - stepId String + step WorkflowStep? @relation(fields: [stepId], references: [id], onDelete: SetNull) + stepId String? // Execution state status StepExecutionStatus @default(PENDING) diff --git a/packages/types/src/prisma/extended.ts b/packages/types/src/prisma/extended.ts index 40e0cd65..4d732b92 100644 --- a/packages/types/src/prisma/extended.ts +++ b/packages/types/src/prisma/extended.ts @@ -14,6 +14,54 @@ import type { Prisma, } from '@plunk/db'; +/** + * Frozen workflow graph stored on each execution at start time. + * Isolates in-flight executions from live edits to the workflow. + * Template content is denormalized so template edits don't affect in-flight executions. + */ +export interface WorkflowSnapshot { + version: 1; + workflowId: string; + name: string; + triggerType: string; + triggerConfig: Prisma.JsonValue; + steps: WorkflowSnapshotStep[]; + transitions: WorkflowSnapshotTransition[]; +} + +export interface WorkflowSnapshotStep { + id: string; + type: string; + name: string; + config: Prisma.JsonValue; + templateId: string | null; + template: { + id: string; + subject: string; + body: string; + from: string; + fromName: string | null; + replyTo: string | null; + } | null; +} + +export interface WorkflowSnapshotTransition { + id: string; + fromStepId: string; + toStepId: string; + condition: Prisma.JsonValue; + priority: number; +} + +/** + * Resolved workflow graph — normalized structure used by the execution engine. + * Built from either a snapshot (new executions) or live DB data (legacy fallback). + */ +export interface ResolvedWorkflowGraph { + steps: WorkflowSnapshotStep[]; + transitions: WorkflowSnapshotTransition[]; +} + /** * Workflow with all steps, transitions, and template details * Used for workflow editor and detailed workflow views diff --git a/packages/types/src/prisma/helpers.ts b/packages/types/src/prisma/helpers.ts index 182d3017..7b36aca3 100644 --- a/packages/types/src/prisma/helpers.ts +++ b/packages/types/src/prisma/helpers.ts @@ -7,6 +7,8 @@ import {Prisma} from '@plunk/db'; +import type {ResolvedWorkflowGraph, WorkflowSnapshot, WorkflowSnapshotStep, WorkflowSnapshotTransition} from './extended.js'; + export function toPrismaJson(value: T | null | undefined): Prisma.InputJsonValue { // Prisma.InputJsonValue accepts: string | number | boolean | null | JsonObject | JsonArray // We trust that T is JSON-serializable at runtime (including null) @@ -30,3 +32,148 @@ export function fromPrismaJsonOptional(value: Prisma.JsonValue | null | undef } return value as unknown as T; } + +/** + * Build a workflow snapshot from a workflow with fully loaded steps, transitions, and templates. + * Call this at execution start time to freeze the graph. + */ +export function buildWorkflowSnapshot(workflow: { + id: string; + name: string; + triggerType: string; + triggerConfig: Prisma.JsonValue; + steps: Array<{ + id: string; + type: string; + name: string; + config: Prisma.JsonValue; + templateId: string | null; + template?: { + id: string; + subject: string; + body: string; + from: string; + fromName: string | null; + replyTo: string | null; + } | null; + outgoingTransitions: Array<{ + id: string; + fromStepId: string; + toStepId: string; + condition: Prisma.JsonValue; + priority: number; + }>; + }>; +}): WorkflowSnapshot { + const steps: WorkflowSnapshotStep[] = workflow.steps.map(s => ({ + id: s.id, + type: s.type, + name: s.name, + config: s.config, + templateId: s.templateId, + template: s.template + ? { + id: s.template.id, + subject: s.template.subject, + body: s.template.body, + from: s.template.from, + fromName: s.template.fromName, + replyTo: s.template.replyTo, + } + : null, + })); + + const transitions: WorkflowSnapshotTransition[] = workflow.steps.flatMap(s => + s.outgoingTransitions.map(t => ({ + id: t.id, + fromStepId: t.fromStepId, + toStepId: t.toStepId, + condition: t.condition, + priority: t.priority, + })), + ); + + return { + version: 1, + workflowId: workflow.id, + name: workflow.name, + triggerType: workflow.triggerType, + triggerConfig: workflow.triggerConfig, + steps, + transitions, + }; +} + +/** + * Resolve a workflow graph from either a snapshot or live DB data. + * Returns a normalized structure the execution engine can work with uniformly. + */ +export function resolveWorkflowGraph( + workflowSnapshot: Prisma.JsonValue | null | undefined, + liveWorkflow?: { + steps: Array<{ + id: string; + type: string; + name: string; + config: Prisma.JsonValue; + templateId: string | null; + template?: { + id: string; + subject: string; + body: string; + from: string; + fromName: string | null; + replyTo: string | null; + } | null; + outgoingTransitions: Array<{ + id: string; + fromStepId: string; + toStepId: string; + condition: Prisma.JsonValue; + priority: number; + }>; + }>; + }, +): ResolvedWorkflowGraph { + if (workflowSnapshot) { + const snapshot = workflowSnapshot as unknown as WorkflowSnapshot; + return { + steps: snapshot.steps, + transitions: snapshot.transitions, + }; + } + + // Legacy fallback: build from live DB data + if (!liveWorkflow) { + return {steps: [], transitions: []}; + } + + return { + steps: liveWorkflow.steps.map(s => ({ + id: s.id, + type: s.type, + name: s.name, + config: s.config, + templateId: s.templateId, + template: s.template + ? { + id: s.template.id, + subject: s.template.subject, + body: s.template.body, + from: s.template.from, + fromName: s.template.fromName, + replyTo: s.template.replyTo, + } + : null, + })), + transitions: liveWorkflow.steps.flatMap(s => + s.outgoingTransitions.map(t => ({ + id: t.id, + fromStepId: t.fromStepId, + toStepId: t.toStepId, + condition: t.condition, + priority: t.priority, + })), + ), + }; +}