From 5cd04fff6b17f80cadffbbb4b7fd7fef4cd51a9b Mon Sep 17 00:00:00 2001 From: "Leo (OpenClaw Agent)" Date: Wed, 20 May 2026 17:28:15 +0000 Subject: [PATCH 1/9] feat: add Fan Out core component (#3312, #4921) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new FanOut component that reads an array from the upstream payload and emits one downstream event per element. - Input: arrayExpression (evaluated via ExpressionContext) - Output: one fanout.item event per element on the "item" channel - Each payload includes: item, index, totalCount - Empty array → Pass() with no events emitted - Modeled on readMemory oneByOne emit pattern Closes #3312 Related to #4921 Signed-off-by: Leo (OpenClaw Agent) --- pkg/components/fanout/fan_out.go | 210 +++++++++++++++++++++++++ pkg/components/fanout/fan_out_test.go | 100 ++++++++++++ pkg/registryimports/registryimports.go | 1 + 3 files changed, 311 insertions(+) create mode 100644 pkg/components/fanout/fan_out.go create mode 100644 pkg/components/fanout/fan_out_test.go diff --git a/pkg/components/fanout/fan_out.go b/pkg/components/fanout/fan_out.go new file mode 100644 index 0000000000..5a550197cf --- /dev/null +++ b/pkg/components/fanout/fan_out.go @@ -0,0 +1,210 @@ +package fanout + +import ( + "fmt" + "net/http" + "reflect" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/registry" +) + +const ComponentName = "fanOut" +const PayloadType = "fanout.item" +const ChannelNameItem = "item" + +func init() { + registry.RegisterAction(ComponentName, &FanOut{}) +} + +type FanOut struct{} + +type Spec struct { + ArrayExpression string `json:"arrayExpression"` +} + +func (f *FanOut) Name() string { + return ComponentName +} + +func (f *FanOut) Label() string { + return "Fan Out" +} + +func (f *FanOut) Description() string { + return "Emit one downstream event per item in an array" +} + +func (f *FanOut) Documentation() string { + return `The Fan Out component reads an array from the upstream payload and emits one downstream event per element. + +## Use Cases + +- Iterate over a list of results and process each one independently +- Fan out runner output arrays into per-item workflow paths +- Process each page, service, or record with the same downstream steps + +## How It Works + +1. Evaluates the configured array expression against the incoming event data +2. Emits one ` + "`fanout.item`" + ` event to the ` + "`item`" + ` channel for each element +3. If the array is empty, passes without emitting any events + +## Output Fields (per item) + +- **item**: The array element value +- **index**: Zero-based index of the element +- **totalCount**: Total number of items in the array + +## Expression Environment + +- **$**: The run context data +- **root()**: Access root event data +- **previous()**: Access previous node outputs` +} + +func (f *FanOut) Icon() string { + return "split" +} + +func (f *FanOut) Color() string { + return "blue" +} + +func (f *FanOut) ExampleOutput() map[string]any { + return map[string]any{ + "item": map[string]any{"service": "EC2", "cost_usd": 42.5}, + "index": 0, + "totalCount": 3, + } +} + +func (f *FanOut) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{ + {Name: ChannelNameItem, Label: "Item"}, + } +} + +func (f *FanOut) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "arrayExpression", + Label: "Array Expression", + Type: configuration.FieldTypeExpression, + Description: "Expression that evaluates to the array to fan out", + Required: true, + }, + } +} + +func (f *FanOut) Setup(ctx core.SetupContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + return validateSpec(spec) +} + +func (f *FanOut) Execute(ctx core.ExecutionContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + if err := validateSpec(spec); err != nil { + return err + } + + result, err := ctx.Expressions.Run(spec.ArrayExpression) + if err != nil { + return fmt.Errorf("expression evaluation failed: %w", err) + } + + items, err := toSlice(result) + if err != nil { + return fmt.Errorf("expression must evaluate to an array: %w", err) + } + + if err := ctx.Metadata.Set(map[string]any{ + "arrayExpression": spec.ArrayExpression, + "count": len(items), + }); err != nil { + return fmt.Errorf("failed to set execution metadata: %w", err) + } + + if len(items) == 0 { + return ctx.ExecutionState.Pass() + } + + payloads := make([]any, 0, len(items)) + for i, item := range items { + payloads = append(payloads, map[string]any{ + "item": item, + "index": i, + "totalCount": len(items), + }) + } + + return ctx.ExecutionState.Emit(ChannelNameItem, PayloadType, payloads) +} + +func decodeSpec(raw any) (Spec, error) { + var spec Spec + if err := mapstructure.Decode(raw, &spec); err != nil { + return Spec{}, fmt.Errorf("failed to decode configuration: %w", err) + } + return spec, nil +} + +func validateSpec(spec Spec) error { + if spec.ArrayExpression == "" { + return fmt.Errorf("arrayExpression is required") + } + return nil +} + +func toSlice(v any) ([]any, error) { + if v == nil { + return []any{}, nil + } + // direct []any + if s, ok := v.([]any); ok { + return s, nil + } + // reflect-based fallback for typed slices + rv := reflect.ValueOf(v) + if rv.Kind() != reflect.Slice { + return nil, fmt.Errorf("got %T", v) + } + result := make([]any, rv.Len()) + for i := range result { + result[i] = rv.Index(i).Interface() + } + return result, nil +} + +func (f *FanOut) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (f *FanOut) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (f *FanOut) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) { + return http.StatusOK, nil, nil +} + +func (f *FanOut) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (f *FanOut) Hooks() []core.Hook { + return []core.Hook{} +} + +func (f *FanOut) HandleHook(ctx core.ActionHookContext) error { + return nil +} diff --git a/pkg/components/fanout/fan_out_test.go b/pkg/components/fanout/fan_out_test.go new file mode 100644 index 0000000000..e09284fe1a --- /dev/null +++ b/pkg/components/fanout/fan_out_test.go @@ -0,0 +1,100 @@ +package fanout + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func TestFanOutExecute(t *testing.T) { + t.Run("emits one payload per array item", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{ + Output: []any{ + map[string]any{"service": "EC2", "cost": 10.0}, + map[string]any{"service": "S3", "cost": 5.0}, + map[string]any{"service": "RDS", "cost": 20.0}, + }, + } + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].by_service`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, ChannelNameItem, execState.Channel) + assert.Equal(t, PayloadType, execState.Type) + require.Len(t, execState.Payloads, 3) + + // check first payload + first := execState.Payloads[0].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "EC2", "cost": 10.0}, first["item"]) + assert.Equal(t, 0, first["index"]) + assert.Equal(t, 3, first["totalCount"]) + + // check last payload + last := execState.Payloads[2].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "RDS", "cost": 20.0}, last["item"]) + assert.Equal(t, 2, last["index"]) + }) + + t.Run("passes when array is empty", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: []any{}} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].items`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, "", execState.Channel) + assert.Empty(t, execState.Payloads) + }) + + t.Run("returns error when expression result is not an array", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: "not-an-array"} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].name`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "expression must evaluate to an array") + }) + + t.Run("returns error when arrayExpression is missing", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "arrayExpression is required") + }) +} diff --git a/pkg/registryimports/registryimports.go b/pkg/registryimports/registryimports.go index 946ff2b885..49449e1bfa 100644 --- a/pkg/registryimports/registryimports.go +++ b/pkg/registryimports/registryimports.go @@ -8,6 +8,7 @@ import ( _ "github.com/superplanehq/superplane/pkg/components/approval" _ "github.com/superplanehq/superplane/pkg/components/deletememory" _ "github.com/superplanehq/superplane/pkg/components/display" + _ "github.com/superplanehq/superplane/pkg/components/fanout" _ "github.com/superplanehq/superplane/pkg/components/filter" _ "github.com/superplanehq/superplane/pkg/components/graphql" _ "github.com/superplanehq/superplane/pkg/components/http" From e26abe9cd7a48764e31954f78d46415a761057a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 09:58:36 +0200 Subject: [PATCH 2/9] fix: satisfy example payload and component docs checks for Fan Out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor Signed-off-by: Igor Šarčević Co-authored-by: Cursor --- docs/components/Core.mdx | 48 +++++++++++++++++++++++ pkg/components/fanout/example.go | 18 +++++++++ pkg/components/fanout/example_output.json | 12 ++++++ pkg/components/fanout/fan_out.go | 8 ---- 4 files changed, 78 insertions(+), 8 deletions(-) create mode 100644 pkg/components/fanout/example.go create mode 100644 pkg/components/fanout/example_output.json diff --git a/docs/components/Core.mdx b/docs/components/Core.mdx index 227235e5ae..0524c55a94 100644 --- a/docs/components/Core.mdx +++ b/docs/components/Core.mdx @@ -23,6 +23,7 @@ import { CardGrid, LinkCard } from "@astrojs/starlight/components"; + @@ -388,6 +389,53 @@ The Display component displays a debug message from the latest execution. } ``` + + +## Fan Out + +The Fan Out component reads an array from the upstream payload and emits one downstream event per element. + +### Use Cases + +- Iterate over a list of results and process each one independently +- Fan out runner output arrays into per-item workflow paths +- Process each page, service, or record with the same downstream steps + +### How It Works + +1. Evaluates the configured array expression against the incoming event data +2. Emits one `fanout.item` event to the `item` channel for each element +3. If the array is empty, passes without emitting any events + +### Output Fields (per item) + +- **item**: The array element value +- **index**: Zero-based index of the element +- **totalCount**: Total number of items in the array + +### Expression Environment + +- **$**: The run context data +- **root()**: Access root event data +- **previous()**: Access previous node outputs + +### Example Output + +```json +{ + "data": { + "index": 0, + "item": { + "cost_usd": 42.5, + "service": "EC2" + }, + "totalCount": 3 + }, + "timestamp": "2026-01-16T17:56:16.680755501Z", + "type": "fanout.item" +} +``` + ## Filter diff --git a/pkg/components/fanout/example.go b/pkg/components/fanout/example.go new file mode 100644 index 0000000000..d6effcc893 --- /dev/null +++ b/pkg/components/fanout/example.go @@ -0,0 +1,18 @@ +package fanout + +import ( + _ "embed" + "sync" + + "github.com/superplanehq/superplane/pkg/utils" +) + +//go:embed example_output.json +var exampleOutputBytes []byte + +var exampleOutputOnce sync.Once +var exampleOutput map[string]any + +func (f *FanOut) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputOnce, exampleOutputBytes, &exampleOutput) +} diff --git a/pkg/components/fanout/example_output.json b/pkg/components/fanout/example_output.json new file mode 100644 index 0000000000..e25a6b3419 --- /dev/null +++ b/pkg/components/fanout/example_output.json @@ -0,0 +1,12 @@ +{ + "type": "fanout.item", + "data": { + "item": { + "service": "EC2", + "cost_usd": 42.5 + }, + "index": 0, + "totalCount": 3 + }, + "timestamp": "2026-01-16T17:56:16.680755501Z" +} diff --git a/pkg/components/fanout/fan_out.go b/pkg/components/fanout/fan_out.go index 5a550197cf..c3e4ad0fbe 100644 --- a/pkg/components/fanout/fan_out.go +++ b/pkg/components/fanout/fan_out.go @@ -74,14 +74,6 @@ func (f *FanOut) Color() string { return "blue" } -func (f *FanOut) ExampleOutput() map[string]any { - return map[string]any{ - "item": map[string]any{"service": "EC2", "cost_usd": 42.5}, - "index": 0, - "totalCount": 3, - } -} - func (f *FanOut) OutputChannels(configuration any) []core.OutputChannel { return []core.OutputChannel{ {Name: ChannelNameItem, Label: "Item"}, From 84a03070a7963608e5831fcb08599b007351ae9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 10:30:39 +0200 Subject: [PATCH 3/9] chore: retrigger CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Igor Šarčević Co-authored-by: Cursor From 1db80ebbeac0b4aeda6c4408574380a94885b0ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 10:47:39 +0200 Subject: [PATCH 4/9] feat: add Fan Out workflow UI mapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register fanOut in componentBaseMappers and eventStateRegistries so the canvas shows the split icon, array expression specs, and item-channel execution states instead of the noop fallback. Signed-off-by: Igor Šarčević Co-authored-by: Cursor Signed-off-by: Igor Šarčević Co-authored-by: Cursor --- .../src/pages/workflowv2/mappers/fanOut.ts | 153 ++++++++++++++++++ web_src/src/pages/workflowv2/mappers/index.ts | 3 + 2 files changed, 156 insertions(+) create mode 100644 web_src/src/pages/workflowv2/mappers/fanOut.ts diff --git a/web_src/src/pages/workflowv2/mappers/fanOut.ts b/web_src/src/pages/workflowv2/mappers/fanOut.ts new file mode 100644 index 0000000000..de7c397a2b --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/fanOut.ts @@ -0,0 +1,153 @@ +import type { + ComponentBaseContext, + ComponentBaseMapper, + EventStateRegistry, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + StateFunction, + SubtitleContext, +} from "./types"; +import type { ComponentBaseProps, EventSection, EventState, EventStateMap } from "@/ui/componentBase"; +import { DEFAULT_EVENT_STATE_MAP } from "@/ui/componentBase"; +import { getTriggerRenderer, getState, getStateMap } from "."; +import type React from "react"; +import { getBackgroundColorClass } from "@/lib/colors"; +import { renderTimeAgo } from "@/components/TimeAgo"; +import { formatTimestampInUserTimezone } from "@/lib/timezone"; + +type FanOutOutputs = Record; + +export const FAN_OUT_STATE_MAP: EventStateMap = { + ...DEFAULT_EVENT_STATE_MAP, + passed: { + icon: "circle-check", + textColor: "text-gray-800", + backgroundColor: "bg-green-100", + badgeColor: "bg-emerald-500", + }, + rejected: { + icon: "circle-x", + textColor: "text-gray-800", + backgroundColor: "bg-gray-100", + badgeColor: "bg-gray-500", + }, +}; + +export const fanOutStateFunction: StateFunction = (execution: ExecutionInfo): EventState => { + if (!execution) return "neutral"; + + if ( + execution.resultMessage && + (execution.resultReason === "RESULT_REASON_ERROR" || + (execution.result === "RESULT_FAILED" && execution.resultReason !== "RESULT_REASON_ERROR_RESOLVED")) + ) { + return "error"; + } + + if (execution.result === "RESULT_CANCELLED") { + return "cancelled"; + } + + if (execution.state === "STATE_PENDING" || execution.state === "STATE_STARTED") { + return "running"; + } + + if (execution.state === "STATE_FINISHED" && execution.result === "RESULT_PASSED") { + const outputs = execution.outputs as FanOutOutputs | undefined; + const itemOutputs = outputs?.item; + const hasItems = Array.isArray(itemOutputs) && itemOutputs.length > 0; + return hasItems ? "passed" : "rejected"; + } + + return "failed"; +}; + +export const FAN_OUT_STATE_REGISTRY: EventStateRegistry = { + stateMap: FAN_OUT_STATE_MAP, + getState: fanOutStateFunction, +}; + +type FanOutConfiguration = { + arrayExpression: string; +}; + +type FanOutMetadata = { + arrayExpression?: string; + count?: number; +}; + +export const fanOutMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const componentName = context.componentDefinition.name || "fanOut"; + const configuration = context.node.configuration as FanOutConfiguration; + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const specs = configuration.arrayExpression + ? [ + { + title: "Array", + tooltipTitle: "Array expression", + value: configuration.arrayExpression, + }, + ] + : undefined; + + return { + iconSlug: "split", + collapsed: context.node.isCollapsed, + collapsedBackground: getBackgroundColorClass("white"), + title: context.node.name || context.componentDefinition.label || context.componentDefinition.name || "Fan Out", + eventSections: lastExecution ? getFanOutEventSections(context.nodes, lastExecution, componentName) : undefined, + includeEmptyState: !lastExecution, + specs, + eventStateMap: getStateMap(componentName), + }; + }, + + subtitle(context: SubtitleContext): string | React.ReactNode { + if (!context.execution.createdAt) return ""; + return renderTimeAgo(new Date(context.execution.createdAt)); + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const configuration = context.execution.configuration as FanOutConfiguration; + const metadata = context.execution.metadata as FanOutMetadata | undefined; + const details: Record = { + "Evaluated at": context.execution.createdAt ? formatTimestampInUserTimezone(context.execution.createdAt) : "-", + "Array expression": configuration.arrayExpression ?? metadata?.arrayExpression ?? "-", + }; + + if (typeof metadata?.count === "number") { + details["Items emitted"] = metadata.count; + } + + return details; + }, +}; + +function getFanOutEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootEvent = execution.rootEvent; + if (!rootEvent?.id || !execution.createdAt) { + return []; + } + + const rootTriggerNode = nodes.find((n) => n.id === rootEvent.nodeId); + if (!rootTriggerNode?.componentName) { + return []; + } + + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode.componentName); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: rootEvent }); + const createdAt = new Date(execution.createdAt); + + return [ + { + receivedAt: createdAt, + eventTitle: title, + eventSubtitle: renderTimeAgo(createdAt), + eventState: getState(componentName)(execution), + eventId: rootEvent.id, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/index.ts b/web_src/src/pages/workflowv2/mappers/index.ts index d346a40de5..efb1c7d5f3 100644 --- a/web_src/src/pages/workflowv2/mappers/index.ts +++ b/web_src/src/pages/workflowv2/mappers/index.ts @@ -248,6 +248,7 @@ import { } from "./oci/index"; import { filterMapper, FILTER_STATE_REGISTRY } from "./filter"; +import { fanOutMapper, FAN_OUT_STATE_REGISTRY } from "./fanOut"; import { sshMapper, SSH_STATE_REGISTRY } from "./ssh"; import { runnerMapper, RUNNER_STATE_REGISTRY } from "./runner"; import { waitCustomFieldRenderer, waitMapper, WAIT_STATE_REGISTRY } from "./wait"; @@ -284,6 +285,7 @@ const componentBaseMappers: Record = { runner: runnerMapper, timeGate: timeGateMapper, filter: filterMapper, + fanOut: fanOutMapper, wait: waitMapper, approval: approvalMapper, merge: mergeMapper, @@ -437,6 +439,7 @@ const eventStateRegistries: Record = { ssh: SSH_STATE_REGISTRY, runner: RUNNER_STATE_REGISTRY, filter: FILTER_STATE_REGISTRY, + fanOut: FAN_OUT_STATE_REGISTRY, if: IF_STATE_REGISTRY, timeGate: TIME_GATE_STATE_REGISTRY, wait: WAIT_STATE_REGISTRY, From 8661a3531ba6d90dbda200a2985c115a16900050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 11:17:35 +0200 Subject: [PATCH 5/9] feat: gate Fan Out behind experimental feature flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register fanOut in the experimental features registry so the component picker shows it when the org enables the feature, matching Runner rollout. Signed-off-by: Igor Šarčević Co-authored-by: Cursor --- pkg/features/features.go | 4 ++++ pkg/features/features_test.go | 8 ++++++++ pkg/public/experimental_features_test.go | 1 + 3 files changed, 13 insertions(+) diff --git a/pkg/features/features.go b/pkg/features/features.go index cc9e2bb88a..3114dab3d7 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -19,8 +19,12 @@ const FeatureClaudeManagedAgents = "claude_managed_agents" // is released or enabled for the organization. const FeatureDashboards = "dashboards" +// FeatureFanOut gates the Fan Out core component until it is generally available. +const FeatureFanOut = "fanOut" + var registry = []Feature{ {ID: "runner", Label: "Runners", Description: "Sandboxed Runners"}, + {ID: FeatureFanOut, Label: "Fan Out", Description: "Emit one downstream event per item in an array"}, {ID: FeatureClaudeManagedAgents, Label: "Claude Managed Agents", Description: "Chat with a Claude-powered agent against the canvas"}, {ID: FeatureDashboards, Label: "Dashboards", Description: "Markdown dashboard panels on canvases"}, } diff --git a/pkg/features/features_test.go b/pkg/features/features_test.go index 86beeecd81..b7c964e8cf 100644 --- a/pkg/features/features_test.go +++ b/pkg/features/features_test.go @@ -22,6 +22,13 @@ func Test__Get(t *testing.T) { assert.Equal(t, "Dashboards", f.Label) }) + t.Run("fanOut feature is registered", func(t *testing.T) { + f, ok := Get(FeatureFanOut) + assert.True(t, ok) + assert.Equal(t, FeatureFanOut, f.ID) + assert.Equal(t, "Fan Out", f.Label) + }) + t.Run("unknown id returns zero value and false", func(t *testing.T) { f, ok := Get("does-not-exist") assert.False(t, ok) @@ -36,6 +43,7 @@ func Test__Get(t *testing.T) { func Test__Exists(t *testing.T) { assert.True(t, Exists("runner")) + assert.True(t, Exists(FeatureFanOut)) assert.True(t, Exists(FeatureDashboards)) assert.False(t, Exists("does-not-exist")) assert.False(t, Exists("")) diff --git a/pkg/public/experimental_features_test.go b/pkg/public/experimental_features_test.go index ee10b69772..9cc72b1d1e 100644 --- a/pkg/public/experimental_features_test.go +++ b/pkg/public/experimental_features_test.go @@ -35,6 +35,7 @@ func TestListExperimentalFeatures(t *testing.T) { ids = append(ids, id) } assert.Contains(t, ids, "runner") + assert.Contains(t, ids, features.FeatureFanOut) assert.Contains(t, ids, features.FeatureDashboards) }) From e93ff36c42e430b4f38820a22e1d1d386318b921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 11:18:33 +0200 Subject: [PATCH 6/9] chore: remove Fan Out experimental feature gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fan Out is a generally available core component like Filter and Merge. Signed-off-by: Igor Šarčević Co-authored-by: Cursor --- pkg/features/features.go | 4 ---- pkg/features/features_test.go | 8 -------- pkg/public/experimental_features_test.go | 1 - 3 files changed, 13 deletions(-) diff --git a/pkg/features/features.go b/pkg/features/features.go index 3114dab3d7..cc9e2bb88a 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -19,12 +19,8 @@ const FeatureClaudeManagedAgents = "claude_managed_agents" // is released or enabled for the organization. const FeatureDashboards = "dashboards" -// FeatureFanOut gates the Fan Out core component until it is generally available. -const FeatureFanOut = "fanOut" - var registry = []Feature{ {ID: "runner", Label: "Runners", Description: "Sandboxed Runners"}, - {ID: FeatureFanOut, Label: "Fan Out", Description: "Emit one downstream event per item in an array"}, {ID: FeatureClaudeManagedAgents, Label: "Claude Managed Agents", Description: "Chat with a Claude-powered agent against the canvas"}, {ID: FeatureDashboards, Label: "Dashboards", Description: "Markdown dashboard panels on canvases"}, } diff --git a/pkg/features/features_test.go b/pkg/features/features_test.go index b7c964e8cf..86beeecd81 100644 --- a/pkg/features/features_test.go +++ b/pkg/features/features_test.go @@ -22,13 +22,6 @@ func Test__Get(t *testing.T) { assert.Equal(t, "Dashboards", f.Label) }) - t.Run("fanOut feature is registered", func(t *testing.T) { - f, ok := Get(FeatureFanOut) - assert.True(t, ok) - assert.Equal(t, FeatureFanOut, f.ID) - assert.Equal(t, "Fan Out", f.Label) - }) - t.Run("unknown id returns zero value and false", func(t *testing.T) { f, ok := Get("does-not-exist") assert.False(t, ok) @@ -43,7 +36,6 @@ func Test__Get(t *testing.T) { func Test__Exists(t *testing.T) { assert.True(t, Exists("runner")) - assert.True(t, Exists(FeatureFanOut)) assert.True(t, Exists(FeatureDashboards)) assert.False(t, Exists("does-not-exist")) assert.False(t, Exists("")) diff --git a/pkg/public/experimental_features_test.go b/pkg/public/experimental_features_test.go index 9cc72b1d1e..ee10b69772 100644 --- a/pkg/public/experimental_features_test.go +++ b/pkg/public/experimental_features_test.go @@ -35,7 +35,6 @@ func TestListExperimentalFeatures(t *testing.T) { ids = append(ids, id) } assert.Contains(t, ids, "runner") - assert.Contains(t, ids, features.FeatureFanOut) assert.Contains(t, ids, features.FeatureDashboards) }) From d60f52b5672d0afce6eda95f1087aa8eaafa568e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 11:29:35 +0200 Subject: [PATCH 7/9] Fix visuals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Igor Šarčević --- db/structure.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/structure.sql b/db/structure.sql index ff39f124c5..1c8255b429 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -5,7 +5,7 @@ \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2145,7 +2145,7 @@ ALTER TABLE ONLY public.workflows \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2181,7 +2181,7 @@ COPY public.schema_migrations (version, dirty) FROM stdin; \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; From b33a83cafb2f18bb6e5703a8839f9e98832ff7a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 12:35:06 +0200 Subject: [PATCH 8/9] One by one execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Igor Šarčević --- .../superplane-component-mappers/SKILL.md | 107 ++++++++++ pkg/components/foreach/example.go | 18 ++ pkg/components/foreach/example_output.json | 12 ++ pkg/components/foreach/for_each.go | 200 ++++++++++++++++++ pkg/components/foreach/for_each_test.go | 98 +++++++++ .../contexts/node_configuration_builder.go | 37 ++++ .../src/pages/workflowv2/mappers/forEach.ts | 153 ++++++++++++++ 7 files changed, 625 insertions(+) create mode 100644 .cursor/skills/superplane-component-mappers/SKILL.md create mode 100644 pkg/components/foreach/example.go create mode 100644 pkg/components/foreach/example_output.json create mode 100644 pkg/components/foreach/for_each.go create mode 100644 pkg/components/foreach/for_each_test.go create mode 100644 web_src/src/pages/workflowv2/mappers/forEach.ts diff --git a/.cursor/skills/superplane-component-mappers/SKILL.md b/.cursor/skills/superplane-component-mappers/SKILL.md new file mode 100644 index 0000000000..f6ddefb346 --- /dev/null +++ b/.cursor/skills/superplane-component-mappers/SKILL.md @@ -0,0 +1,107 @@ +--- +name: superplane-component-mappers +description: >- + Adds or reviews workflow UI mappers for SuperPlane core and integration + components. Use when implementing a new backend component or trigger, when a + component has no mapper in web_src, when canvas nodes show generic/noop UI, or + when the user mentions frontend mappers, workflowv2/mappers, or component + customization. +--- + +# SuperPlane Component Frontend Mappers + +Backend registration alone is not enough. Every user-facing **component** (and **trigger**) needs a frontend mapper so the canvas shows the right icon, configuration specs, execution states, subtitles, and execution details. + +**There are no mappers associated with this component in the frontend** means the component `Name()` from Go is missing from `web_src/src/pages/workflowv2/mappers/index.ts`. The UI then falls back to `noopMapper` — generic node chrome with no expression specs, channel-aware states, or tailored execution details. + +## When a mapper is required + +| Change | Mapper work | +|--------|-------------| +| New core component (`pkg/components//`) | `web_src/src/pages/workflowv2/mappers/.ts` + register in `index.ts` | +| New integration component | `web_src/src/pages/workflowv2/mappers//.ts` + register in that integration's `index.ts` | +| New output channels or custom pass/fail semantics | Often also `eventStateRegistries` (see `filter`, `if`, `merge`) | +| Custom config field UI | `customFieldRenderers` (see `wait`, `schedule`) | + +## Checklist (core component) + +Copy and complete: + +``` +- [ ] Registry key matches Go `Name()` exactly (e.g. `forEach`, not `for-each`) +- [ ] `componentBaseMappers` entry in index.ts +- [ ] `iconSlug` matches backend `Icon()` when possible +- [ ] Configuration fields surfaced in `specs` (expressions, labels, etc.) +- [ ] `getExecutionDetails` includes key config + metadata from runs +- [ ] Custom `eventStateRegistries` if outputs use non-default channels (passed/rejected/true/false/item/…) +- [ ] Optional `*.spec.ts` for mapper props / execution details +- [ ] `make format.js` and `make check.build.ui` after UI edits +``` + +## Minimal mapper pattern + +Use `filter.ts` or `if.ts` for expression-based core components. Use `noop.ts` only when the component truly needs no customization. + +1. **Create** `web_src/src/pages/workflowv2/mappers/.ts`: + +```typescript +import type { ComponentBaseContext, ComponentBaseMapper, ExecutionDetailsContext, SubtitleContext } from "./types"; +import type { ComponentBaseProps } from "@/ui/componentBase"; +import { renderTimeAgo } from "@/components/TimeAgo"; + +type ForEachConfiguration = { arrayExpression: string }; + +export const forEachMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const configuration = context.node.configuration as ForEachConfiguration; + return { + iconSlug: "split", // match backend Icon() + collapsed: context.node.isCollapsed, + title: context.node.name || context.componentDefinition.label || "For Each", + specs: configuration.arrayExpression + ? [{ title: "Array", tooltipTitle: "Array expression", value: configuration.arrayExpression }] + : undefined, + includeEmptyState: context.lastExecutions.length === 0, + }; + }, + subtitle(context: SubtitleContext) { + return context.execution.createdAt ? renderTimeAgo(new Date(context.execution.createdAt)) : ""; + }, + getExecutionDetails(context: ExecutionDetailsContext) { + const configuration = context.execution.configuration as ForEachConfiguration; + return { "Array expression": configuration.arrayExpression ?? "-" }; + }, +}; +``` + +2. **Register** in `web_src/src/pages/workflowv2/mappers/index.ts`: + +```typescript +import { forEachMapper } from "./forEach"; + +const componentBaseMappers: Record = { + // ... + forEach: forEachMapper, +}; +``` + +3. **Custom states** (if the component emits named channels): define `*_STATE_MAP`, `*StateFunction`, `*_STATE_REGISTRY`, and add to `eventStateRegistries`. Mirror channel names from Go `OutputChannels()` (e.g. For Each → `item`). + +## Verify registration + +```bash +# Registry key must exist (replace forEach with component Name()) +rg 'forEach:' web_src/src/pages/workflowv2/mappers/index.ts +``` + +Integration components: key is often `integration.component` via `appMappers`; see `docs/contributing/integrations.md` and `docs/contributing/component-customization.md`. + +## Reference docs + +- [docs/contributing/component-customization.md](docs/contributing/component-customization.md) — registry types, tutorials +- [docs/contributing/component-design.md](docs/contributing/component-design.md) — mapper responsibilities on the canvas +- [web_src/AGENTS.md](web_src/AGENTS.md) — UI conventions and test commands + +## PR review + +If a PR adds `pkg/components/*` or `pkg/integrations/*` actions/triggers but no `web_src/src/pages/workflowv2/mappers/` changes, flag it unless the author explicitly documents intentional noop fallback. diff --git a/pkg/components/foreach/example.go b/pkg/components/foreach/example.go new file mode 100644 index 0000000000..800efc43a5 --- /dev/null +++ b/pkg/components/foreach/example.go @@ -0,0 +1,18 @@ +package foreach + +import ( + _ "embed" + "sync" + + "github.com/superplanehq/superplane/pkg/utils" +) + +//go:embed example_output.json +var exampleOutputBytes []byte + +var exampleOutputOnce sync.Once +var exampleOutput map[string]any + +func (c *ForEach) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputOnce, exampleOutputBytes, &exampleOutput) +} diff --git a/pkg/components/foreach/example_output.json b/pkg/components/foreach/example_output.json new file mode 100644 index 0000000000..e3002c22d3 --- /dev/null +++ b/pkg/components/foreach/example_output.json @@ -0,0 +1,12 @@ +{ + "type": "foreach.item", + "data": { + "item": { + "service": "EC2", + "cost_usd": 42.5 + }, + "index": 0, + "totalCount": 3 + }, + "timestamp": "2026-01-16T17:56:16.680755501Z" +} diff --git a/pkg/components/foreach/for_each.go b/pkg/components/foreach/for_each.go new file mode 100644 index 0000000000..8956f62a58 --- /dev/null +++ b/pkg/components/foreach/for_each.go @@ -0,0 +1,200 @@ +package foreach + +import ( + "fmt" + "net/http" + "reflect" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/registry" +) + +const ComponentName = "forEach" +const PayloadType = "foreach.item" +const ChannelNameItem = "item" + +func init() { + registry.RegisterAction(ComponentName, &ForEach{}) +} + +type ForEach struct{} + +type Spec struct { + ArrayExpression string `json:"arrayExpression"` +} + +func (c *ForEach) Name() string { + return ComponentName +} + +func (c *ForEach) Label() string { + return "For Each" +} + +func (c *ForEach) Description() string { + return "Emit one downstream event per item in an array" +} + +func (c *ForEach) Documentation() string { + return `The For Each component reads an array from the upstream payload and emits one downstream event per element. + +## Use Cases + +- Iterate over a list of results and process each one independently +- Split runner output arrays into per-item workflow paths +- Process each page, service, or record with the same downstream steps + +## How It Works + +1. Evaluates the configured array expression against the incoming event data +2. Emits one ` + "`foreach.item`" + ` event to the ` + "`item`" + ` channel for each element +3. If the array is empty, passes without emitting any events + +## Output Fields (per item) + +- **item**: The array element value +- **index**: Zero-based index of the element +- **totalCount**: Total number of items in the array + +## Expression Environment + +- **$**: The run context data +- **root()**: Access root event data +- **previous()**: Access previous node outputs` +} + +func (c *ForEach) Icon() string { + return "split" +} + +func (c *ForEach) Color() string { + return "blue" +} + +func (c *ForEach) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{ + {Name: ChannelNameItem, Label: "Item"}, + } +} + +func (c *ForEach) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "arrayExpression", + Label: "Array Expression", + Type: configuration.FieldTypeExpression, + Description: "Expression that evaluates to the array to iterate over", + Required: true, + }, + } +} + +func (c *ForEach) Setup(ctx core.SetupContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + return validateSpec(spec) +} + +func (c *ForEach) Execute(ctx core.ExecutionContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + if err := validateSpec(spec); err != nil { + return err + } + + result, err := ctx.Expressions.Run(spec.ArrayExpression) + if err != nil { + return fmt.Errorf("expression evaluation failed: %w", err) + } + + items, err := toSlice(result) + if err != nil { + return fmt.Errorf("expression must evaluate to an array: %w", err) + } + + if err := ctx.Metadata.Set(map[string]any{ + "arrayExpression": spec.ArrayExpression, + "count": len(items), + }); err != nil { + return fmt.Errorf("failed to set execution metadata: %w", err) + } + + if len(items) == 0 { + return ctx.ExecutionState.Pass() + } + + payloads := make([]any, 0, len(items)) + for i, item := range items { + payloads = append(payloads, map[string]any{ + "item": item, + "index": i, + "totalCount": len(items), + }) + } + + return ctx.ExecutionState.Emit(ChannelNameItem, PayloadType, payloads) +} + +func decodeSpec(raw any) (Spec, error) { + var spec Spec + if err := mapstructure.Decode(raw, &spec); err != nil { + return Spec{}, fmt.Errorf("failed to decode configuration: %w", err) + } + return spec, nil +} + +func validateSpec(spec Spec) error { + if spec.ArrayExpression == "" { + return fmt.Errorf("arrayExpression is required") + } + return nil +} + +func toSlice(v any) ([]any, error) { + if v == nil { + return []any{}, nil + } + if s, ok := v.([]any); ok { + return s, nil + } + rv := reflect.ValueOf(v) + if rv.Kind() != reflect.Slice { + return nil, fmt.Errorf("got %T", v) + } + result := make([]any, rv.Len()) + for i := range result { + result[i] = rv.Index(i).Interface() + } + return result, nil +} + +func (c *ForEach) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *ForEach) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *ForEach) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) { + return http.StatusOK, nil, nil +} + +func (c *ForEach) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (c *ForEach) Hooks() []core.Hook { + return []core.Hook{} +} + +func (c *ForEach) HandleHook(ctx core.ActionHookContext) error { + return nil +} diff --git a/pkg/components/foreach/for_each_test.go b/pkg/components/foreach/for_each_test.go new file mode 100644 index 0000000000..9fe87f6498 --- /dev/null +++ b/pkg/components/foreach/for_each_test.go @@ -0,0 +1,98 @@ +package foreach + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func TestForEachExecute(t *testing.T) { + t.Run("emits one payload per array item", func(t *testing.T) { + component := &ForEach{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{ + Output: []any{ + map[string]any{"service": "EC2", "cost": 10.0}, + map[string]any{"service": "S3", "cost": 5.0}, + map[string]any{"service": "RDS", "cost": 20.0}, + }, + } + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].by_service`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, ChannelNameItem, execState.Channel) + assert.Equal(t, PayloadType, execState.Type) + require.Len(t, execState.Payloads, 3) + + first := execState.Payloads[0].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "EC2", "cost": 10.0}, first["item"]) + assert.Equal(t, 0, first["index"]) + assert.Equal(t, 3, first["totalCount"]) + + last := execState.Payloads[2].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "RDS", "cost": 20.0}, last["item"]) + assert.Equal(t, 2, last["index"]) + }) + + t.Run("passes when array is empty", func(t *testing.T) { + component := &ForEach{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: []any{}} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].items`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, "", execState.Channel) + assert.Empty(t, execState.Payloads) + }) + + t.Run("returns error when expression result is not an array", func(t *testing.T) { + component := &ForEach{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: "not-an-array"} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].name`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "expression must evaluate to an array") + }) + + t.Run("returns error when arrayExpression is missing", func(t *testing.T) { + component := &ForEach{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "arrayExpression is required") + }) +} diff --git a/pkg/workers/contexts/node_configuration_builder.go b/pkg/workers/contexts/node_configuration_builder.go index b55a4f7e00..2890674f88 100644 --- a/pkg/workers/contexts/node_configuration_builder.go +++ b/pkg/workers/contexts/node_configuration_builder.go @@ -596,6 +596,13 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( latestByExecution := latestEventByExecution(events, executionIDs) for nodeRef, executionID := range executionIDByRef { + if payload, ok, err := b.payloadFromTriggeringEvent(executionID); err != nil { + return err + } else if ok { + messageChain[nodeRef] = payload + continue + } + event, ok := latestByExecution[executionID] if !ok { return fmt.Errorf("node %s has no outputs", nodeRef) @@ -607,6 +614,36 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( return nil } +// payloadFromTriggeringEvent returns the event payload that triggered the current +// execution when that event belongs to the referenced node's execution. This +// preserves per-branch data for components (e.g. For Each) that emit multiple +// events from a single execution. +func (b *NodeConfigurationBuilder) payloadFromTriggeringEvent(executionID uuid.UUID) (any, bool, error) { + if b.previousExecutionID == nil { + return nil, false, nil + } + + prevExecution, err := models.FindNodeExecutionInTransaction(b.tx, b.workflowID, *b.previousExecutionID) + if err != nil { + return nil, false, err + } + + if prevExecution.EventID == uuid.Nil { + return nil, false, nil + } + + triggerEvent, err := models.FindCanvasEventInTransaction(b.tx, prevExecution.EventID) + if err != nil { + return nil, false, err + } + + if triggerEvent.ExecutionID == nil || *triggerEvent.ExecutionID != executionID { + return nil, false, nil + } + + return triggerEvent.Data.Data(), true, nil +} + func latestEventByExecution(events []models.CanvasEvent, executionIDs []uuid.UUID) map[uuid.UUID]models.CanvasEvent { latestByExecution := make(map[uuid.UUID]models.CanvasEvent, len(executionIDs)) for _, event := range events { diff --git a/web_src/src/pages/workflowv2/mappers/forEach.ts b/web_src/src/pages/workflowv2/mappers/forEach.ts new file mode 100644 index 0000000000..c863148e03 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/forEach.ts @@ -0,0 +1,153 @@ +import type { + ComponentBaseContext, + ComponentBaseMapper, + EventStateRegistry, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + StateFunction, + SubtitleContext, +} from "./types"; +import type { ComponentBaseProps, EventSection, EventState, EventStateMap } from "@/ui/componentBase"; +import { DEFAULT_EVENT_STATE_MAP } from "@/ui/componentBase"; +import { getTriggerRenderer, getState, getStateMap } from "."; +import type React from "react"; +import { getBackgroundColorClass } from "@/lib/colors"; +import { renderTimeAgo } from "@/components/TimeAgo"; +import { formatTimestampInUserTimezone } from "@/lib/timezone"; + +type ForEachOutputs = Record; + +export const FOR_EACH_STATE_MAP: EventStateMap = { + ...DEFAULT_EVENT_STATE_MAP, + passed: { + icon: "circle-check", + textColor: "text-gray-800", + backgroundColor: "bg-green-100", + badgeColor: "bg-emerald-500", + }, + rejected: { + icon: "circle-x", + textColor: "text-gray-800", + backgroundColor: "bg-gray-100", + badgeColor: "bg-gray-500", + }, +}; + +export const forEachStateFunction: StateFunction = (execution: ExecutionInfo): EventState => { + if (!execution) return "neutral"; + + if ( + execution.resultMessage && + (execution.resultReason === "RESULT_REASON_ERROR" || + (execution.result === "RESULT_FAILED" && execution.resultReason !== "RESULT_REASON_ERROR_RESOLVED")) + ) { + return "error"; + } + + if (execution.result === "RESULT_CANCELLED") { + return "cancelled"; + } + + if (execution.state === "STATE_PENDING" || execution.state === "STATE_STARTED") { + return "running"; + } + + if (execution.state === "STATE_FINISHED" && execution.result === "RESULT_PASSED") { + const outputs = execution.outputs as ForEachOutputs | undefined; + const itemOutputs = outputs?.item; + const hasItems = Array.isArray(itemOutputs) && itemOutputs.length > 0; + return hasItems ? "passed" : "rejected"; + } + + return "failed"; +}; + +export const FOR_EACH_STATE_REGISTRY: EventStateRegistry = { + stateMap: FOR_EACH_STATE_MAP, + getState: forEachStateFunction, +}; + +type ForEachConfiguration = { + arrayExpression: string; +}; + +type ForEachMetadata = { + arrayExpression?: string; + count?: number; +}; + +export const forEachMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const componentName = context.componentDefinition.name || "forEach"; + const configuration = context.node.configuration as ForEachConfiguration; + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const specs = configuration.arrayExpression + ? [ + { + title: "Array", + tooltipTitle: "Array expression", + value: configuration.arrayExpression, + }, + ] + : undefined; + + return { + iconSlug: "split", + collapsed: context.node.isCollapsed, + collapsedBackground: getBackgroundColorClass("white"), + title: context.node.name || context.componentDefinition.label || context.componentDefinition.name || "For Each", + eventSections: lastExecution ? getForEachEventSections(context.nodes, lastExecution, componentName) : undefined, + includeEmptyState: !lastExecution, + specs, + eventStateMap: getStateMap(componentName), + }; + }, + + subtitle(context: SubtitleContext): string | React.ReactNode { + if (!context.execution.createdAt) return ""; + return renderTimeAgo(new Date(context.execution.createdAt)); + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const configuration = context.execution.configuration as ForEachConfiguration; + const metadata = context.execution.metadata as ForEachMetadata | undefined; + const details: Record = { + "Evaluated at": context.execution.createdAt ? formatTimestampInUserTimezone(context.execution.createdAt) : "-", + "Array expression": configuration.arrayExpression ?? metadata?.arrayExpression ?? "-", + }; + + if (typeof metadata?.count === "number") { + details["Items emitted"] = metadata.count; + } + + return details; + }, +}; + +function getForEachEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootEvent = execution.rootEvent; + if (!rootEvent?.id || !execution.createdAt) { + return []; + } + + const rootTriggerNode = nodes.find((n) => n.id === rootEvent.nodeId); + if (!rootTriggerNode?.componentName) { + return []; + } + + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode.componentName); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: rootEvent }); + const createdAt = new Date(execution.createdAt); + + return [ + { + receivedAt: createdAt, + eventTitle: title, + eventSubtitle: renderTimeAgo(createdAt), + eventState: getState(componentName)(execution), + eventId: rootEvent.id, + }, + ]; +} From 15e073a500ede60f3f623a1fd660860d220441c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 21 May 2026 14:49:54 +0200 Subject: [PATCH 9/9] Routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Igor Šarčević --- .../superplane-component-mappers/SKILL.md | 22 +- db/structure.sql | 6 +- docs/components/Core.mdx | 2 +- pkg/components/fanout/fan_out.go | 6 +- pkg/components/fanout/fan_out_test.go | 2 - pkg/components/foreach/example.go | 18 -- pkg/components/foreach/example_output.json | 12 -- pkg/components/foreach/for_each.go | 200 ------------------ pkg/components/foreach/for_each_test.go | 98 --------- pkg/registryimports/registryimports.go | 2 +- .../contexts/node_configuration_builder.go | 156 +++++++++----- ...figuration_builder_event_selection_test.go | 90 ++++++++ .../node_configuration_builder_test.go | 94 ++++++++ .../src/pages/workflowv2/mappers/forEach.ts | 153 -------------- 14 files changed, 310 insertions(+), 551 deletions(-) delete mode 100644 pkg/components/foreach/example.go delete mode 100644 pkg/components/foreach/example_output.json delete mode 100644 pkg/components/foreach/for_each.go delete mode 100644 pkg/components/foreach/for_each_test.go create mode 100644 pkg/workers/contexts/node_configuration_builder_event_selection_test.go delete mode 100644 web_src/src/pages/workflowv2/mappers/forEach.ts diff --git a/.cursor/skills/superplane-component-mappers/SKILL.md b/.cursor/skills/superplane-component-mappers/SKILL.md index f6ddefb346..96f64fea25 100644 --- a/.cursor/skills/superplane-component-mappers/SKILL.md +++ b/.cursor/skills/superplane-component-mappers/SKILL.md @@ -28,7 +28,7 @@ Backend registration alone is not enough. Every user-facing **component** (and * Copy and complete: ``` -- [ ] Registry key matches Go `Name()` exactly (e.g. `forEach`, not `for-each`) +- [ ] Registry key matches Go `Name()` exactly (e.g. `fanOut`, not `fan-out`) - [ ] `componentBaseMappers` entry in index.ts - [ ] `iconSlug` matches backend `Icon()` when possible - [ ] Configuration fields surfaced in `specs` (expressions, labels, etc.) @@ -49,15 +49,15 @@ import type { ComponentBaseContext, ComponentBaseMapper, ExecutionDetailsContext import type { ComponentBaseProps } from "@/ui/componentBase"; import { renderTimeAgo } from "@/components/TimeAgo"; -type ForEachConfiguration = { arrayExpression: string }; +type FanOutConfiguration = { arrayExpression: string }; -export const forEachMapper: ComponentBaseMapper = { +export const fanOutMapper: ComponentBaseMapper = { props(context: ComponentBaseContext): ComponentBaseProps { - const configuration = context.node.configuration as ForEachConfiguration; + const configuration = context.node.configuration as FanOutConfiguration; return { iconSlug: "split", // match backend Icon() collapsed: context.node.isCollapsed, - title: context.node.name || context.componentDefinition.label || "For Each", + title: context.node.name || context.componentDefinition.label || "Fan Out", specs: configuration.arrayExpression ? [{ title: "Array", tooltipTitle: "Array expression", value: configuration.arrayExpression }] : undefined, @@ -68,7 +68,7 @@ export const forEachMapper: ComponentBaseMapper = { return context.execution.createdAt ? renderTimeAgo(new Date(context.execution.createdAt)) : ""; }, getExecutionDetails(context: ExecutionDetailsContext) { - const configuration = context.execution.configuration as ForEachConfiguration; + const configuration = context.execution.configuration as FanOutConfiguration; return { "Array expression": configuration.arrayExpression ?? "-" }; }, }; @@ -77,21 +77,21 @@ export const forEachMapper: ComponentBaseMapper = { 2. **Register** in `web_src/src/pages/workflowv2/mappers/index.ts`: ```typescript -import { forEachMapper } from "./forEach"; +import { fanOutMapper } from "./fanOut"; const componentBaseMappers: Record = { // ... - forEach: forEachMapper, + fanOut: fanOutMapper, }; ``` -3. **Custom states** (if the component emits named channels): define `*_STATE_MAP`, `*StateFunction`, `*_STATE_REGISTRY`, and add to `eventStateRegistries`. Mirror channel names from Go `OutputChannels()` (e.g. For Each → `item`). +3. **Custom states** (if the component emits named channels): define `*_STATE_MAP`, `*StateFunction`, `*_STATE_REGISTRY`, and add to `eventStateRegistries`. Mirror channel names from Go `OutputChannels()` (e.g. Fan Out → `item`). ## Verify registration ```bash -# Registry key must exist (replace forEach with component Name()) -rg 'forEach:' web_src/src/pages/workflowv2/mappers/index.ts +# Registry key must exist (replace fanOut with component Name()) +rg 'fanOut:' web_src/src/pages/workflowv2/mappers/index.ts ``` Integration components: key is often `integration.component` via `appMappers`; see `docs/contributing/integrations.md` and `docs/contributing/component-customization.md`. diff --git a/db/structure.sql b/db/structure.sql index 1c8255b429..ff39f124c5 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -5,7 +5,7 @@ \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2145,7 +2145,7 @@ ALTER TABLE ONLY public.workflows \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2181,7 +2181,7 @@ COPY public.schema_migrations (version, dirty) FROM stdin; \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; diff --git a/docs/components/Core.mdx b/docs/components/Core.mdx index 0524c55a94..556415cb56 100644 --- a/docs/components/Core.mdx +++ b/docs/components/Core.mdx @@ -398,7 +398,7 @@ The Fan Out component reads an array from the upstream payload and emits one dow ### Use Cases - Iterate over a list of results and process each one independently -- Fan out runner output arrays into per-item workflow paths +- Split runner output arrays into per-item workflow paths - Process each page, service, or record with the same downstream steps ### How It Works diff --git a/pkg/components/fanout/fan_out.go b/pkg/components/fanout/fan_out.go index c3e4ad0fbe..b7c2e73548 100644 --- a/pkg/components/fanout/fan_out.go +++ b/pkg/components/fanout/fan_out.go @@ -44,7 +44,7 @@ func (f *FanOut) Documentation() string { ## Use Cases - Iterate over a list of results and process each one independently -- Fan out runner output arrays into per-item workflow paths +- Split runner output arrays into per-item workflow paths - Process each page, service, or record with the same downstream steps ## How It Works @@ -86,7 +86,7 @@ func (f *FanOut) Configuration() []configuration.Field { Name: "arrayExpression", Label: "Array Expression", Type: configuration.FieldTypeExpression, - Description: "Expression that evaluates to the array to fan out", + Description: "Expression that evaluates to the array to iterate over", Required: true, }, } @@ -161,11 +161,9 @@ func toSlice(v any) ([]any, error) { if v == nil { return []any{}, nil } - // direct []any if s, ok := v.([]any); ok { return s, nil } - // reflect-based fallback for typed slices rv := reflect.ValueOf(v) if rv.Kind() != reflect.Slice { return nil, fmt.Errorf("got %T", v) diff --git a/pkg/components/fanout/fan_out_test.go b/pkg/components/fanout/fan_out_test.go index e09284fe1a..245b33cafa 100644 --- a/pkg/components/fanout/fan_out_test.go +++ b/pkg/components/fanout/fan_out_test.go @@ -35,13 +35,11 @@ func TestFanOutExecute(t *testing.T) { assert.Equal(t, PayloadType, execState.Type) require.Len(t, execState.Payloads, 3) - // check first payload first := execState.Payloads[0].(map[string]any)["data"].(map[string]any) assert.Equal(t, map[string]any{"service": "EC2", "cost": 10.0}, first["item"]) assert.Equal(t, 0, first["index"]) assert.Equal(t, 3, first["totalCount"]) - // check last payload last := execState.Payloads[2].(map[string]any)["data"].(map[string]any) assert.Equal(t, map[string]any{"service": "RDS", "cost": 20.0}, last["item"]) assert.Equal(t, 2, last["index"]) diff --git a/pkg/components/foreach/example.go b/pkg/components/foreach/example.go deleted file mode 100644 index 800efc43a5..0000000000 --- a/pkg/components/foreach/example.go +++ /dev/null @@ -1,18 +0,0 @@ -package foreach - -import ( - _ "embed" - "sync" - - "github.com/superplanehq/superplane/pkg/utils" -) - -//go:embed example_output.json -var exampleOutputBytes []byte - -var exampleOutputOnce sync.Once -var exampleOutput map[string]any - -func (c *ForEach) ExampleOutput() map[string]any { - return utils.UnmarshalEmbeddedJSON(&exampleOutputOnce, exampleOutputBytes, &exampleOutput) -} diff --git a/pkg/components/foreach/example_output.json b/pkg/components/foreach/example_output.json deleted file mode 100644 index e3002c22d3..0000000000 --- a/pkg/components/foreach/example_output.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "type": "foreach.item", - "data": { - "item": { - "service": "EC2", - "cost_usd": 42.5 - }, - "index": 0, - "totalCount": 3 - }, - "timestamp": "2026-01-16T17:56:16.680755501Z" -} diff --git a/pkg/components/foreach/for_each.go b/pkg/components/foreach/for_each.go deleted file mode 100644 index 8956f62a58..0000000000 --- a/pkg/components/foreach/for_each.go +++ /dev/null @@ -1,200 +0,0 @@ -package foreach - -import ( - "fmt" - "net/http" - "reflect" - - "github.com/google/uuid" - "github.com/mitchellh/mapstructure" - "github.com/superplanehq/superplane/pkg/configuration" - "github.com/superplanehq/superplane/pkg/core" - "github.com/superplanehq/superplane/pkg/registry" -) - -const ComponentName = "forEach" -const PayloadType = "foreach.item" -const ChannelNameItem = "item" - -func init() { - registry.RegisterAction(ComponentName, &ForEach{}) -} - -type ForEach struct{} - -type Spec struct { - ArrayExpression string `json:"arrayExpression"` -} - -func (c *ForEach) Name() string { - return ComponentName -} - -func (c *ForEach) Label() string { - return "For Each" -} - -func (c *ForEach) Description() string { - return "Emit one downstream event per item in an array" -} - -func (c *ForEach) Documentation() string { - return `The For Each component reads an array from the upstream payload and emits one downstream event per element. - -## Use Cases - -- Iterate over a list of results and process each one independently -- Split runner output arrays into per-item workflow paths -- Process each page, service, or record with the same downstream steps - -## How It Works - -1. Evaluates the configured array expression against the incoming event data -2. Emits one ` + "`foreach.item`" + ` event to the ` + "`item`" + ` channel for each element -3. If the array is empty, passes without emitting any events - -## Output Fields (per item) - -- **item**: The array element value -- **index**: Zero-based index of the element -- **totalCount**: Total number of items in the array - -## Expression Environment - -- **$**: The run context data -- **root()**: Access root event data -- **previous()**: Access previous node outputs` -} - -func (c *ForEach) Icon() string { - return "split" -} - -func (c *ForEach) Color() string { - return "blue" -} - -func (c *ForEach) OutputChannels(configuration any) []core.OutputChannel { - return []core.OutputChannel{ - {Name: ChannelNameItem, Label: "Item"}, - } -} - -func (c *ForEach) Configuration() []configuration.Field { - return []configuration.Field{ - { - Name: "arrayExpression", - Label: "Array Expression", - Type: configuration.FieldTypeExpression, - Description: "Expression that evaluates to the array to iterate over", - Required: true, - }, - } -} - -func (c *ForEach) Setup(ctx core.SetupContext) error { - spec, err := decodeSpec(ctx.Configuration) - if err != nil { - return err - } - return validateSpec(spec) -} - -func (c *ForEach) Execute(ctx core.ExecutionContext) error { - spec, err := decodeSpec(ctx.Configuration) - if err != nil { - return err - } - if err := validateSpec(spec); err != nil { - return err - } - - result, err := ctx.Expressions.Run(spec.ArrayExpression) - if err != nil { - return fmt.Errorf("expression evaluation failed: %w", err) - } - - items, err := toSlice(result) - if err != nil { - return fmt.Errorf("expression must evaluate to an array: %w", err) - } - - if err := ctx.Metadata.Set(map[string]any{ - "arrayExpression": spec.ArrayExpression, - "count": len(items), - }); err != nil { - return fmt.Errorf("failed to set execution metadata: %w", err) - } - - if len(items) == 0 { - return ctx.ExecutionState.Pass() - } - - payloads := make([]any, 0, len(items)) - for i, item := range items { - payloads = append(payloads, map[string]any{ - "item": item, - "index": i, - "totalCount": len(items), - }) - } - - return ctx.ExecutionState.Emit(ChannelNameItem, PayloadType, payloads) -} - -func decodeSpec(raw any) (Spec, error) { - var spec Spec - if err := mapstructure.Decode(raw, &spec); err != nil { - return Spec{}, fmt.Errorf("failed to decode configuration: %w", err) - } - return spec, nil -} - -func validateSpec(spec Spec) error { - if spec.ArrayExpression == "" { - return fmt.Errorf("arrayExpression is required") - } - return nil -} - -func toSlice(v any) ([]any, error) { - if v == nil { - return []any{}, nil - } - if s, ok := v.([]any); ok { - return s, nil - } - rv := reflect.ValueOf(v) - if rv.Kind() != reflect.Slice { - return nil, fmt.Errorf("got %T", v) - } - result := make([]any, rv.Len()) - for i := range result { - result[i] = rv.Index(i).Interface() - } - return result, nil -} - -func (c *ForEach) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { - return ctx.DefaultProcessing() -} - -func (c *ForEach) Cancel(ctx core.ExecutionContext) error { - return nil -} - -func (c *ForEach) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) { - return http.StatusOK, nil, nil -} - -func (c *ForEach) Cleanup(ctx core.SetupContext) error { - return nil -} - -func (c *ForEach) Hooks() []core.Hook { - return []core.Hook{} -} - -func (c *ForEach) HandleHook(ctx core.ActionHookContext) error { - return nil -} diff --git a/pkg/components/foreach/for_each_test.go b/pkg/components/foreach/for_each_test.go deleted file mode 100644 index 9fe87f6498..0000000000 --- a/pkg/components/foreach/for_each_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package foreach - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/superplanehq/superplane/pkg/core" - "github.com/superplanehq/superplane/test/support/contexts" -) - -func TestForEachExecute(t *testing.T) { - t.Run("emits one payload per array item", func(t *testing.T) { - component := &ForEach{} - execState := &contexts.ExecutionStateContext{} - execMetadata := &contexts.MetadataContext{} - exprCtx := &contexts.ExpressionContext{ - Output: []any{ - map[string]any{"service": "EC2", "cost": 10.0}, - map[string]any{"service": "S3", "cost": 5.0}, - map[string]any{"service": "RDS", "cost": 20.0}, - }, - } - - err := component.Execute(core.ExecutionContext{ - Configuration: map[string]any{"arrayExpression": `$["Runner"].by_service`}, - Metadata: execMetadata, - ExecutionState: execState, - Expressions: exprCtx, - }) - - require.NoError(t, err) - assert.True(t, execState.Passed) - assert.Equal(t, ChannelNameItem, execState.Channel) - assert.Equal(t, PayloadType, execState.Type) - require.Len(t, execState.Payloads, 3) - - first := execState.Payloads[0].(map[string]any)["data"].(map[string]any) - assert.Equal(t, map[string]any{"service": "EC2", "cost": 10.0}, first["item"]) - assert.Equal(t, 0, first["index"]) - assert.Equal(t, 3, first["totalCount"]) - - last := execState.Payloads[2].(map[string]any)["data"].(map[string]any) - assert.Equal(t, map[string]any{"service": "RDS", "cost": 20.0}, last["item"]) - assert.Equal(t, 2, last["index"]) - }) - - t.Run("passes when array is empty", func(t *testing.T) { - component := &ForEach{} - execState := &contexts.ExecutionStateContext{} - execMetadata := &contexts.MetadataContext{} - exprCtx := &contexts.ExpressionContext{Output: []any{}} - - err := component.Execute(core.ExecutionContext{ - Configuration: map[string]any{"arrayExpression": `$["Runner"].items`}, - Metadata: execMetadata, - ExecutionState: execState, - Expressions: exprCtx, - }) - - require.NoError(t, err) - assert.True(t, execState.Passed) - assert.Equal(t, "", execState.Channel) - assert.Empty(t, execState.Payloads) - }) - - t.Run("returns error when expression result is not an array", func(t *testing.T) { - component := &ForEach{} - execState := &contexts.ExecutionStateContext{} - execMetadata := &contexts.MetadataContext{} - exprCtx := &contexts.ExpressionContext{Output: "not-an-array"} - - err := component.Execute(core.ExecutionContext{ - Configuration: map[string]any{"arrayExpression": `$["Runner"].name`}, - Metadata: execMetadata, - ExecutionState: execState, - Expressions: exprCtx, - }) - - assert.ErrorContains(t, err, "expression must evaluate to an array") - }) - - t.Run("returns error when arrayExpression is missing", func(t *testing.T) { - component := &ForEach{} - execState := &contexts.ExecutionStateContext{} - execMetadata := &contexts.MetadataContext{} - exprCtx := &contexts.ExpressionContext{} - - err := component.Execute(core.ExecutionContext{ - Configuration: map[string]any{}, - Metadata: execMetadata, - ExecutionState: execState, - Expressions: exprCtx, - }) - - assert.ErrorContains(t, err, "arrayExpression is required") - }) -} diff --git a/pkg/registryimports/registryimports.go b/pkg/registryimports/registryimports.go index 49449e1bfa..37cb552be8 100644 --- a/pkg/registryimports/registryimports.go +++ b/pkg/registryimports/registryimports.go @@ -8,8 +8,8 @@ import ( _ "github.com/superplanehq/superplane/pkg/components/approval" _ "github.com/superplanehq/superplane/pkg/components/deletememory" _ "github.com/superplanehq/superplane/pkg/components/display" - _ "github.com/superplanehq/superplane/pkg/components/fanout" _ "github.com/superplanehq/superplane/pkg/components/filter" + _ "github.com/superplanehq/superplane/pkg/components/fanout" _ "github.com/superplanehq/superplane/pkg/components/graphql" _ "github.com/superplanehq/superplane/pkg/components/http" _ "github.com/superplanehq/superplane/pkg/components/if" diff --git a/pkg/workers/contexts/node_configuration_builder.go b/pkg/workers/contexts/node_configuration_builder.go index 2890674f88..0979960d68 100644 --- a/pkg/workers/contexts/node_configuration_builder.go +++ b/pkg/workers/contexts/node_configuration_builder.go @@ -589,21 +589,42 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( executionIDByRef[nodeRef] = execution.ID } - events, err := models.ListCanvasEventsForExecutionsInTransaction(b.tx, executionIDs) + chainExecutions, err := b.listLinearExecutionsInChain() if err != nil { return err } - latestByExecution := latestEventByExecution(events, executionIDs) + executionIDSet := make(map[uuid.UUID]struct{}, len(executionIDByRef)+len(chainExecutions)) + for _, executionID := range executionIDByRef { + executionIDSet[executionID] = struct{}{} + } + for _, execution := range chainExecutions { + executionIDSet[execution.ID] = struct{}{} + } + + allExecutionIDs := make([]uuid.UUID, 0, len(executionIDSet)) + for executionID := range executionIDSet { + allExecutionIDs = append(allExecutionIDs, executionID) + } + + events, err := models.ListCanvasEventsForExecutionsInTransaction(b.tx, allExecutionIDs) + if err != nil { + return err + } + + events, err = appendPreferredBranchEvents(b.tx, events, preferredEventIDByExecution(chainExecutions)) + if err != nil { + return err + } + + eventsByID := indexEventsByID(events) + preferredEventID := preferredEventIDByExecution(chainExecutions) + for nodeRef, executionID := range executionIDByRef { - if payload, ok, err := b.payloadFromTriggeringEvent(executionID); err != nil { - return err - } else if ok { - messageChain[nodeRef] = payload - continue + event, ok, err := eventForExecution(executionID, events, eventsByID, preferredEventID) + if err != nil { + return fmt.Errorf("node %s: %w", nodeRef, err) } - - event, ok := latestByExecution[executionID] if !ok { return fmt.Errorf("node %s has no outputs", nodeRef) } @@ -614,55 +635,85 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( return nil } -// payloadFromTriggeringEvent returns the event payload that triggered the current -// execution when that event belongs to the referenced node's execution. This -// preserves per-branch data for components (e.g. For Each) that emit multiple -// events from a single execution. -func (b *NodeConfigurationBuilder) payloadFromTriggeringEvent(executionID uuid.UUID) (any, bool, error) { - if b.previousExecutionID == nil { - return nil, false, nil - } - - prevExecution, err := models.FindNodeExecutionInTransaction(b.tx, b.workflowID, *b.previousExecutionID) - if err != nil { - return nil, false, err - } - - if prevExecution.EventID == uuid.Nil { - return nil, false, nil - } - - triggerEvent, err := models.FindCanvasEventInTransaction(b.tx, prevExecution.EventID) - if err != nil { - return nil, false, err +// preferredEventIDByExecution maps a parent execution to the canvas event that routed +// into the child on this branch (from child.EventID when child.PreviousExecutionID is set). +func preferredEventIDByExecution(executions []models.CanvasNodeExecution) map[uuid.UUID]uuid.UUID { + preferred := make(map[uuid.UUID]uuid.UUID, len(executions)) + for _, execution := range executions { + if execution.PreviousExecutionID == nil || execution.EventID == uuid.Nil { + continue + } + preferred[*execution.PreviousExecutionID] = execution.EventID } + return preferred +} - if triggerEvent.ExecutionID == nil || *triggerEvent.ExecutionID != executionID { - return nil, false, nil +func indexEventsByID(events []models.CanvasEvent) map[uuid.UUID]models.CanvasEvent { + byID := make(map[uuid.UUID]models.CanvasEvent, len(events)) + for _, event := range events { + byID[event.ID] = event } - - return triggerEvent.Data.Data(), true, nil + return byID } -func latestEventByExecution(events []models.CanvasEvent, executionIDs []uuid.UUID) map[uuid.UUID]models.CanvasEvent { - latestByExecution := make(map[uuid.UUID]models.CanvasEvent, len(executionIDs)) - for _, event := range events { - if event.ExecutionID == nil { +func appendPreferredBranchEvents( + tx *gorm.DB, + events []models.CanvasEvent, + preferredEventID map[uuid.UUID]uuid.UUID, +) ([]models.CanvasEvent, error) { + byID := indexEventsByID(events) + for _, eventID := range preferredEventID { + if eventID == uuid.Nil { continue } - - latest, ok := latestByExecution[*event.ExecutionID] - if !ok || event.CreatedAt == nil { - latestByExecution[*event.ExecutionID] = event + if _, ok := byID[eventID]; ok { continue } - if latest.CreatedAt == nil || event.CreatedAt.After(*latest.CreatedAt) { - latestByExecution[*event.ExecutionID] = event + event, err := models.FindCanvasEventInTransaction(tx, eventID) + if err != nil { + return nil, err + } + events = append(events, *event) + byID[eventID] = *event + } + return events, nil +} + +// eventForExecution picks the canvas event whose payload should represent an upstream +// execution on the current branch. It prefers the child's incoming event ID from the +// linear execution chain; otherwise it uses the sole event on that execution. +func eventForExecution( + executionID uuid.UUID, + events []models.CanvasEvent, + eventsByID map[uuid.UUID]models.CanvasEvent, + preferredEventID map[uuid.UUID]uuid.UUID, +) (models.CanvasEvent, bool, error) { + if eventID, ok := preferredEventID[executionID]; ok && eventID != uuid.Nil { + if event, found := eventsByID[eventID]; found { + return event, true, nil } } - return latestByExecution + var matched []models.CanvasEvent + for _, event := range events { + if event.ExecutionID != nil && *event.ExecutionID == executionID { + matched = append(matched, event) + } + } + + switch len(matched) { + case 0: + return models.CanvasEvent{}, false, nil + case 1: + return matched[0], true, nil + default: + return models.CanvasEvent{}, false, fmt.Errorf( + "execution %s has ambiguous outputs (%d events)", + executionID, + len(matched), + ) + } } func parseDepth(param any) (int, error) { @@ -760,15 +811,24 @@ func (b *NodeConfigurationBuilder) resolveFromExecutions(depth int, step int, ha return step, nil, err } - latestByExecution := latestEventByExecution(events, executionIDs) + preferredEventID := preferredEventIDByExecution(executionsInChain) + events, err = appendPreferredBranchEvents(b.tx, events, preferredEventID) + if err != nil { + return step, nil, err + } + + eventsByID := indexEventsByID(events) for _, execution := range executionsInChain[startIndex:] { step++ if step < depth { continue } - event, exists := latestByExecution[execution.ID] - if !exists { + event, ok, err := eventForExecution(execution.ID, events, eventsByID, preferredEventID) + if err != nil { + return step, nil, err + } + if !ok { continue } diff --git a/pkg/workers/contexts/node_configuration_builder_event_selection_test.go b/pkg/workers/contexts/node_configuration_builder_event_selection_test.go new file mode 100644 index 0000000000..c48492a453 --- /dev/null +++ b/pkg/workers/contexts/node_configuration_builder_event_selection_test.go @@ -0,0 +1,90 @@ +package contexts + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/models" + "gorm.io/datatypes" +) + +func Test_preferredEventIDByExecution(t *testing.T) { + parentID := uuid.New() + childID := uuid.New() + childEventID := uuid.New() + + executions := []models.CanvasNodeExecution{ + {ID: parentID}, + { + ID: childID, + EventID: childEventID, + PreviousExecutionID: &parentID, + }, + } + + preferred := preferredEventIDByExecution(executions) + assert.Equal(t, childEventID, preferred[parentID]) +} + +func Test_eventForExecution_prefersBranchEvent(t *testing.T) { + parentExecutionID := uuid.New() + branchEventID := uuid.New() + otherEventID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + { + ID: otherEventID, + ExecutionID: &parentExecutionID, + CreatedAt: &now, + Data: datatypes.NewJSONType[any](map[string]any{"item": "a"}), + }, + { + ID: branchEventID, + ExecutionID: &parentExecutionID, + CreatedAt: &now, + Data: datatypes.NewJSONType[any](map[string]any{"item": "b"}), + }, + } + + eventsByID := indexEventsByID(events) + preferred := map[uuid.UUID]uuid.UUID{parentExecutionID: branchEventID} + + event, ok, err := eventForExecution(parentExecutionID, events, eventsByID, preferred) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, branchEventID, event.ID) +} + +func Test_eventForExecution_singleEventWithoutPreferred(t *testing.T) { + executionID := uuid.New() + eventID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + {ID: eventID, ExecutionID: &executionID, CreatedAt: &now}, + } + + event, ok, err := eventForExecution(executionID, events, indexEventsByID(events), nil) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, eventID, event.ID) +} + +func Test_eventForExecution_ambiguousWithoutBranchLink(t *testing.T) { + executionID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + {ID: uuid.New(), ExecutionID: &executionID, CreatedAt: &now}, + {ID: uuid.New(), ExecutionID: &executionID, CreatedAt: &now}, + } + + _, ok, err := eventForExecution(executionID, events, indexEventsByID(events), nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "ambiguous outputs") + assert.False(t, ok) +} diff --git a/pkg/workers/contexts/node_configuration_builder_test.go b/pkg/workers/contexts/node_configuration_builder_test.go index 7980227eef..30ac98637c 100644 --- a/pkg/workers/contexts/node_configuration_builder_test.go +++ b/pkg/workers/contexts/node_configuration_builder_test.go @@ -2,6 +2,7 @@ package contexts import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1626,3 +1627,96 @@ func Test_NodeConfigurationBuilder_Config_DoesNotOverwriteExistingConfigKey(t *t assert.Equal(t, "https://api.example.com", result["prev_api_url"]) assert.Equal(t, "ok", result["output_status"]) } + +func Test_NodeConfigurationBuilder_FanOutBranchPayload(t *testing.T) { + r := support.Setup(t) + defer r.Close() + + triggerNode := "start" + fanOutNode := "fanOut" + waitNode := "wait" + displayNode := "display" + + canvas, _ := support.CreateCanvas( + t, + r.Organization.ID, + r.User, + []models.CanvasNode{ + { + NodeID: triggerNode, + Name: triggerNode, + Type: models.NodeTypeTrigger, + Ref: datatypes.NewJSONType(models.NodeRef{Trigger: &models.TriggerRef{Name: "start"}}), + }, + { + NodeID: fanOutNode, + Name: fanOutNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "fanOut"}}), + }, + { + NodeID: waitNode, + Name: waitNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "wait"}}), + }, + { + NodeID: displayNode, + Name: displayNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "display"}}), + }, + }, + []models.Edge{ + {SourceID: triggerNode, TargetID: fanOutNode, Channel: "default"}, + {SourceID: fanOutNode, TargetID: waitNode, Channel: "item"}, + {SourceID: waitNode, TargetID: displayNode, Channel: "default"}, + }, + ) + + rootEvent := support.EmitCanvasEventForNodeWithData( + t, + canvas.ID, + triggerNode, + "default", + nil, + map[string]any{"items": []any{"a", "b", "c"}}, + ) + + fanOutExecution := support.CreateCanvasNodeExecution(t, canvas.ID, fanOutNode, rootEvent.ID, rootEvent.ID, nil) + + now := time.Now() + emitItem := func(item string, index int) *models.CanvasEvent { + return support.EmitCanvasEventForNodeWithData( + t, + canvas.ID, + fanOutNode, + "item", + &fanOutExecution.ID, + map[string]any{"item": item, "index": index, "totalCount": 3}, + ) + } + + _ = emitItem("a", 0) + branchEvent := emitItem("b", 1) + _ = emitItem("c", 2) + + // Align timestamps like PassInTransaction (same CreatedAt for all branch events). + require.NoError(t, database.Conn().Model(&models.CanvasEvent{}). + Where("execution_id = ?", fanOutExecution.ID). + Update("created_at", now).Error) + + waitInput := map[string]any{"item": "b", "index": 1, "totalCount": 3} + waitExecution := support.CreateNextNodeExecution(t, canvas.ID, waitNode, rootEvent.ID, branchEvent.ID, &fanOutExecution.ID) + + builder := NewNodeConfigurationBuilder(database.Conn(), canvas.ID). + WithPreviousExecution(&waitExecution.ID). + WithRootEvent(&rootEvent.ID). + WithInput(map[string]any{waitNode: waitInput}) + + result, err := builder.Build(map[string]any{ + "item": "{{ $[\"" + fanOutNode + "\"].item }}", + }) + require.NoError(t, err) + assert.Equal(t, "b", result["item"]) +} diff --git a/web_src/src/pages/workflowv2/mappers/forEach.ts b/web_src/src/pages/workflowv2/mappers/forEach.ts deleted file mode 100644 index c863148e03..0000000000 --- a/web_src/src/pages/workflowv2/mappers/forEach.ts +++ /dev/null @@ -1,153 +0,0 @@ -import type { - ComponentBaseContext, - ComponentBaseMapper, - EventStateRegistry, - ExecutionDetailsContext, - ExecutionInfo, - NodeInfo, - OutputPayload, - StateFunction, - SubtitleContext, -} from "./types"; -import type { ComponentBaseProps, EventSection, EventState, EventStateMap } from "@/ui/componentBase"; -import { DEFAULT_EVENT_STATE_MAP } from "@/ui/componentBase"; -import { getTriggerRenderer, getState, getStateMap } from "."; -import type React from "react"; -import { getBackgroundColorClass } from "@/lib/colors"; -import { renderTimeAgo } from "@/components/TimeAgo"; -import { formatTimestampInUserTimezone } from "@/lib/timezone"; - -type ForEachOutputs = Record; - -export const FOR_EACH_STATE_MAP: EventStateMap = { - ...DEFAULT_EVENT_STATE_MAP, - passed: { - icon: "circle-check", - textColor: "text-gray-800", - backgroundColor: "bg-green-100", - badgeColor: "bg-emerald-500", - }, - rejected: { - icon: "circle-x", - textColor: "text-gray-800", - backgroundColor: "bg-gray-100", - badgeColor: "bg-gray-500", - }, -}; - -export const forEachStateFunction: StateFunction = (execution: ExecutionInfo): EventState => { - if (!execution) return "neutral"; - - if ( - execution.resultMessage && - (execution.resultReason === "RESULT_REASON_ERROR" || - (execution.result === "RESULT_FAILED" && execution.resultReason !== "RESULT_REASON_ERROR_RESOLVED")) - ) { - return "error"; - } - - if (execution.result === "RESULT_CANCELLED") { - return "cancelled"; - } - - if (execution.state === "STATE_PENDING" || execution.state === "STATE_STARTED") { - return "running"; - } - - if (execution.state === "STATE_FINISHED" && execution.result === "RESULT_PASSED") { - const outputs = execution.outputs as ForEachOutputs | undefined; - const itemOutputs = outputs?.item; - const hasItems = Array.isArray(itemOutputs) && itemOutputs.length > 0; - return hasItems ? "passed" : "rejected"; - } - - return "failed"; -}; - -export const FOR_EACH_STATE_REGISTRY: EventStateRegistry = { - stateMap: FOR_EACH_STATE_MAP, - getState: forEachStateFunction, -}; - -type ForEachConfiguration = { - arrayExpression: string; -}; - -type ForEachMetadata = { - arrayExpression?: string; - count?: number; -}; - -export const forEachMapper: ComponentBaseMapper = { - props(context: ComponentBaseContext): ComponentBaseProps { - const componentName = context.componentDefinition.name || "forEach"; - const configuration = context.node.configuration as ForEachConfiguration; - const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; - const specs = configuration.arrayExpression - ? [ - { - title: "Array", - tooltipTitle: "Array expression", - value: configuration.arrayExpression, - }, - ] - : undefined; - - return { - iconSlug: "split", - collapsed: context.node.isCollapsed, - collapsedBackground: getBackgroundColorClass("white"), - title: context.node.name || context.componentDefinition.label || context.componentDefinition.name || "For Each", - eventSections: lastExecution ? getForEachEventSections(context.nodes, lastExecution, componentName) : undefined, - includeEmptyState: !lastExecution, - specs, - eventStateMap: getStateMap(componentName), - }; - }, - - subtitle(context: SubtitleContext): string | React.ReactNode { - if (!context.execution.createdAt) return ""; - return renderTimeAgo(new Date(context.execution.createdAt)); - }, - - getExecutionDetails(context: ExecutionDetailsContext): Record { - const configuration = context.execution.configuration as ForEachConfiguration; - const metadata = context.execution.metadata as ForEachMetadata | undefined; - const details: Record = { - "Evaluated at": context.execution.createdAt ? formatTimestampInUserTimezone(context.execution.createdAt) : "-", - "Array expression": configuration.arrayExpression ?? metadata?.arrayExpression ?? "-", - }; - - if (typeof metadata?.count === "number") { - details["Items emitted"] = metadata.count; - } - - return details; - }, -}; - -function getForEachEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { - const rootEvent = execution.rootEvent; - if (!rootEvent?.id || !execution.createdAt) { - return []; - } - - const rootTriggerNode = nodes.find((n) => n.id === rootEvent.nodeId); - if (!rootTriggerNode?.componentName) { - return []; - } - - const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode.componentName); - const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: rootEvent }); - const createdAt = new Date(execution.createdAt); - - return [ - { - receivedAt: createdAt, - eventTitle: title, - eventSubtitle: renderTimeAgo(createdAt), - eventState: getState(componentName)(execution), - eventId: rootEvent.id, - }, - ]; -}