diff --git a/.cursor/skills/superplane-component-mappers/SKILL.md b/.cursor/skills/superplane-component-mappers/SKILL.md new file mode 100644 index 0000000000..96f64fea25 --- /dev/null +++ b/.cursor/skills/superplane-component-mappers/SKILL.md @@ -0,0 +1,107 @@ +--- +name: superplane-component-mappers +description: >- + Adds or reviews workflow UI mappers for SuperPlane core and integration + components. Use when implementing a new backend component or trigger, when a + component has no mapper in web_src, when canvas nodes show generic/noop UI, or + when the user mentions frontend mappers, workflowv2/mappers, or component + customization. +--- + +# SuperPlane Component Frontend Mappers + +Backend registration alone is not enough. Every user-facing **component** (and **trigger**) needs a frontend mapper so the canvas shows the right icon, configuration specs, execution states, subtitles, and execution details. + +**There are no mappers associated with this component in the frontend** means the component `Name()` from Go is missing from `web_src/src/pages/workflowv2/mappers/index.ts`. The UI then falls back to `noopMapper` — generic node chrome with no expression specs, channel-aware states, or tailored execution details. + +## When a mapper is required + +| Change | Mapper work | +|--------|-------------| +| New core component (`pkg/components//`) | `web_src/src/pages/workflowv2/mappers/.ts` + register in `index.ts` | +| New integration component | `web_src/src/pages/workflowv2/mappers//.ts` + register in that integration's `index.ts` | +| New output channels or custom pass/fail semantics | Often also `eventStateRegistries` (see `filter`, `if`, `merge`) | +| Custom config field UI | `customFieldRenderers` (see `wait`, `schedule`) | + +## Checklist (core component) + +Copy and complete: + +``` +- [ ] Registry key matches Go `Name()` exactly (e.g. `fanOut`, not `fan-out`) +- [ ] `componentBaseMappers` entry in index.ts +- [ ] `iconSlug` matches backend `Icon()` when possible +- [ ] Configuration fields surfaced in `specs` (expressions, labels, etc.) +- [ ] `getExecutionDetails` includes key config + metadata from runs +- [ ] Custom `eventStateRegistries` if outputs use non-default channels (passed/rejected/true/false/item/…) +- [ ] Optional `*.spec.ts` for mapper props / execution details +- [ ] `make format.js` and `make check.build.ui` after UI edits +``` + +## Minimal mapper pattern + +Use `filter.ts` or `if.ts` for expression-based core components. Use `noop.ts` only when the component truly needs no customization. + +1. **Create** `web_src/src/pages/workflowv2/mappers/.ts`: + +```typescript +import type { ComponentBaseContext, ComponentBaseMapper, ExecutionDetailsContext, SubtitleContext } from "./types"; +import type { ComponentBaseProps } from "@/ui/componentBase"; +import { renderTimeAgo } from "@/components/TimeAgo"; + +type FanOutConfiguration = { arrayExpression: string }; + +export const fanOutMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const configuration = context.node.configuration as FanOutConfiguration; + return { + iconSlug: "split", // match backend Icon() + collapsed: context.node.isCollapsed, + title: context.node.name || context.componentDefinition.label || "Fan Out", + specs: configuration.arrayExpression + ? [{ title: "Array", tooltipTitle: "Array expression", value: configuration.arrayExpression }] + : undefined, + includeEmptyState: context.lastExecutions.length === 0, + }; + }, + subtitle(context: SubtitleContext) { + return context.execution.createdAt ? renderTimeAgo(new Date(context.execution.createdAt)) : ""; + }, + getExecutionDetails(context: ExecutionDetailsContext) { + const configuration = context.execution.configuration as FanOutConfiguration; + return { "Array expression": configuration.arrayExpression ?? "-" }; + }, +}; +``` + +2. **Register** in `web_src/src/pages/workflowv2/mappers/index.ts`: + +```typescript +import { fanOutMapper } from "./fanOut"; + +const componentBaseMappers: Record = { + // ... + fanOut: fanOutMapper, +}; +``` + +3. **Custom states** (if the component emits named channels): define `*_STATE_MAP`, `*StateFunction`, `*_STATE_REGISTRY`, and add to `eventStateRegistries`. Mirror channel names from Go `OutputChannels()` (e.g. Fan Out → `item`). + +## Verify registration + +```bash +# Registry key must exist (replace fanOut with component Name()) +rg 'fanOut:' web_src/src/pages/workflowv2/mappers/index.ts +``` + +Integration components: key is often `integration.component` via `appMappers`; see `docs/contributing/integrations.md` and `docs/contributing/component-customization.md`. + +## Reference docs + +- [docs/contributing/component-customization.md](docs/contributing/component-customization.md) — registry types, tutorials +- [docs/contributing/component-design.md](docs/contributing/component-design.md) — mapper responsibilities on the canvas +- [web_src/AGENTS.md](web_src/AGENTS.md) — UI conventions and test commands + +## PR review + +If a PR adds `pkg/components/*` or `pkg/integrations/*` actions/triggers but no `web_src/src/pages/workflowv2/mappers/` changes, flag it unless the author explicitly documents intentional noop fallback. diff --git a/docs/components/Core.mdx b/docs/components/Core.mdx index 227235e5ae..556415cb56 100644 --- a/docs/components/Core.mdx +++ b/docs/components/Core.mdx @@ -23,6 +23,7 @@ import { CardGrid, LinkCard } from "@astrojs/starlight/components"; + @@ -388,6 +389,53 @@ The Display component displays a debug message from the latest execution. } ``` + + +## Fan Out + +The Fan Out component reads an array from the upstream payload and emits one downstream event per element. + +### Use Cases + +- Iterate over a list of results and process each one independently +- Split runner output arrays into per-item workflow paths +- Process each page, service, or record with the same downstream steps + +### How It Works + +1. Evaluates the configured array expression against the incoming event data +2. Emits one `fanout.item` event to the `item` channel for each element +3. If the array is empty, passes without emitting any events + +### Output Fields (per item) + +- **item**: The array element value +- **index**: Zero-based index of the element +- **totalCount**: Total number of items in the array + +### Expression Environment + +- **$**: The run context data +- **root()**: Access root event data +- **previous()**: Access previous node outputs + +### Example Output + +```json +{ + "data": { + "index": 0, + "item": { + "cost_usd": 42.5, + "service": "EC2" + }, + "totalCount": 3 + }, + "timestamp": "2026-01-16T17:56:16.680755501Z", + "type": "fanout.item" +} +``` + ## Filter diff --git a/pkg/components/fanout/example.go b/pkg/components/fanout/example.go new file mode 100644 index 0000000000..d6effcc893 --- /dev/null +++ b/pkg/components/fanout/example.go @@ -0,0 +1,18 @@ +package fanout + +import ( + _ "embed" + "sync" + + "github.com/superplanehq/superplane/pkg/utils" +) + +//go:embed example_output.json +var exampleOutputBytes []byte + +var exampleOutputOnce sync.Once +var exampleOutput map[string]any + +func (f *FanOut) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputOnce, exampleOutputBytes, &exampleOutput) +} diff --git a/pkg/components/fanout/example_output.json b/pkg/components/fanout/example_output.json new file mode 100644 index 0000000000..e25a6b3419 --- /dev/null +++ b/pkg/components/fanout/example_output.json @@ -0,0 +1,12 @@ +{ + "type": "fanout.item", + "data": { + "item": { + "service": "EC2", + "cost_usd": 42.5 + }, + "index": 0, + "totalCount": 3 + }, + "timestamp": "2026-01-16T17:56:16.680755501Z" +} diff --git a/pkg/components/fanout/fan_out.go b/pkg/components/fanout/fan_out.go new file mode 100644 index 0000000000..b7c2e73548 --- /dev/null +++ b/pkg/components/fanout/fan_out.go @@ -0,0 +1,200 @@ +package fanout + +import ( + "fmt" + "net/http" + "reflect" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/registry" +) + +const ComponentName = "fanOut" +const PayloadType = "fanout.item" +const ChannelNameItem = "item" + +func init() { + registry.RegisterAction(ComponentName, &FanOut{}) +} + +type FanOut struct{} + +type Spec struct { + ArrayExpression string `json:"arrayExpression"` +} + +func (f *FanOut) Name() string { + return ComponentName +} + +func (f *FanOut) Label() string { + return "Fan Out" +} + +func (f *FanOut) Description() string { + return "Emit one downstream event per item in an array" +} + +func (f *FanOut) Documentation() string { + return `The Fan Out component reads an array from the upstream payload and emits one downstream event per element. + +## Use Cases + +- Iterate over a list of results and process each one independently +- Split runner output arrays into per-item workflow paths +- Process each page, service, or record with the same downstream steps + +## How It Works + +1. Evaluates the configured array expression against the incoming event data +2. Emits one ` + "`fanout.item`" + ` event to the ` + "`item`" + ` channel for each element +3. If the array is empty, passes without emitting any events + +## Output Fields (per item) + +- **item**: The array element value +- **index**: Zero-based index of the element +- **totalCount**: Total number of items in the array + +## Expression Environment + +- **$**: The run context data +- **root()**: Access root event data +- **previous()**: Access previous node outputs` +} + +func (f *FanOut) Icon() string { + return "split" +} + +func (f *FanOut) Color() string { + return "blue" +} + +func (f *FanOut) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{ + {Name: ChannelNameItem, Label: "Item"}, + } +} + +func (f *FanOut) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "arrayExpression", + Label: "Array Expression", + Type: configuration.FieldTypeExpression, + Description: "Expression that evaluates to the array to iterate over", + Required: true, + }, + } +} + +func (f *FanOut) Setup(ctx core.SetupContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + return validateSpec(spec) +} + +func (f *FanOut) Execute(ctx core.ExecutionContext) error { + spec, err := decodeSpec(ctx.Configuration) + if err != nil { + return err + } + if err := validateSpec(spec); err != nil { + return err + } + + result, err := ctx.Expressions.Run(spec.ArrayExpression) + if err != nil { + return fmt.Errorf("expression evaluation failed: %w", err) + } + + items, err := toSlice(result) + if err != nil { + return fmt.Errorf("expression must evaluate to an array: %w", err) + } + + if err := ctx.Metadata.Set(map[string]any{ + "arrayExpression": spec.ArrayExpression, + "count": len(items), + }); err != nil { + return fmt.Errorf("failed to set execution metadata: %w", err) + } + + if len(items) == 0 { + return ctx.ExecutionState.Pass() + } + + payloads := make([]any, 0, len(items)) + for i, item := range items { + payloads = append(payloads, map[string]any{ + "item": item, + "index": i, + "totalCount": len(items), + }) + } + + return ctx.ExecutionState.Emit(ChannelNameItem, PayloadType, payloads) +} + +func decodeSpec(raw any) (Spec, error) { + var spec Spec + if err := mapstructure.Decode(raw, &spec); err != nil { + return Spec{}, fmt.Errorf("failed to decode configuration: %w", err) + } + return spec, nil +} + +func validateSpec(spec Spec) error { + if spec.ArrayExpression == "" { + return fmt.Errorf("arrayExpression is required") + } + return nil +} + +func toSlice(v any) ([]any, error) { + if v == nil { + return []any{}, nil + } + if s, ok := v.([]any); ok { + return s, nil + } + rv := reflect.ValueOf(v) + if rv.Kind() != reflect.Slice { + return nil, fmt.Errorf("got %T", v) + } + result := make([]any, rv.Len()) + for i := range result { + result[i] = rv.Index(i).Interface() + } + return result, nil +} + +func (f *FanOut) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (f *FanOut) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (f *FanOut) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) { + return http.StatusOK, nil, nil +} + +func (f *FanOut) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (f *FanOut) Hooks() []core.Hook { + return []core.Hook{} +} + +func (f *FanOut) HandleHook(ctx core.ActionHookContext) error { + return nil +} diff --git a/pkg/components/fanout/fan_out_test.go b/pkg/components/fanout/fan_out_test.go new file mode 100644 index 0000000000..245b33cafa --- /dev/null +++ b/pkg/components/fanout/fan_out_test.go @@ -0,0 +1,98 @@ +package fanout + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func TestFanOutExecute(t *testing.T) { + t.Run("emits one payload per array item", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{ + Output: []any{ + map[string]any{"service": "EC2", "cost": 10.0}, + map[string]any{"service": "S3", "cost": 5.0}, + map[string]any{"service": "RDS", "cost": 20.0}, + }, + } + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].by_service`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, ChannelNameItem, execState.Channel) + assert.Equal(t, PayloadType, execState.Type) + require.Len(t, execState.Payloads, 3) + + first := execState.Payloads[0].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "EC2", "cost": 10.0}, first["item"]) + assert.Equal(t, 0, first["index"]) + assert.Equal(t, 3, first["totalCount"]) + + last := execState.Payloads[2].(map[string]any)["data"].(map[string]any) + assert.Equal(t, map[string]any{"service": "RDS", "cost": 20.0}, last["item"]) + assert.Equal(t, 2, last["index"]) + }) + + t.Run("passes when array is empty", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: []any{}} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].items`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + require.NoError(t, err) + assert.True(t, execState.Passed) + assert.Equal(t, "", execState.Channel) + assert.Empty(t, execState.Payloads) + }) + + t.Run("returns error when expression result is not an array", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{Output: "not-an-array"} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{"arrayExpression": `$["Runner"].name`}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "expression must evaluate to an array") + }) + + t.Run("returns error when arrayExpression is missing", func(t *testing.T) { + component := &FanOut{} + execState := &contexts.ExecutionStateContext{} + execMetadata := &contexts.MetadataContext{} + exprCtx := &contexts.ExpressionContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{}, + Metadata: execMetadata, + ExecutionState: execState, + Expressions: exprCtx, + }) + + assert.ErrorContains(t, err, "arrayExpression is required") + }) +} diff --git a/pkg/registryimports/registryimports.go b/pkg/registryimports/registryimports.go index 946ff2b885..37cb552be8 100644 --- a/pkg/registryimports/registryimports.go +++ b/pkg/registryimports/registryimports.go @@ -9,6 +9,7 @@ import ( _ "github.com/superplanehq/superplane/pkg/components/deletememory" _ "github.com/superplanehq/superplane/pkg/components/display" _ "github.com/superplanehq/superplane/pkg/components/filter" + _ "github.com/superplanehq/superplane/pkg/components/fanout" _ "github.com/superplanehq/superplane/pkg/components/graphql" _ "github.com/superplanehq/superplane/pkg/components/http" _ "github.com/superplanehq/superplane/pkg/components/if" diff --git a/pkg/workers/contexts/node_configuration_builder.go b/pkg/workers/contexts/node_configuration_builder.go index b55a4f7e00..0979960d68 100644 --- a/pkg/workers/contexts/node_configuration_builder.go +++ b/pkg/workers/contexts/node_configuration_builder.go @@ -589,14 +589,42 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( executionIDByRef[nodeRef] = execution.ID } - events, err := models.ListCanvasEventsForExecutionsInTransaction(b.tx, executionIDs) + chainExecutions, err := b.listLinearExecutionsInChain() + if err != nil { + return err + } + + executionIDSet := make(map[uuid.UUID]struct{}, len(executionIDByRef)+len(chainExecutions)) + for _, executionID := range executionIDByRef { + executionIDSet[executionID] = struct{}{} + } + for _, execution := range chainExecutions { + executionIDSet[execution.ID] = struct{}{} + } + + allExecutionIDs := make([]uuid.UUID, 0, len(executionIDSet)) + for executionID := range executionIDSet { + allExecutionIDs = append(allExecutionIDs, executionID) + } + + events, err := models.ListCanvasEventsForExecutionsInTransaction(b.tx, allExecutionIDs) + if err != nil { + return err + } + + events, err = appendPreferredBranchEvents(b.tx, events, preferredEventIDByExecution(chainExecutions)) if err != nil { return err } - latestByExecution := latestEventByExecution(events, executionIDs) + eventsByID := indexEventsByID(events) + preferredEventID := preferredEventIDByExecution(chainExecutions) + for nodeRef, executionID := range executionIDByRef { - event, ok := latestByExecution[executionID] + event, ok, err := eventForExecution(executionID, events, eventsByID, preferredEventID) + if err != nil { + return fmt.Errorf("node %s: %w", nodeRef, err) + } if !ok { return fmt.Errorf("node %s has no outputs", nodeRef) } @@ -607,25 +635,85 @@ func (b *NodeConfigurationBuilder) populateFromExecutions( return nil } -func latestEventByExecution(events []models.CanvasEvent, executionIDs []uuid.UUID) map[uuid.UUID]models.CanvasEvent { - latestByExecution := make(map[uuid.UUID]models.CanvasEvent, len(executionIDs)) - for _, event := range events { - if event.ExecutionID == nil { +// preferredEventIDByExecution maps a parent execution to the canvas event that routed +// into the child on this branch (from child.EventID when child.PreviousExecutionID is set). +func preferredEventIDByExecution(executions []models.CanvasNodeExecution) map[uuid.UUID]uuid.UUID { + preferred := make(map[uuid.UUID]uuid.UUID, len(executions)) + for _, execution := range executions { + if execution.PreviousExecutionID == nil || execution.EventID == uuid.Nil { continue } + preferred[*execution.PreviousExecutionID] = execution.EventID + } + return preferred +} - latest, ok := latestByExecution[*event.ExecutionID] - if !ok || event.CreatedAt == nil { - latestByExecution[*event.ExecutionID] = event +func indexEventsByID(events []models.CanvasEvent) map[uuid.UUID]models.CanvasEvent { + byID := make(map[uuid.UUID]models.CanvasEvent, len(events)) + for _, event := range events { + byID[event.ID] = event + } + return byID +} + +func appendPreferredBranchEvents( + tx *gorm.DB, + events []models.CanvasEvent, + preferredEventID map[uuid.UUID]uuid.UUID, +) ([]models.CanvasEvent, error) { + byID := indexEventsByID(events) + for _, eventID := range preferredEventID { + if eventID == uuid.Nil { + continue + } + if _, ok := byID[eventID]; ok { continue } - if latest.CreatedAt == nil || event.CreatedAt.After(*latest.CreatedAt) { - latestByExecution[*event.ExecutionID] = event + event, err := models.FindCanvasEventInTransaction(tx, eventID) + if err != nil { + return nil, err + } + events = append(events, *event) + byID[eventID] = *event + } + return events, nil +} + +// eventForExecution picks the canvas event whose payload should represent an upstream +// execution on the current branch. It prefers the child's incoming event ID from the +// linear execution chain; otherwise it uses the sole event on that execution. +func eventForExecution( + executionID uuid.UUID, + events []models.CanvasEvent, + eventsByID map[uuid.UUID]models.CanvasEvent, + preferredEventID map[uuid.UUID]uuid.UUID, +) (models.CanvasEvent, bool, error) { + if eventID, ok := preferredEventID[executionID]; ok && eventID != uuid.Nil { + if event, found := eventsByID[eventID]; found { + return event, true, nil } } - return latestByExecution + var matched []models.CanvasEvent + for _, event := range events { + if event.ExecutionID != nil && *event.ExecutionID == executionID { + matched = append(matched, event) + } + } + + switch len(matched) { + case 0: + return models.CanvasEvent{}, false, nil + case 1: + return matched[0], true, nil + default: + return models.CanvasEvent{}, false, fmt.Errorf( + "execution %s has ambiguous outputs (%d events)", + executionID, + len(matched), + ) + } } func parseDepth(param any) (int, error) { @@ -723,15 +811,24 @@ func (b *NodeConfigurationBuilder) resolveFromExecutions(depth int, step int, ha return step, nil, err } - latestByExecution := latestEventByExecution(events, executionIDs) + preferredEventID := preferredEventIDByExecution(executionsInChain) + events, err = appendPreferredBranchEvents(b.tx, events, preferredEventID) + if err != nil { + return step, nil, err + } + + eventsByID := indexEventsByID(events) for _, execution := range executionsInChain[startIndex:] { step++ if step < depth { continue } - event, exists := latestByExecution[execution.ID] - if !exists { + event, ok, err := eventForExecution(execution.ID, events, eventsByID, preferredEventID) + if err != nil { + return step, nil, err + } + if !ok { continue } diff --git a/pkg/workers/contexts/node_configuration_builder_event_selection_test.go b/pkg/workers/contexts/node_configuration_builder_event_selection_test.go new file mode 100644 index 0000000000..c48492a453 --- /dev/null +++ b/pkg/workers/contexts/node_configuration_builder_event_selection_test.go @@ -0,0 +1,90 @@ +package contexts + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/models" + "gorm.io/datatypes" +) + +func Test_preferredEventIDByExecution(t *testing.T) { + parentID := uuid.New() + childID := uuid.New() + childEventID := uuid.New() + + executions := []models.CanvasNodeExecution{ + {ID: parentID}, + { + ID: childID, + EventID: childEventID, + PreviousExecutionID: &parentID, + }, + } + + preferred := preferredEventIDByExecution(executions) + assert.Equal(t, childEventID, preferred[parentID]) +} + +func Test_eventForExecution_prefersBranchEvent(t *testing.T) { + parentExecutionID := uuid.New() + branchEventID := uuid.New() + otherEventID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + { + ID: otherEventID, + ExecutionID: &parentExecutionID, + CreatedAt: &now, + Data: datatypes.NewJSONType[any](map[string]any{"item": "a"}), + }, + { + ID: branchEventID, + ExecutionID: &parentExecutionID, + CreatedAt: &now, + Data: datatypes.NewJSONType[any](map[string]any{"item": "b"}), + }, + } + + eventsByID := indexEventsByID(events) + preferred := map[uuid.UUID]uuid.UUID{parentExecutionID: branchEventID} + + event, ok, err := eventForExecution(parentExecutionID, events, eventsByID, preferred) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, branchEventID, event.ID) +} + +func Test_eventForExecution_singleEventWithoutPreferred(t *testing.T) { + executionID := uuid.New() + eventID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + {ID: eventID, ExecutionID: &executionID, CreatedAt: &now}, + } + + event, ok, err := eventForExecution(executionID, events, indexEventsByID(events), nil) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, eventID, event.ID) +} + +func Test_eventForExecution_ambiguousWithoutBranchLink(t *testing.T) { + executionID := uuid.New() + now := time.Now() + + events := []models.CanvasEvent{ + {ID: uuid.New(), ExecutionID: &executionID, CreatedAt: &now}, + {ID: uuid.New(), ExecutionID: &executionID, CreatedAt: &now}, + } + + _, ok, err := eventForExecution(executionID, events, indexEventsByID(events), nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "ambiguous outputs") + assert.False(t, ok) +} diff --git a/pkg/workers/contexts/node_configuration_builder_test.go b/pkg/workers/contexts/node_configuration_builder_test.go index 7980227eef..30ac98637c 100644 --- a/pkg/workers/contexts/node_configuration_builder_test.go +++ b/pkg/workers/contexts/node_configuration_builder_test.go @@ -2,6 +2,7 @@ package contexts import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1626,3 +1627,96 @@ func Test_NodeConfigurationBuilder_Config_DoesNotOverwriteExistingConfigKey(t *t assert.Equal(t, "https://api.example.com", result["prev_api_url"]) assert.Equal(t, "ok", result["output_status"]) } + +func Test_NodeConfigurationBuilder_FanOutBranchPayload(t *testing.T) { + r := support.Setup(t) + defer r.Close() + + triggerNode := "start" + fanOutNode := "fanOut" + waitNode := "wait" + displayNode := "display" + + canvas, _ := support.CreateCanvas( + t, + r.Organization.ID, + r.User, + []models.CanvasNode{ + { + NodeID: triggerNode, + Name: triggerNode, + Type: models.NodeTypeTrigger, + Ref: datatypes.NewJSONType(models.NodeRef{Trigger: &models.TriggerRef{Name: "start"}}), + }, + { + NodeID: fanOutNode, + Name: fanOutNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "fanOut"}}), + }, + { + NodeID: waitNode, + Name: waitNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "wait"}}), + }, + { + NodeID: displayNode, + Name: displayNode, + Type: models.NodeTypeComponent, + Ref: datatypes.NewJSONType(models.NodeRef{Component: &models.ComponentRef{Name: "display"}}), + }, + }, + []models.Edge{ + {SourceID: triggerNode, TargetID: fanOutNode, Channel: "default"}, + {SourceID: fanOutNode, TargetID: waitNode, Channel: "item"}, + {SourceID: waitNode, TargetID: displayNode, Channel: "default"}, + }, + ) + + rootEvent := support.EmitCanvasEventForNodeWithData( + t, + canvas.ID, + triggerNode, + "default", + nil, + map[string]any{"items": []any{"a", "b", "c"}}, + ) + + fanOutExecution := support.CreateCanvasNodeExecution(t, canvas.ID, fanOutNode, rootEvent.ID, rootEvent.ID, nil) + + now := time.Now() + emitItem := func(item string, index int) *models.CanvasEvent { + return support.EmitCanvasEventForNodeWithData( + t, + canvas.ID, + fanOutNode, + "item", + &fanOutExecution.ID, + map[string]any{"item": item, "index": index, "totalCount": 3}, + ) + } + + _ = emitItem("a", 0) + branchEvent := emitItem("b", 1) + _ = emitItem("c", 2) + + // Align timestamps like PassInTransaction (same CreatedAt for all branch events). + require.NoError(t, database.Conn().Model(&models.CanvasEvent{}). + Where("execution_id = ?", fanOutExecution.ID). + Update("created_at", now).Error) + + waitInput := map[string]any{"item": "b", "index": 1, "totalCount": 3} + waitExecution := support.CreateNextNodeExecution(t, canvas.ID, waitNode, rootEvent.ID, branchEvent.ID, &fanOutExecution.ID) + + builder := NewNodeConfigurationBuilder(database.Conn(), canvas.ID). + WithPreviousExecution(&waitExecution.ID). + WithRootEvent(&rootEvent.ID). + WithInput(map[string]any{waitNode: waitInput}) + + result, err := builder.Build(map[string]any{ + "item": "{{ $[\"" + fanOutNode + "\"].item }}", + }) + require.NoError(t, err) + assert.Equal(t, "b", result["item"]) +} diff --git a/web_src/src/pages/workflowv2/mappers/fanOut.ts b/web_src/src/pages/workflowv2/mappers/fanOut.ts new file mode 100644 index 0000000000..de7c397a2b --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/fanOut.ts @@ -0,0 +1,153 @@ +import type { + ComponentBaseContext, + ComponentBaseMapper, + EventStateRegistry, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + StateFunction, + SubtitleContext, +} from "./types"; +import type { ComponentBaseProps, EventSection, EventState, EventStateMap } from "@/ui/componentBase"; +import { DEFAULT_EVENT_STATE_MAP } from "@/ui/componentBase"; +import { getTriggerRenderer, getState, getStateMap } from "."; +import type React from "react"; +import { getBackgroundColorClass } from "@/lib/colors"; +import { renderTimeAgo } from "@/components/TimeAgo"; +import { formatTimestampInUserTimezone } from "@/lib/timezone"; + +type FanOutOutputs = Record; + +export const FAN_OUT_STATE_MAP: EventStateMap = { + ...DEFAULT_EVENT_STATE_MAP, + passed: { + icon: "circle-check", + textColor: "text-gray-800", + backgroundColor: "bg-green-100", + badgeColor: "bg-emerald-500", + }, + rejected: { + icon: "circle-x", + textColor: "text-gray-800", + backgroundColor: "bg-gray-100", + badgeColor: "bg-gray-500", + }, +}; + +export const fanOutStateFunction: StateFunction = (execution: ExecutionInfo): EventState => { + if (!execution) return "neutral"; + + if ( + execution.resultMessage && + (execution.resultReason === "RESULT_REASON_ERROR" || + (execution.result === "RESULT_FAILED" && execution.resultReason !== "RESULT_REASON_ERROR_RESOLVED")) + ) { + return "error"; + } + + if (execution.result === "RESULT_CANCELLED") { + return "cancelled"; + } + + if (execution.state === "STATE_PENDING" || execution.state === "STATE_STARTED") { + return "running"; + } + + if (execution.state === "STATE_FINISHED" && execution.result === "RESULT_PASSED") { + const outputs = execution.outputs as FanOutOutputs | undefined; + const itemOutputs = outputs?.item; + const hasItems = Array.isArray(itemOutputs) && itemOutputs.length > 0; + return hasItems ? "passed" : "rejected"; + } + + return "failed"; +}; + +export const FAN_OUT_STATE_REGISTRY: EventStateRegistry = { + stateMap: FAN_OUT_STATE_MAP, + getState: fanOutStateFunction, +}; + +type FanOutConfiguration = { + arrayExpression: string; +}; + +type FanOutMetadata = { + arrayExpression?: string; + count?: number; +}; + +export const fanOutMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const componentName = context.componentDefinition.name || "fanOut"; + const configuration = context.node.configuration as FanOutConfiguration; + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const specs = configuration.arrayExpression + ? [ + { + title: "Array", + tooltipTitle: "Array expression", + value: configuration.arrayExpression, + }, + ] + : undefined; + + return { + iconSlug: "split", + collapsed: context.node.isCollapsed, + collapsedBackground: getBackgroundColorClass("white"), + title: context.node.name || context.componentDefinition.label || context.componentDefinition.name || "Fan Out", + eventSections: lastExecution ? getFanOutEventSections(context.nodes, lastExecution, componentName) : undefined, + includeEmptyState: !lastExecution, + specs, + eventStateMap: getStateMap(componentName), + }; + }, + + subtitle(context: SubtitleContext): string | React.ReactNode { + if (!context.execution.createdAt) return ""; + return renderTimeAgo(new Date(context.execution.createdAt)); + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const configuration = context.execution.configuration as FanOutConfiguration; + const metadata = context.execution.metadata as FanOutMetadata | undefined; + const details: Record = { + "Evaluated at": context.execution.createdAt ? formatTimestampInUserTimezone(context.execution.createdAt) : "-", + "Array expression": configuration.arrayExpression ?? metadata?.arrayExpression ?? "-", + }; + + if (typeof metadata?.count === "number") { + details["Items emitted"] = metadata.count; + } + + return details; + }, +}; + +function getFanOutEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootEvent = execution.rootEvent; + if (!rootEvent?.id || !execution.createdAt) { + return []; + } + + const rootTriggerNode = nodes.find((n) => n.id === rootEvent.nodeId); + if (!rootTriggerNode?.componentName) { + return []; + } + + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode.componentName); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: rootEvent }); + const createdAt = new Date(execution.createdAt); + + return [ + { + receivedAt: createdAt, + eventTitle: title, + eventSubtitle: renderTimeAgo(createdAt), + eventState: getState(componentName)(execution), + eventId: rootEvent.id, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/index.ts b/web_src/src/pages/workflowv2/mappers/index.ts index d346a40de5..efb1c7d5f3 100644 --- a/web_src/src/pages/workflowv2/mappers/index.ts +++ b/web_src/src/pages/workflowv2/mappers/index.ts @@ -248,6 +248,7 @@ import { } from "./oci/index"; import { filterMapper, FILTER_STATE_REGISTRY } from "./filter"; +import { fanOutMapper, FAN_OUT_STATE_REGISTRY } from "./fanOut"; import { sshMapper, SSH_STATE_REGISTRY } from "./ssh"; import { runnerMapper, RUNNER_STATE_REGISTRY } from "./runner"; import { waitCustomFieldRenderer, waitMapper, WAIT_STATE_REGISTRY } from "./wait"; @@ -284,6 +285,7 @@ const componentBaseMappers: Record = { runner: runnerMapper, timeGate: timeGateMapper, filter: filterMapper, + fanOut: fanOutMapper, wait: waitMapper, approval: approvalMapper, merge: mergeMapper, @@ -437,6 +439,7 @@ const eventStateRegistries: Record = { ssh: SSH_STATE_REGISTRY, runner: RUNNER_STATE_REGISTRY, filter: FILTER_STATE_REGISTRY, + fanOut: FAN_OUT_STATE_REGISTRY, if: IF_STATE_REGISTRY, timeGate: TIME_GATE_STATE_REGISTRY, wait: WAIT_STATE_REGISTRY,