From a94e33f380233c41b56bfe32d1bea512b437fc88 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Tue, 9 Jun 2026 16:33:25 -0300 Subject: [PATCH] feat(evo-flow): execute Move to Pipeline Stage Journey node (EVO-1272) Implements the runtime for the Flow Builder "Move to Pipeline Stage" action node: - MoveToPipelineStageNode mirrors AssignToPipelineNode; calls the CRM move_conversation endpoint via CrmClientService#moveToPipelineStage. - Unwraps the success_response envelope (response.data.data) so a CRM skip for a deleted target stage surfaces as skipped, not a phantom move. - Registers the node in action-nodes.activities (interface, lazy singleton, activity) and dispatches the 'move-to-pipeline-stage-node' type in the journey execution workflow. Tests: node unit specs (happy / skip-on-missing-config / skip-on-deleted-stage / error) and a CrmClientService contract spec pinning URL, method, body and the nested envelope against fetch. Co-Authored-By: Claude Opus 4.8 --- .../activities/action-nodes.activities.ts | 18 +++ .../move-to-pipeline-stage.node.spec.ts | 93 ++++++++++++ .../pipeline/move-to-pipeline-stage.node.ts | 138 ++++++++++++++++++ .../workflows/journey-execution.workflow.ts | 11 ++ .../crm-client/crm-client.service.spec.ts | 32 ++++ src/shared/crm-client/crm-client.service.ts | 20 +++ 6 files changed, 312 insertions(+) create mode 100644 src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.spec.ts create mode 100644 src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.ts diff --git a/src/modules/temporal/activities/action-nodes.activities.ts b/src/modules/temporal/activities/action-nodes.activities.ts index 58e97ad..a9ae7f5 100644 --- a/src/modules/temporal/activities/action-nodes.activities.ts +++ b/src/modules/temporal/activities/action-nodes.activities.ts @@ -29,6 +29,7 @@ import { SendMessageNode, SendMessageNodeInput } from './nodes/evoai/communicati import { SendCannedResponseNode, SendCannedResponseNodeInput } from './nodes/evoai/communication/send-canned-response.node'; import { SendEmailTeamNode, SendEmailTeamNodeInput } from './nodes/evoai/communication/send-email-team.node'; import { AssignToPipelineNode, AssignToPipelineNodeInput } from './nodes/evoai/pipeline/assign-to-pipeline.node'; +import { MoveToPipelineStageNode, MoveToPipelineStageNodeInput } from './nodes/evoai/pipeline/move-to-pipeline-stage.node'; import { SendTranscriptNode, SendTranscriptNodeInput } from './nodes/evoai/communication/send-transcript.node'; import { AssignAgentNode, AssignAgentNodeInput } from './nodes/evoai/assignment/assign-agent.node'; import { AssignTeamNode, AssignTeamNodeInput } from './nodes/evoai/assignment/assign-team.node'; @@ -66,6 +67,7 @@ export { SendCannedResponseNodeInput, SendEmailTeamNodeInput, AssignToPipelineNodeInput, + MoveToPipelineStageNodeInput, ChangePriorityNodeInput, ScheduledActionNodeInput, }; @@ -112,6 +114,9 @@ export interface ActionNodeActivities { executeAssignToPipelineNode( input: AssignToPipelineNodeInput, ): Promise; + executeMoveToPipelineStageNode( + input: MoveToPipelineStageNodeInput, + ): Promise; executeSendTranscriptNode( input: SendTranscriptNodeInput, ): Promise; @@ -164,6 +169,7 @@ let sendMessageNode: SendMessageNode; let sendCannedResponseNode: SendCannedResponseNode; let sendEmailTeamNode: SendEmailTeamNode; let assignToPipelineNode: AssignToPipelineNode; +let moveToPipelineStageNode: MoveToPipelineStageNode; let sendTranscriptNode: SendTranscriptNode; let assignAgentNode: AssignAgentNode; let assignTeamNode: AssignTeamNode; @@ -252,6 +258,12 @@ function getAssignToPipelineNode() { return assignToPipelineNode; } +function getMoveToPipelineStageNode() { + if (!moveToPipelineStageNode) + moveToPipelineStageNode = new MoveToPipelineStageNode(); + return moveToPipelineStageNode; +} + function getSendTranscriptNode() { if (!sendTranscriptNode) sendTranscriptNode = new SendTranscriptNode(); return sendTranscriptNode; @@ -461,6 +473,12 @@ export const actionNodeActivities: ActionNodeActivities = { return await getAssignToPipelineNode().execute(input); }, + async executeMoveToPipelineStageNode( + input: MoveToPipelineStageNodeInput, + ): Promise { + return await getMoveToPipelineStageNode().execute(input); + }, + async executeSendTranscriptNode( input: SendTranscriptNodeInput, ): Promise { diff --git a/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.spec.ts b/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.spec.ts new file mode 100644 index 0000000..fdc7556 --- /dev/null +++ b/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.spec.ts @@ -0,0 +1,93 @@ +import { + MoveToPipelineStageNode, + MoveToPipelineStageNodeInput, +} from './move-to-pipeline-stage.node'; + +describe('MoveToPipelineStageNode', () => { + let node: MoveToPipelineStageNode; + let moveToPipelineStage: jest.Mock; + + const baseInput: MoveToPipelineStageNodeInput = { + nodeId: 'n1', + conversationId: 'conv-1', + sessionId: 's1', + nodeData: { pipeline_id: 'p1', pipeline_stage_id: 'st1' }, + }; + + beforeEach(() => { + node = new MoveToPipelineStageNode(); + moveToPipelineStage = jest.fn(); + (node as any).crmService = { moveToPipelineStage }; + jest + .spyOn(node as any, 'interpolateNodeData') + .mockImplementation(async (_input, nodeData) => nodeData); + }); + + // The mocks below mirror the real CRM `success_response` envelope: + // executeRequest stores the whole body under `data`, so the move result is + // nested at `data.data` — the node must unwrap that level (regression guard). + it('moves the conversation to the target pipeline stage (happy path)', async () => { + moveToPipelineStage.mockResolvedValue({ + success: true, + data: { success: true, data: { moved: true, movement_type: 'cross_pipeline' } }, + }); + + const result = await node.execute(baseInput); + + expect(moveToPipelineStage).toHaveBeenCalledWith( + 'p1', + 'conv-1', + 'st1', + 'move-to-pipeline-stage', + ); + expect(result.success).toBe(true); + expect(result.variables).toMatchObject({ + node_n1_pipeline_moved: true, + node_n1_pipeline_id: 'p1', + node_n1_stage_id: 'st1', + }); + }); + + it('skips when stage_id is missing', async () => { + const result = await node.execute({ + ...baseInput, + nodeData: { pipeline_id: 'p1' }, + }); + + expect(moveToPipelineStage).not.toHaveBeenCalled(); + expect(result.success).toBe(true); + }); + + it('skips when pipeline_id is missing', async () => { + const result = await node.execute({ + ...baseInput, + nodeData: { pipeline_stage_id: 'st1' }, + }); + + expect(moveToPipelineStage).not.toHaveBeenCalled(); + expect(result.success).toBe(true); + }); + + it('surfaces a CRM skip for a deleted target stage as skipped (AC3)', async () => { + moveToPipelineStage.mockResolvedValue({ + success: true, + data: { success: true, data: { moved: false, skipped: true, reason: 'stage_not_found' } }, + }); + + const result = await node.execute(baseInput); + + expect(moveToPipelineStage).toHaveBeenCalledTimes(1); + expect(result.success).toBe(true); + expect(result.variables).toMatchObject({ + node_n1_pipeline_moved: false, + }); + }); + + it('returns an error result when the CRM call fails', async () => { + moveToPipelineStage.mockResolvedValue({ success: false, error: 'boom' }); + + const result = await node.execute(baseInput); + + expect(result.success).toBe(false); + }); +}); diff --git a/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.ts b/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.ts new file mode 100644 index 0000000..4675f4d --- /dev/null +++ b/src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.ts @@ -0,0 +1,138 @@ +import { BaseNode, NodeExecutionResult } from '../../base.node'; +import { CrmClientService } from '../../../../../../shared/crm-client/crm-client.service'; + +interface MoveResponseData { + moved?: boolean; + skipped?: boolean; + reason?: string; + movement_type?: string; +} + +export interface MoveToPipelineStageNodeInput { + nodeId: string; + conversationId?: string; + sessionId: string; + nodeData: { + pipeline_id?: string; + pipelineId?: string; + stage_id?: string; + pipeline_stage_id?: string; + nextNodeId?: string; + }; +} + +export class MoveToPipelineStageNode extends BaseNode { + private crmService: CrmClientService | null = null; + + constructor() { + super('move-to-pipeline-stage'); + } + + private getCrmService(): CrmClientService { + if (!this.crmService) this.crmService = new CrmClientService(); + return this.crmService; + } + + async execute( + input: MoveToPipelineStageNodeInput, + ): Promise { + return await this.executeWithTiming(input.nodeId, input, async () => { + const data = await this.interpolateNodeData(input, input.nodeData); + const pipelineId = data.pipeline_id || data.pipelineId; + const stageId = data.pipeline_stage_id || data.stage_id; + + if (!stageId) { + this.logger.warn('No stage_id configured; skipping', { + nodeId: input.nodeId, + }); + return this.skipped('no_stage_id', pipelineId, stageId); + } + if (!pipelineId) { + this.logger.warn('No pipeline_id configured; skipping', { + nodeId: input.nodeId, + }); + return this.skipped('no_pipeline_id', pipelineId, stageId); + } + if (!input.conversationId) { + this.logger.warn('No conversationId available from trigger event', { + nodeId: input.nodeId, + }); + return this.skipped('no_conversation_id', pipelineId, stageId); + } + + const response = await this.getCrmService().moveToPipelineStage( + String(pipelineId), + input.conversationId, + String(stageId), + 'move-to-pipeline-stage', + ); + + if (!response.success) { + throw new Error( + `Failed to move conversation to pipeline stage: ${response.error}`, + ); + } + + // The CRM wraps payloads in a `success_response` envelope + // (`{ success, data, meta }`), and executeRequest stores that whole body + // under `response.data` — so the move result lives at `response.data.data`. + const envelope = (response.data ?? {}) as { data?: MoveResponseData }; + const crmData = (envelope.data ?? {}) as MoveResponseData; + + // A deleted/invalid target stage degrades to a logged skip on the CRM + // side (AC3) — surface it as skipped rather than a successful move. + if (crmData.skipped) { + this.logger.warn('CRM skipped the move', { + nodeId: input.nodeId, + reason: crmData.reason, + }); + return { + moved: false, + skipped: true, + reason: crmData.reason || 'stage_not_found', + pipelineId, + stageId, + conversationId: input.conversationId, + timestamp: new Date().toISOString(), + }; + } + + return { + moved: true, + movementType: crmData.movement_type, + pipelineId, + stageId, + conversationId: input.conversationId, + timestamp: new Date().toISOString(), + crmResponse: crmData, + }; + }) + .then(({ result, executionTime }) => { + return this.createSuccessResult(input, executionTime, { + [`node_${input.nodeId}_pipeline_moved`]: result.moved, + [`node_${input.nodeId}_pipeline_id`]: result.pipelineId, + [`node_${input.nodeId}_stage_id`]: result.stageId, + }); + }) + .catch((error) => { + const executionTime = Date.now(); + this.logger.error('Failed to move conversation to pipeline stage', { + conversationId: input.conversationId, + nodeId: input.nodeId, + error: error.message, + }); + return this.createErrorResult(error, executionTime); + }); + } + + private skipped(reason: string, pipelineId?: string, stageId?: string) { + return { + moved: false, + skipped: true, + reason, + pipelineId, + stageId, + timestamp: new Date().toISOString(), + }; + } +} diff --git a/src/modules/temporal/workflows/journey-execution.workflow.ts b/src/modules/temporal/workflows/journey-execution.workflow.ts index 8da7366..478e3dc 100644 --- a/src/modules/temporal/workflows/journey-execution.workflow.ts +++ b/src/modules/temporal/workflows/journey-execution.workflow.ts @@ -833,6 +833,17 @@ export async function JourneyExecutionWorkflow( }); break; + case 'move-to-pipeline-stage-node': + nodeResult = + await actionNodeActivities.executeMoveToPipelineStageNode({ + nodeId: currentNode.id, + conversationId: + input.triggerEvent?.properties?.conversation_id || undefined, + sessionId: input.sessionId, + nodeData: currentNode.data, + }); + break; + case 'send-transcript-node': nodeResult = await actionNodeActivities.executeSendTranscriptNode({ nodeId: currentNode.id, diff --git a/src/shared/crm-client/crm-client.service.spec.ts b/src/shared/crm-client/crm-client.service.spec.ts index 8e06c38..e566ab8 100644 --- a/src/shared/crm-client/crm-client.service.spec.ts +++ b/src/shared/crm-client/crm-client.service.spec.ts @@ -222,6 +222,38 @@ describe('CrmClientService', () => { }); }); + // EVO-1272: pins the HTTP contract the Journey "Move to Pipeline Stage" node + // depends on. Mocking the client method elsewhere can't catch a URL/param/ + // envelope drift between evo-flow and the Rails endpoint — this can. + describe('moveToPipelineStage — Journey move node contract', () => { + it('PATCHes /pipeline_items/move_conversation with conversation_id + pipeline_stage_id', async () => { + fetchMock.mockResolvedValueOnce( + buildFetchResponse({ + status: 200, + body: { + success: true, + data: { moved: true, movement_type: 'cross_pipeline' }, + }, + }), + ); + + const result = await service.moveToPipelineStage('p1', 'conv-1', 'st9'); + + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe( + 'http://crm-test.local/api/v1/pipelines/p1/pipeline_items/move_conversation', + ); + expect(init.method).toBe('PATCH'); + expect(JSON.parse(init.body)).toEqual({ + conversation_id: 'conv-1', + pipeline_stage_id: 'st9', + }); + // The move result is nested one level under the success_response envelope. + expect(result.success).toBe(true); + expect((result.data as any).data.movement_type).toBe('cross_pipeline'); + }); + }); + describe('auth headers', () => { it('uses X-Service-Token header by default (s2s)', async () => { fetchMock.mockResolvedValueOnce( diff --git a/src/shared/crm-client/crm-client.service.ts b/src/shared/crm-client/crm-client.service.ts index c72a543..f29507e 100644 --- a/src/shared/crm-client/crm-client.service.ts +++ b/src/shared/crm-client/crm-client.service.ts @@ -812,6 +812,26 @@ export class CrmClientService { ); } + async moveToPipelineStage( + pipelineId: string, + conversationId: string, + stageId: string, + nodeType: string = 'move-to-pipeline-stage', + ): Promise> { + const url = `${this.baseURL}/api/v1/pipelines/${pipelineId}/pipeline_items/move_conversation`; + return this.executeRequest( + url, + { + method: 'PATCH', + body: JSON.stringify({ + conversation_id: conversationId, + pipeline_stage_id: stageId, + }), + }, + { nodeType, conversationId }, + ); + } + async sendEmailTeam( context: CrmConversationContext, teamIds: string[],