Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/modules/temporal/activities/action-nodes.activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { SendMessageNode, SendMessageNodeInput } from './nodes/evoai/communicati
import { SendCannedResponseNode, SendCannedResponseNodeInput } from './nodes/evoai/communication/send-canned-response.node';
import { SendEmailTeamNode, SendEmailTeamNodeInput } from './nodes/evoai/communication/send-email-team.node';
import { AssignToPipelineNode, AssignToPipelineNodeInput } from './nodes/evoai/pipeline/assign-to-pipeline.node';
import { MoveToPipelineStageNode, MoveToPipelineStageNodeInput } from './nodes/evoai/pipeline/move-to-pipeline-stage.node';
import { SendTranscriptNode, SendTranscriptNodeInput } from './nodes/evoai/communication/send-transcript.node';
import { AssignAgentNode, AssignAgentNodeInput } from './nodes/evoai/assignment/assign-agent.node';
import { AssignTeamNode, AssignTeamNodeInput } from './nodes/evoai/assignment/assign-team.node';
Expand Down Expand Up @@ -66,6 +67,7 @@ export {
SendCannedResponseNodeInput,
SendEmailTeamNodeInput,
AssignToPipelineNodeInput,
MoveToPipelineStageNodeInput,
ChangePriorityNodeInput,
ScheduledActionNodeInput,
};
Expand Down Expand Up @@ -112,6 +114,9 @@ export interface ActionNodeActivities {
executeAssignToPipelineNode(
input: AssignToPipelineNodeInput,
): Promise<NodeExecutionResult>;
executeMoveToPipelineStageNode(
input: MoveToPipelineStageNodeInput,
): Promise<NodeExecutionResult>;
executeSendTranscriptNode(
input: SendTranscriptNodeInput,
): Promise<NodeExecutionResult>;
Expand Down Expand Up @@ -164,6 +169,7 @@ let sendMessageNode: SendMessageNode;
let sendCannedResponseNode: SendCannedResponseNode;
let sendEmailTeamNode: SendEmailTeamNode;
let assignToPipelineNode: AssignToPipelineNode;
let moveToPipelineStageNode: MoveToPipelineStageNode;
let sendTranscriptNode: SendTranscriptNode;
let assignAgentNode: AssignAgentNode;
let assignTeamNode: AssignTeamNode;
Expand Down Expand Up @@ -252,6 +258,12 @@ function getAssignToPipelineNode() {
return assignToPipelineNode;
}

function getMoveToPipelineStageNode() {
if (!moveToPipelineStageNode)
moveToPipelineStageNode = new MoveToPipelineStageNode();
return moveToPipelineStageNode;
}

function getSendTranscriptNode() {
if (!sendTranscriptNode) sendTranscriptNode = new SendTranscriptNode();
return sendTranscriptNode;
Expand Down Expand Up @@ -461,6 +473,12 @@ export const actionNodeActivities: ActionNodeActivities = {
return await getAssignToPipelineNode().execute(input);
},

async executeMoveToPipelineStageNode(
input: MoveToPipelineStageNodeInput,
): Promise<NodeExecutionResult> {
return await getMoveToPipelineStageNode().execute(input);
},

async executeSendTranscriptNode(
input: SendTranscriptNodeInput,
): Promise<NodeExecutionResult> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {
MoveToPipelineStageNode,
MoveToPipelineStageNodeInput,
} from './move-to-pipeline-stage.node';

describe('MoveToPipelineStageNode', () => {
let node: MoveToPipelineStageNode;
let moveToPipelineStage: jest.Mock;

const baseInput: MoveToPipelineStageNodeInput = {
nodeId: 'n1',
conversationId: 'conv-1',
sessionId: 's1',
nodeData: { pipeline_id: 'p1', pipeline_stage_id: 'st1' },
};

beforeEach(() => {
node = new MoveToPipelineStageNode();
moveToPipelineStage = jest.fn();
(node as any).crmService = { moveToPipelineStage };
jest
.spyOn(node as any, 'interpolateNodeData')
.mockImplementation(async (_input, nodeData) => nodeData);
});

// The mocks below mirror the real CRM `success_response` envelope:
// executeRequest stores the whole body under `data`, so the move result is
// nested at `data.data` — the node must unwrap that level (regression guard).
it('moves the conversation to the target pipeline stage (happy path)', async () => {
moveToPipelineStage.mockResolvedValue({
success: true,
data: { success: true, data: { moved: true, movement_type: 'cross_pipeline' } },
});

const result = await node.execute(baseInput);

expect(moveToPipelineStage).toHaveBeenCalledWith(
'p1',
'conv-1',
'st1',
'move-to-pipeline-stage',
);
expect(result.success).toBe(true);
expect(result.variables).toMatchObject({
node_n1_pipeline_moved: true,
node_n1_pipeline_id: 'p1',
node_n1_stage_id: 'st1',
});
});

it('skips when stage_id is missing', async () => {
const result = await node.execute({
...baseInput,
nodeData: { pipeline_id: 'p1' },
});

expect(moveToPipelineStage).not.toHaveBeenCalled();
expect(result.success).toBe(true);
});

it('skips when pipeline_id is missing', async () => {
const result = await node.execute({
...baseInput,
nodeData: { pipeline_stage_id: 'st1' },
});

expect(moveToPipelineStage).not.toHaveBeenCalled();
expect(result.success).toBe(true);
});

it('surfaces a CRM skip for a deleted target stage as skipped (AC3)', async () => {
moveToPipelineStage.mockResolvedValue({
success: true,
data: { success: true, data: { moved: false, skipped: true, reason: 'stage_not_found' } },
});

const result = await node.execute(baseInput);

expect(moveToPipelineStage).toHaveBeenCalledTimes(1);
expect(result.success).toBe(true);
expect(result.variables).toMatchObject({
node_n1_pipeline_moved: false,
});
});

it('returns an error result when the CRM call fails', async () => {
moveToPipelineStage.mockResolvedValue({ success: false, error: 'boom' });

const result = await node.execute(baseInput);

expect(result.success).toBe(false);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { BaseNode, NodeExecutionResult } from '../../base.node';
import { CrmClientService } from '../../../../../../shared/crm-client/crm-client.service';

interface MoveResponseData {
moved?: boolean;
skipped?: boolean;
reason?: string;
movement_type?: string;
}

export interface MoveToPipelineStageNodeInput {
nodeId: string;
conversationId?: string;
sessionId: string;
nodeData: {
pipeline_id?: string;
pipelineId?: string;
stage_id?: string;
pipeline_stage_id?: string;
nextNodeId?: string;
};
}

export class MoveToPipelineStageNode extends BaseNode {
private crmService: CrmClientService | null = null;

constructor() {
super('move-to-pipeline-stage');
}

private getCrmService(): CrmClientService {
if (!this.crmService) this.crmService = new CrmClientService();
return this.crmService;
}

async execute(
input: MoveToPipelineStageNodeInput,
): Promise<NodeExecutionResult> {
return await this.executeWithTiming(input.nodeId, input, async () => {
const data = await this.interpolateNodeData(input, input.nodeData);
const pipelineId = data.pipeline_id || data.pipelineId;
const stageId = data.pipeline_stage_id || data.stage_id;

if (!stageId) {
this.logger.warn('No stage_id configured; skipping', {
nodeId: input.nodeId,
});
return this.skipped('no_stage_id', pipelineId, stageId);
}
if (!pipelineId) {
this.logger.warn('No pipeline_id configured; skipping', {
nodeId: input.nodeId,
});
return this.skipped('no_pipeline_id', pipelineId, stageId);
}
if (!input.conversationId) {
this.logger.warn('No conversationId available from trigger event', {
nodeId: input.nodeId,
});
return this.skipped('no_conversation_id', pipelineId, stageId);
}

const response = await this.getCrmService().moveToPipelineStage(
String(pipelineId),
input.conversationId,
String(stageId),
'move-to-pipeline-stage',
);

if (!response.success) {
throw new Error(
`Failed to move conversation to pipeline stage: ${response.error}`,
);
}

// The CRM wraps payloads in a `success_response` envelope
// (`{ success, data, meta }`), and executeRequest stores that whole body
// under `response.data` — so the move result lives at `response.data.data`.
const envelope = (response.data ?? {}) as { data?: MoveResponseData };
const crmData = (envelope.data ?? {}) as MoveResponseData;

// A deleted/invalid target stage degrades to a logged skip on the CRM
// side (AC3) — surface it as skipped rather than a successful move.
if (crmData.skipped) {
this.logger.warn('CRM skipped the move', {
nodeId: input.nodeId,
reason: crmData.reason,
});
return {
moved: false,
skipped: true,
reason: crmData.reason || 'stage_not_found',
pipelineId,
stageId,
conversationId: input.conversationId,
timestamp: new Date().toISOString(),
};
}

return {
moved: true,
movementType: crmData.movement_type,
pipelineId,
stageId,
conversationId: input.conversationId,
timestamp: new Date().toISOString(),
crmResponse: crmData,
};
})
.then(({ result, executionTime }) => {
return this.createSuccessResult(input, executionTime, {
[`node_${input.nodeId}_pipeline_moved`]: result.moved,
[`node_${input.nodeId}_pipeline_id`]: result.pipelineId,
[`node_${input.nodeId}_stage_id`]: result.stageId,
});
})
.catch((error) => {
Comment on lines +109 to +117

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Error path uses Date.now() for executionTime, which likely diverges from the timing semantics used in the success path.

The success path gets executionTime as a duration from executeWithTiming, but the error path uses Date.now() and passes that into createErrorResult. If executionTime is meant to be a duration, this will distort error metrics and make them inconsistent with success timings. Please either route errors through executeWithTiming (if possible) or measure a duration around the call (e.g. const start = Date.now(); ... catch { const executionTime = Date.now() - start; }) so both paths use the same timing semantics.

const executionTime = Date.now();
this.logger.error('Failed to move conversation to pipeline stage', {
conversationId: input.conversationId,
nodeId: input.nodeId,
error: error.message,
});
return this.createErrorResult(error, executionTime);
});
}

private skipped(reason: string, pipelineId?: string, stageId?: string) {
return {
moved: false,
skipped: true,
reason,
pipelineId,
stageId,
timestamp: new Date().toISOString(),
};
}
}
11 changes: 11 additions & 0 deletions src/modules/temporal/workflows/journey-execution.workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,17 @@ export async function JourneyExecutionWorkflow(
});
break;

case 'move-to-pipeline-stage-node':
nodeResult =
await actionNodeActivities.executeMoveToPipelineStageNode({
nodeId: currentNode.id,
conversationId:
input.triggerEvent?.properties?.conversation_id || undefined,
sessionId: input.sessionId,
nodeData: currentNode.data,
});
break;

case 'send-transcript-node':
nodeResult = await actionNodeActivities.executeSendTranscriptNode({
nodeId: currentNode.id,
Expand Down
32 changes: 32 additions & 0 deletions src/shared/crm-client/crm-client.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,38 @@ describe('CrmClientService', () => {
});
});

// EVO-1272: pins the HTTP contract the Journey "Move to Pipeline Stage" node
// depends on. Mocking the client method elsewhere can't catch a URL/param/
// envelope drift between evo-flow and the Rails endpoint — this can.
describe('moveToPipelineStage — Journey move node contract', () => {
it('PATCHes /pipeline_items/move_conversation with conversation_id + pipeline_stage_id', async () => {
fetchMock.mockResolvedValueOnce(
buildFetchResponse({
status: 200,
body: {
success: true,
data: { moved: true, movement_type: 'cross_pipeline' },
},
}),
);

const result = await service.moveToPipelineStage('p1', 'conv-1', 'st9');

const [url, init] = fetchMock.mock.calls[0];
expect(url).toBe(
'http://crm-test.local/api/v1/pipelines/p1/pipeline_items/move_conversation',
);
expect(init.method).toBe('PATCH');
expect(JSON.parse(init.body)).toEqual({
conversation_id: 'conv-1',
pipeline_stage_id: 'st9',
});
// The move result is nested one level under the success_response envelope.
expect(result.success).toBe(true);
expect((result.data as any).data.movement_type).toBe('cross_pipeline');
});
});

describe('auth headers', () => {
it('uses X-Service-Token header by default (s2s)', async () => {
fetchMock.mockResolvedValueOnce(
Expand Down
20 changes: 20 additions & 0 deletions src/shared/crm-client/crm-client.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,26 @@ export class CrmClientService {
);
}

async moveToPipelineStage(
pipelineId: string,
conversationId: string,
stageId: string,
nodeType: string = 'move-to-pipeline-stage',
): Promise<CrmApiResponse<any>> {
const url = `${this.baseURL}/api/v1/pipelines/${pipelineId}/pipeline_items/move_conversation`;
return this.executeRequest(
url,
{
method: 'PATCH',
body: JSON.stringify({
conversation_id: conversationId,
pipeline_stage_id: stageId,
}),
},
{ nodeType, conversationId },
);
}

async sendEmailTeam(
context: CrmConversationContext,
teamIds: string[],
Expand Down
Loading