feat(evo-flow): execute Move to Pipeline Stage Journey node (EVO-1272)#37
Conversation
Implements the runtime for the Flow Builder "Move to Pipeline Stage" action node: - MoveToPipelineStageNode mirrors AssignToPipelineNode; calls the CRM move_conversation endpoint via CrmClientService#moveToPipelineStage. - Unwraps the success_response envelope (response.data.data) so a CRM skip for a deleted target stage surfaces as skipped, not a phantom move. - Registers the node in action-nodes.activities (interface, lazy singleton, activity) and dispatches the 'move-to-pipeline-stage-node' type in the journey execution workflow. Tests: node unit specs (happy / skip-on-missing-config / skip-on-deleted-stage / error) and a CrmClientService contract spec pinning URL, method, body and the nested envelope against fetch. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Reviewer's GuideImplements the runtime for the Move to Pipeline Stage journey action node by adding a CRM client method, a Temporal activity node with proper envelope handling and skip semantics, and wiring it into the action-node activities and journey execution workflow, plus tests that pin the CRM HTTP contract and node behavior. Sequence diagram for executing the Move to Pipeline Stage journey nodesequenceDiagram
actor TriggerEvent
participant JourneyExecutionWorkflow
participant ActionNodeActivities
participant MoveToPipelineStageNode
participant CrmClientService
participant CrmAPI
TriggerEvent ->> JourneyExecutionWorkflow: JourneyExecutionWorkflow(input)
JourneyExecutionWorkflow ->> JourneyExecutionWorkflow: switch currentNode.type
JourneyExecutionWorkflow ->> ActionNodeActivities: executeMoveToPipelineStageNode(input)
ActionNodeActivities ->> MoveToPipelineStageNode: execute(input)
alt [missing stageId or pipelineId or conversationId]
MoveToPipelineStageNode ->> MoveToPipelineStageNode: skipped(reason, pipelineId, stageId)
MoveToPipelineStageNode -->> ActionNodeActivities: NodeExecutionResult (skipped)
else [all IDs present]
MoveToPipelineStageNode ->> CrmClientService: moveToPipelineStage(pipelineId, conversationId, stageId, nodeType)
CrmClientService ->> CrmAPI: PATCH /pipelines/{pipelineId}/pipeline_items/move_conversation
CrmAPI -->> CrmClientService: { success, data: { data: MoveResponseData } }
CrmClientService -->> MoveToPipelineStageNode: CrmApiResponse
alt [response.success is false]
MoveToPipelineStageNode ->> MoveToPipelineStageNode: throw Error
MoveToPipelineStageNode -->> ActionNodeActivities: createErrorResult(error, executionTime)
else [crmData.skipped is true]
MoveToPipelineStageNode ->> MoveToPipelineStageNode: log warn "CRM skipped the move"
MoveToPipelineStageNode -->> ActionNodeActivities: NodeExecutionResult (skipped)
else [move succeeded]
MoveToPipelineStageNode -->> ActionNodeActivities: NodeExecutionResult (moved)
end
end
ActionNodeActivities -->> JourneyExecutionWorkflow: nodeResult
JourneyExecutionWorkflow ->> JourneyExecutionWorkflow: continue journey graph
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- In
MoveToPipelineStageNode.execute, the success path usesexecuteWithTiming’sexecutionTimewhile thecatchbranch setsexecutionTime = Date.now(), which likely mixes up a duration with an absolute timestamp; consider using the same timing mechanism in both paths for consistent metrics. - The
skippedhelper inMoveToPipelineStageNodedoes not includeconversationIdwhile the CRM-skipped path does, which makes downstream handling and debugging inconsistent; consider addingconversationIdto the helper signature and including it in all skipped results.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `MoveToPipelineStageNode.execute`, the success path uses `executeWithTiming`’s `executionTime` while the `catch` branch sets `executionTime = Date.now()`, which likely mixes up a duration with an absolute timestamp; consider using the same timing mechanism in both paths for consistent metrics.
- The `skipped` helper in `MoveToPipelineStageNode` does not include `conversationId` while the CRM-skipped path does, which makes downstream handling and debugging inconsistent; consider adding `conversationId` to the helper signature and including it in all skipped results.
## Individual Comments
### Comment 1
<location path="src/modules/temporal/activities/nodes/evoai/pipeline/move-to-pipeline-stage.node.ts" line_range="109-117" />
<code_context>
+ timestamp: new Date().toISOString(),
+ crmResponse: crmData,
+ };
+ })
+ .then(({ result, executionTime }) => {
+ return this.createSuccessResult(input, executionTime, {
+ [`node_${input.nodeId}_pipeline_moved`]: result.moved,
+ [`node_${input.nodeId}_pipeline_id`]: result.pipelineId,
+ [`node_${input.nodeId}_stage_id`]: result.stageId,
+ });
+ })
+ .catch((error) => {
+ const executionTime = Date.now();
+ this.logger.error('Failed to move conversation to pipeline stage', {
</code_context>
<issue_to_address>
**issue (bug_risk):** Error path uses `Date.now()` for `executionTime`, which likely diverges from the timing semantics used in the success path.
The success path gets `executionTime` as a duration from `executeWithTiming`, but the error path uses `Date.now()` and passes that into `createErrorResult`. If `executionTime` is meant to be a duration, this will distort error metrics and make them inconsistent with success timings. Please either route errors through `executeWithTiming` (if possible) or measure a duration around the call (e.g. `const start = Date.now(); ... catch { const executionTime = Date.now() - start; }`) so both paths use the same timing semantics.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| }) | ||
| .then(({ result, executionTime }) => { | ||
| return this.createSuccessResult(input, executionTime, { | ||
| [`node_${input.nodeId}_pipeline_moved`]: result.moved, | ||
| [`node_${input.nodeId}_pipeline_id`]: result.pipelineId, | ||
| [`node_${input.nodeId}_stage_id`]: result.stageId, | ||
| }); | ||
| }) | ||
| .catch((error) => { |
There was a problem hiding this comment.
issue (bug_risk): Error path uses Date.now() for executionTime, which likely diverges from the timing semantics used in the success path.
The success path gets executionTime as a duration from executeWithTiming, but the error path uses Date.now() and passes that into createErrorResult. If executionTime is meant to be a duration, this will distort error metrics and make them inconsistent with success timings. Please either route errors through executeWithTiming (if possible) or measure a duration around the call (e.g. const start = Date.now(); ... catch { const executionTime = Date.now() - start; }) so both paths use the same timing semantics.
dpaes
left a comment
There was a problem hiding this comment.
✅ Aprovado — EVO-1272 [10.14] Move to Pipeline Stage
Revisão dos 3 PRs (crm#129, evo-flow#37, frontend#148) concluída. Contrato cross-repo coerente ponta-a-ponta; o contract spec do evo-flow pegou e corrigiu o bug de envelope (data.skipped → data.data.skipped).
Pendências resolvidas:
- Rebase: crm#129 rebaseado na develop, agora MERGEABLE/CLEAN.
- AC4: confirmado sem desvio — a paridade é medida contra
Pipelines::StageAutomationService#move_to_pipeline, que faz o mesmoupdate!in-place deixando o pipeline anterior. AC1–AC4 atendidos.
Specs auto-reportados (CI não roda rspec/vitest) — recomendado rodar local. Aprovado e mergeado para develop.
What
Runtime for the Flow Builder Move to Pipeline Stage action node (EVO-1272).
MoveToPipelineStageNodemirrorsAssignToPipelineNode; calls the CRMmove_conversationendpoint viaCrmClientService#moveToPipelineStage.success_responseenvelope (response.data.data) so a skip for a deleted target stage surfaces as skipped, not a phantom move.action-nodes.activities(interface, lazy singleton, activity) and dispatches themove-to-pipeline-stage-nodetype injourney-execution.workflow.Tests
CrmClientServicecontract spec pinning URL, method, body and the nested envelope against mockedfetch— this caught a real bug where the node readresponse.data.skippedinstead ofresponse.data.data.skipped(a deleted stage would have been reported as a successful move).Known limitation (out of scope)
Journey trigger events that are contact-scoped (e.g. contact label add/remove) do not carry a
conversation_id, so this node — like the existing Assign to Pipeline node — has no conversation to act on when started that way. Wiringconversation_idinto Journey trigger context is Journey-infra (EVO-1634), tracked separately.Part of EVO-1272 — paired with CRM (
evo-ai-crm-community#129) and frontend PRs.Summary by Sourcery
Add runtime support for a Move to Pipeline Stage journey node that moves a conversation between CRM pipeline stages and wires it into journey execution.
New Features:
Tests: