diff --git a/package-lock.json b/package-lock.json index 8dc971d..0d539d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "sfn-sim", - "version": "1.5.1", + "version": "2.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "sfn-sim", - "version": "1.5.1", + "version": "2.0.0", "license": "ISC", "dependencies": { "@wmfs/statelint": "^1.30.0", diff --git a/src/errors.js b/src/errors.js index 6112dd7..193ae99 100644 --- a/src/errors.js +++ b/src/errors.js @@ -90,8 +90,11 @@ class ExceedToleratedFailureThresholdError extends RuntimeError { } class ItemReaderFailedError extends RuntimeError { - constructor() { - super('A Map state failed to read all items as specified by the "ItemReader" field.'); + constructor(cause) { + const message = cause + ? `A Map state failed to read all items as specified by the "ItemReader" field. ${cause}` + : 'A Map state failed to read all items as specified by the "ItemReader" field.'; + super(message); this.name = 'States.ItemReaderFailed'; } } @@ -113,5 +116,7 @@ export { TaskFailedError, NoChoiceMatchedError, IntrinsicFailureError, + ExceedToleratedFailureThresholdError, + ItemReaderFailedError, ERROR_WILDCARD, }; diff --git a/src/executors.js b/src/executors.js index 4fd16e0..e960df4 100644 --- a/src/executors.js +++ b/src/executors.js @@ -1,7 +1,15 @@ import { runJSONPathChoice, runJSONataChoice } from './choice.js'; -import { RuntimeError, FailError, ERROR_WILDCARD } from './errors.js'; +import { v4 as uuidV4 } from 'uuid'; +import { + RuntimeError, + FailError, + ExceedToleratedFailureThresholdError, + ItemReaderFailedError, + ERROR_WILDCARD, +} from './errors.js'; import runTask from './task.js'; import { getValue, applyPayloadTemplate, getStateResult, wait, evaluateJSONata, getJSONataInput, getJSONataOutput, assign } from './utils.js'; +import { createHash } from 'node:crypto'; /* * JSONata @@ -106,17 +114,260 @@ const executeParallelJSONata = async (state, variables, simulatorContext) => { return [output, next]; }; +const getItemReaderPointerValue = (value, pointer) => { + if (!pointer || pointer === '/') { + return value; + } + + if (!pointer.startsWith('/')) { + throw new RuntimeError('ItemReader ReaderConfig.ItemsPointer must start with "/"'); + } + + const segments = pointer + .split('/') + .slice(1) + .map((segment) => segment.replace(/~1/g, '/').replace(/~0/g, '~')); + + return segments.reduce((current, segment) => { + if (current === undefined || current === null) { + throw new RuntimeError('ItemReader ReaderConfig.ItemsPointer did not match any data'); + } + return current[segment]; + }, value); +}; + +const shapeS3ObjectListItem = (entry, lastModified) => { + if (typeof entry.body !== 'string') { + throw new RuntimeError('S3 object body must be a string'); + } + + return { + ETag: createHash('md5').update(entry.body).digest('hex'), + Key: entry.key, + LastModified: lastModified, + Size: new TextEncoder().encode(entry.body).length, + StorageClass: 'STANDARD', + }; +}; + +const mapSettledResultsToOutput = (settledResults) => settledResults.map((entry) => { + if (entry.status === 'fulfilled') { + return entry.value; + } + + const reason = entry.reason; + if (reason?.toErrorOutput) { + return reason.toErrorOutput(); + } + + return { + Error: reason?.name || 'Error', + Cause: reason?.stack || reason?.message, + }; +}); + +const isFailureThresholdExceeded = ({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, +}) => { + const failedPercentage = totalItems === 0 ? 0 : (failedItems / totalItems) * 100; + const countExceeded = toleratedFailureCount !== null && failedItems > toleratedFailureCount; + const percentageExceeded = toleratedFailurePercentage !== null && failedPercentage > toleratedFailurePercentage; + + return countExceeded || percentageExceeded; +}; + +const getMapChildExecutionContext = (state, variables, itemInput) => { + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + return variables.states.context.Execution; + } + + const mapRunId = uuidV4(); + const parentExecutionName = variables.states.context.Execution?.Name; + const mapStateIdentifier = (state.Label || variables.states.context.State?.Name || 'Map').replace(/\s+/g, ''); + const childExecutionName = parentExecutionName + ? `${parentExecutionName}/${mapStateIdentifier}:${mapRunId}` + : `${mapStateIdentifier}:${mapRunId}`; + + return { + Id: mapRunId, + Input: itemInput, + Name: childExecutionName, + StartTime: new Date().toISOString(), + RedriveCount: 0, + }; +}; + +const getMapItemsFromItemReaderJSONata = async (state, variables, simulatorContext) => { + try { + const { ItemReader } = state; + const readerInput = ItemReader.Arguments + ? await evaluateJSONata(ItemReader.Arguments, variables) + : (ItemReader.Parameters || variables.states.input); + + const s3Resource = simulatorContext.resources.find( + ({ service, name }) => service === 's3' && name === readerInput.Bucket, + ); + + if (!s3Resource) { + throw new RuntimeError('ItemReader failed to find configured S3 bucket resource'); + } + + let items; + const isS3GetObjectReader = [ + 'arn:aws:states:::s3:getObject', + 'arn:aws:states:::aws-sdk:s3:getObject', + ].includes(ItemReader.Resource); + const isS3ListObjectsReader = [ + 'arn:aws:states:::s3:listObjectsV2', + 'arn:aws:states:::aws-sdk:s3:listObjectsV2', + ].includes(ItemReader.Resource); + + if (isS3GetObjectReader) { + // TODO JSONL, CSV, MANIFEST, PARQUET + const inputType = ItemReader.ReaderConfig?.InputType || 'JSON'; + if (inputType !== 'JSON') { + throw new RuntimeError(`Unsupported ItemReader ReaderConfig.InputType [${inputType}]`); + } + + const object = s3Resource.objects?.find((entry) => entry.key === readerInput.Key); + + if (!object) { + throw new RuntimeError('ItemReader failed to load the configured S3 object'); + } + + items = object.body; + if (typeof items === 'string') { + items = JSON.parse(items); + } else { + throw new RuntimeError('ItemReader S3 object body must be a string'); + } + + const itemsPointer = ItemReader.ReaderConfig?.ItemsPointer; + if (itemsPointer) { + items = getItemReaderPointerValue(items, itemsPointer); + } + } else if (isS3ListObjectsReader) { + + const prefix = readerInput.Prefix || ''; + const now = new Date().toISOString(); + + // TODO Support Transformation === "LOAD_AND_FLATTEN" + + items = (s3Resource.objects || []) + .filter((entry) => entry.key.startsWith(prefix)) + .map((entry) => shapeS3ObjectListItem(entry, now)); + } else { + throw new RuntimeError(`Unsupported ItemReader Resource [${ItemReader.Resource}]`); + } + + if (!Array.isArray(items)) { + throw new RuntimeError('ItemReader must resolve to an array of items'); + } + + const maxItems = ItemReader.ReaderConfig?.MaxItems !== undefined + ? await evaluateJSONata(ItemReader.ReaderConfig.MaxItems, variables) + : null; + + if (maxItems !== null && maxItems !== undefined) { + return items.slice(0, maxItems); + } + + return items; + } catch (error) { + if (error instanceof ItemReaderFailedError) { + throw error; + } + throw new ItemReaderFailedError(error?.message || error); + } +}; + +const getMapItemsJSONata = async (state, variables, simulatorContext) => { + if (!state.ItemReader) { + return evaluateJSONata(state.Items, variables); + } + + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + throw new RuntimeError('ItemReader is not supported for INLINE map states'); + } + + const itemReaderItems = await getMapItemsFromItemReaderJSONata(state, variables, simulatorContext); + + if (!state.Items) { + return itemReaderItems; + } + + const itemReaderVariables = { + ...variables, + states: { + ...variables.states, + input: itemReaderItems, + }, + }; + + return evaluateJSONata(state.Items, itemReaderVariables); +}; + +const applyItemBatcherJSONata = async (itemBatcher, items, variables) => { + if (!itemBatcher) { + return items; + } + + // TODO MaxInputBytesPerBatch + + let maxItemsPerBatch = null; + if (itemBatcher.MaxItemsPerBatch !== undefined) { + maxItemsPerBatch = await evaluateJSONata(itemBatcher.MaxItemsPerBatch, variables); + } + + if (maxItemsPerBatch !== null && (!Number.isInteger(maxItemsPerBatch) || maxItemsPerBatch <= 0)) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch must resolve to a positive integer'); + } + + const batchInput = itemBatcher.BatchInput !== undefined + ? await evaluateJSONata(itemBatcher.BatchInput, variables) + : undefined; + + const batches = []; + const batchSize = maxItemsPerBatch || items.length || 1; + for (let index = 0; index < items.length; index += batchSize) { + const batch = { + Items: items.slice(index, index + batchSize), + }; + + if (batchInput !== undefined) { + batch.BatchInput = batchInput; + } + + batches.push(batch); + } + + return batches; +}; + const executeMapJSONata = async (state, variables, simulatorContext) => { - const items = await evaluateJSONata(state.Items, variables); + // Map-level JSONata fields should resolve $states.input to the map state's input. + const parentInput = variables.states.input; + + // Get the items to process, either from Items or via an ItemReader + let items = await getMapItemsJSONata(state, variables, simulatorContext); - // TODO ItemReader, ItemBatcher, ResultWriter, ToleratedFailure + // If there is only a single item returned by the JSONata expression, wrap it in an array. + // This is AWS Step Function behaviour to coerce scalar expressions to single element arrays to facilitate mapping. + if (!Array.isArray(items)) { + items = [items]; + } - const executions = items.map(async (Value, Index) => { + // Resolve ItemSelector against the items to process + const selectedItems = await Promise.all(items.map(async (Value, Index) => { const itemVariables = { ...variables, states: { ...variables.states, - input: Value, + // $states.input is the input to the map state when ItemSelector is resolving the items to process. + input: parentInput, context: { ...variables.states.context, State: { @@ -134,15 +385,85 @@ const executeMapJSONata = async (state, variables, simulatorContext) => { }; if (state.ItemSelector) { - const input = await evaluateJSONata(state.ItemSelector, itemVariables); - itemVariables.states.input = input; - itemVariables.states.context.Execution.Input = input; + return evaluateJSONata(state.ItemSelector, itemVariables); } + return Value; + })); + + // Batch the items if an ItemBatcher is configured + const executionInputs = await applyItemBatcherJSONata(state.ItemBatcher, selectedItems, variables); + + // Execute the child workflows for each item or batch + const executions = executionInputs.map(async (Value, Index) => { + const childExecution = getMapChildExecutionContext(state, variables, Value); + + const itemVariables = { + ...variables, + states: { + ...variables.states, + // $states.input is the input to the child workflow when it is executing. + input: Value, + context: { + ...variables.states.context, + Execution: childExecution, + State: { + ...variables.states.context.State, + Name: state.ItemProcessor.StartAt, + }, + Map: { + Item: { + Index, + Value, + }, + }, + }, + }, + }; return executeStateMachine(state.ItemProcessor, itemVariables, simulatorContext); }); - const result = await Promise.all(executions); + // Check failure thresholds + + // TODO MaxConcurrency + const settledResults = await Promise.allSettled(executions); + const failedResults = settledResults.filter((entry) => entry.status === 'rejected'); + + const toleratedFailureCount = state.ToleratedFailureCount !== undefined + ? await evaluateJSONata(state.ToleratedFailureCount, variables) + : null; + const toleratedFailurePercentage = state.ToleratedFailurePercentage !== undefined + ? await evaluateJSONata(state.ToleratedFailurePercentage, variables) + : null; + + const hasFailureThreshold = toleratedFailureCount !== null || toleratedFailurePercentage !== null; + if (failedResults.length > 0 && !hasFailureThreshold) { + throw failedResults[0].reason; + } + + if (toleratedFailureCount !== null && (!Number.isInteger(toleratedFailureCount) || toleratedFailureCount < 0)) { + throw new RuntimeError('ToleratedFailureCount must resolve to a non-negative integer'); + } + if ( + toleratedFailurePercentage !== null + && (typeof toleratedFailurePercentage !== 'number' || toleratedFailurePercentage < 0 || toleratedFailurePercentage > 100) + ) { + throw new RuntimeError('ToleratedFailurePercentage must resolve to a number between 0 and 100'); + } + + const totalItems = settledResults.length; + const failedItems = failedResults.length; + if (isFailureThresholdExceeded({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, + })) { + throw new ExceedToleratedFailureThresholdError(); + } + + // TODO ResultWriter + const result = mapSettledResultsToOutput(settledResults); variables.states.result = result; @@ -274,14 +595,205 @@ const executeParallelJSONPath = async (state, variables, simulatorContext) => { return [stateOutput, next]; }; + +const getMapItemsFromItemReaderJSONPath = (state, stateInput, context, simulatorContext) => { + try { + const { ItemReader } = state; + const readerInput = ItemReader.Parameters + ? applyPayloadTemplate(stateInput, context, ItemReader.Parameters) + : stateInput; + + const s3Resource = simulatorContext.resources.find( + ({ service, name }) => service === 's3' && name === readerInput.Bucket, + ); + + if (!s3Resource) { + throw new RuntimeError('ItemReader failed to find configured S3 bucket resource'); + } + + let items; + const isS3GetObjectReader = [ + 'arn:aws:states:::s3:getObject', + 'arn:aws:states:::aws-sdk:s3:getObject', + ].includes(ItemReader.Resource); + const isS3ListObjectsReader = [ + 'arn:aws:states:::s3:listObjectsV2', + 'arn:aws:states:::aws-sdk:s3:listObjectsV2', + ].includes(ItemReader.Resource); + + if (isS3GetObjectReader) { + const inputType = ItemReader.ReaderConfig?.InputType || 'JSON'; + if (inputType !== 'JSON') { + throw new RuntimeError(`Unsupported ItemReader ReaderConfig.InputType [${inputType}]`); + } + + const object = s3Resource.objects?.find((entry) => entry.key === readerInput.Key); + if (!object) { + throw new RuntimeError('ItemReader failed to load the configured S3 object'); + } + + items = object.body; + if (typeof items === 'string') { + items = JSON.parse(items); + } else { + throw new RuntimeError('ItemReader S3 object body must be a string'); + } + + const itemsPointer = ItemReader.ReaderConfig?.ItemsPointer; + if (itemsPointer) { + items = getItemReaderPointerValue(items, itemsPointer); + } + } else if (isS3ListObjectsReader) { + const prefix = readerInput.Prefix || ''; + const now = new Date().toISOString(); + items = (s3Resource.objects || []) + .filter((entry) => entry.key.startsWith(prefix)) + .map((entry) => shapeS3ObjectListItem(entry, now)); + } else { + throw new RuntimeError(`Unsupported ItemReader Resource [${ItemReader.Resource}]`); + } + + if (!Array.isArray(items)) { + throw new RuntimeError('ItemReader must resolve to an array of items'); + } + + const maxItems = ItemReader.ReaderConfig?.MaxItems || null; + const maxItemsPath = ItemReader.ReaderConfig?.MaxItemsPath || null; + + if (maxItemsPath !== null && maxItems !== null) { + throw new RuntimeError('ItemReader ReaderConfig.MaxItems and ReaderConfig.MaxItemsPath cannot be used together'); + } + + let maxItemsValue = null; + if (maxItemsPath !== null) { + maxItemsValue = getValue(stateInput, maxItemsPath); + } else if (maxItems !== null) { + if (maxItems !== null && (!Number.isInteger(maxItems) || maxItems <= 0)) { + throw new RuntimeError('ItemReader ReaderConfig.MaxItems must resolve to a positive integer'); + } + maxItemsValue = maxItems; + } + + if (maxItemsValue !== null) { + return items.slice(0, maxItemsValue); + } + + return items; + } catch (error) { + if (error instanceof ItemReaderFailedError) { + throw error; + } + throw new ItemReaderFailedError(error?.message || error); + } +}; + +const getMapItemsJSONPath = (state, stateInput, effectiveInput, context, simulatorContext) => { + if (!state.ItemReader) { + return getValue(effectiveInput, state.ItemsPath); + } + + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + throw new RuntimeError('ItemReader is not supported for INLINE map states'); + } + + const itemReaderItems = getMapItemsFromItemReaderJSONPath(state, stateInput, context, simulatorContext); + + if (!state.ItemsPath) { + return itemReaderItems; + } + + return getValue(itemReaderItems, state.ItemsPath); +}; + +const applyItemBatcherJSONPath = (itemBatcher, items, stateInput, context) => { + if (!itemBatcher) { + return items; + } + + // TODO MaxInputBytesPerBatch, MaxInputBytesPerBatchPath + + const maxItemsPerBatch = itemBatcher.MaxItemsPerBatch ?? null; + const maxItemsPerBatchPath = itemBatcher.MaxItemsPerBatchPath ?? null; + + if (maxItemsPerBatch !== null && maxItemsPerBatchPath !== null) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch and MaxItemsPerBatchPath cannot be used together'); + } + + let maxItemsPerBatchValue; + + if (maxItemsPerBatchPath !== null) { + maxItemsPerBatchValue = getValue(stateInput, maxItemsPerBatchPath); + } else if (maxItemsPerBatch !== null) { + if (maxItemsPerBatch !== null && (!Number.isInteger(maxItemsPerBatch) || maxItemsPerBatch <= 0)) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch must resolve to a positive integer'); + } + + maxItemsPerBatchValue = maxItemsPerBatch; + } + + const batchInput = itemBatcher.BatchInput !== undefined + ? applyPayloadTemplate(stateInput, context, itemBatcher.BatchInput) + : undefined; + + const batches = []; + const batchSize = maxItemsPerBatchValue || items.length || 1; + for (let index = 0; index < items.length; index += batchSize) { + const batch = { + Items: items.slice(index, index + batchSize), + }; + if (batchInput !== undefined) { + batch.BatchInput = batchInput; + } + batches.push(batch); + } + + return batches; +}; + const executeMapJSONPath = async (state, variables, simulatorContext) => { const rawInput = variables.states.input; const stateInput = getValue(rawInput, state.InputPath); const effectiveInput = applyPayloadTemplate(stateInput, variables.states.context, state.Parameters); + let items = getMapItemsJSONPath(state, stateInput, effectiveInput, variables.states.context, simulatorContext); + + // If there is only a single item returned by the JSONpath evaluation, wrap it in an array. + // This is AWS Step Function behaviour to coerce scalar expressions to single element arrays to facilitate mapping. + if (!Array.isArray(items)) { + items = [items]; + } + + const selectedItems = items.map((Value, Index) => { + if (!state.ItemSelector) { + return Value; + } - const items = getValue(effectiveInput, state.ItemsPath); + const itemContext = { + ...variables.states.context, + State: { + ...variables.states.context.State, + Name: state.ItemProcessor.StartAt, + }, + Map: { + Item: { + Index, + Value, + }, + }, + }; + + return applyPayloadTemplate(effectiveInput, itemContext, state.ItemSelector); + }); + + const executionInputs = applyItemBatcherJSONPath( + state.ItemBatcher, + selectedItems, + effectiveInput, + variables.states.context, + ); + + const executions = executionInputs.map((Value, Index) => { + const childExecution = getMapChildExecutionContext(state, variables, Value); - const executions = items.map((Value, Index) => { const itemVariables = { ...variables, states: { @@ -289,6 +801,7 @@ const executeMapJSONPath = async (state, variables, simulatorContext) => { input: Value, context: { ...variables.states.context, + Execution: childExecution, State: { ...variables.states.context.State, Name: state.ItemProcessor.StartAt, @@ -302,12 +815,44 @@ const executeMapJSONPath = async (state, variables, simulatorContext) => { }, }, }; - return executeStateMachine(state.ItemProcessor, itemVariables, simulatorContext); }); - const result = await Promise.all(executions); + const settledResults = await Promise.allSettled(executions); + const failedResults = settledResults.filter((entry) => entry.status === 'rejected'); + + const toleratedFailureCount = state.ToleratedFailureCount + ?? (state.ToleratedFailureCountPath ? getValue(effectiveInput, state.ToleratedFailureCountPath) : null); + const toleratedFailurePercentage = state.ToleratedFailurePercentage + ?? (state.ToleratedFailurePercentagePath ? getValue(effectiveInput, state.ToleratedFailurePercentagePath) : null); + + const hasFailureThreshold = toleratedFailureCount !== null || toleratedFailurePercentage !== null; + if (failedResults.length > 0 && !hasFailureThreshold) { + throw failedResults[0].reason; + } + + if (toleratedFailureCount !== null && (!Number.isInteger(toleratedFailureCount) || toleratedFailureCount < 0)) { + throw new RuntimeError('ToleratedFailureCount must resolve to a non-negative integer'); + } + if ( + toleratedFailurePercentage !== null + && (typeof toleratedFailurePercentage !== 'number' || toleratedFailurePercentage < 0 || toleratedFailurePercentage > 100) + ) { + throw new RuntimeError('ToleratedFailurePercentage must resolve to a number between 0 and 100'); + } + + const totalItems = settledResults.length; + const failedItems = failedResults.length; + if (isFailureThresholdExceeded({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, + })) { + throw new ExceedToleratedFailureThresholdError(); + } + const result = mapSettledResultsToOutput(settledResults); const effectiveResult = applyPayloadTemplate(result, variables.states.context, state.ResultSelector); const stateResult = getStateResult(rawInput, effectiveResult, state.ResultPath); const stateOutput = getValue(stateResult, state.OutputPath); diff --git a/tests/map-jsonata.test.js b/tests/map-jsonata.test.js new file mode 100644 index 0000000..3cbd83e --- /dev/null +++ b/tests/map-jsonata.test.js @@ -0,0 +1,1643 @@ +import { describe, test, expect } from "vitest"; +import { v4 as uuidV4 } from "uuid"; +import { executeStateMachine } from "../src/executors.js"; +import { load } from "../src/index.js"; +import { defaultOptions } from "../src/options.js"; + +const getVariables = (definition, input) => ({ + states: { + input, + context: { + Execution: { + Id: uuidV4(), + Input: input, + Name: "test-execution", + StartTime: new Date().toISOString(), + }, + State: { + Name: definition.StartAt, + }, + StateMachine: { + Id: uuidV4(), + Name: "test-state-machine", + }, + Task: {}, + }, + }, +}); + +const getSimulatorContext = (overrides = {}) => ({ + resources: [], + options: defaultOptions, + queryLanguage: "JSONata", + ...overrides, +}); + +describe("Map state (JSONata)", () => { + describe("Execution and context", () => { + test("uses parent map input for $states.input and map item value via context", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ItemSelector: { + requestId: "{% $states.input.requestId %}", + number: "{% $states.context.Map.Item.Value.number %}", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const input = { + requestId: "req-123", + items: [{ number: 1 }, { number: 2 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext() + ); + + expect(result).toEqual([ + { requestId: "req-123", number: 1 }, + { requestId: "req-123", number: 2 }, + ]); + }); + + test("keeps parent Execution context for INLINE map iterations", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "Map Step", + States: { + "Map Step": { + Type: "Map", + Items: "{% $states.input %}", + ItemProcessor: { + StartAt: "InspectExecutionInput", + States: { + InspectExecutionInput: { + Type: "Pass", + Output: { + item: "{% $states.context.Map.Item.Value %}", + executionInput: "{% $states.context.Execution.Input %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + { item: 1, executionInput: [1, 2, 3] }, + { item: 2, executionInput: [1, 2, 3] }, + { item: 3, executionInput: [1, 2, 3] }, + ]); + }); + + test("creates per-item Execution context for DISTRIBUTED map iterations", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "My Map Step", + States: { + "My Map Step": { + Type: "Map", + Label: "Custom Label Name", + Items: "{% $states.input %}", + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectDistributedExecution", + States: { + InspectDistributedExecution: { + Type: "Task", + Resource: + "arn:aws:lambda:::function:inspect-distributed-execution", + Output: { + item: "{% $states.input %}", + executionInput: "{% $states.context.Execution.Input %}", + executionId: "{% $states.context.Execution.Id %}", + executionName: "{% $states.context.Execution.Name %}", + executionStartTime: + "{% $states.context.Execution.StartTime %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "inspect-distributed-execution", + function: async (input) => { + const delayMs = (4 - input) * 10; + await new Promise((resolve) => setTimeout(resolve, delayMs)); + return input; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result).toEqual([ + expect.objectContaining({ + item: 1, + executionInput: 1, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 2, + executionInput: 2, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 3, + executionInput: 3, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + ]); + + for (const entry of result) { + expect(entry.executionId).toEqual(expect.any(String)); + expect(entry.executionStartTime).toEqual(expect.any(String)); + } + + const uniqueIds = new Set(result.map((entry) => entry.executionId)); + expect(uniqueIds.size).toBe(3); + }); + }); + + describe("ItemReader", () => { + test("reads items using ItemReader before running item processor", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r1" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("resolves ItemReader Arguments Bucket and Key from state input", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "{% $states.input.reader.bucket %}", + Key: "{% $states.input.reader.key %}", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { + key: "dynamic/items.json", + body: JSON.stringify([{ id: "dyn-a" }, { id: "dyn-b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + key: "dynamic/items.json", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "dyn-a" }, { id: "dyn-b" }]); + }); + + test("reads nested items using ItemReader ReaderConfig.ItemsPointer", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "/payload/records", + }, + Arguments: { + Bucket: "source-bucket", + Key: "nested.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "nested.json", + body: JSON.stringify({ + payload: { + records: [{ id: "nested-a" }, { id: "nested-b" }], + }, + }), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r2" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "nested-a" }, { id: "nested-b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItems", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + }, + Arguments: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r3" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("reads a list of S3 objects from a prefix using ItemReader listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + { key: "images/pic.jpg", body: "binary-bytes" }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "prefix-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result).toEqual([ + expect.objectContaining({ + Key: "logs/2024/a.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + expect.objectContaining({ + Key: "logs/2024/b.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + ]); + expect(result[0].Size).toEqual(expect.any(Number)); + expect(result[1].Size).toEqual(expect.any(Number)); + expect(result[0].LastModified).toEqual(expect.any(String)); + expect(result[1].LastModified).toEqual(expect.any(String)); + }); + + test("resolves ItemReader Arguments Bucket and Prefix from state input for listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "{% $states.input.reader.bucket %}", + Prefix: "{% $states.input.reader.prefix %}", + }, + }, + ItemSelector: { + key: "{% $states.context.Map.Item.Value.Key %}", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { key: "logs/2025/a.json", body: '{"a":1}' }, + { key: "logs/2025/b.json", body: '{"b":2}' }, + { key: "logs/2024/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + prefix: "logs/2025/", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { key: "logs/2025/a.json" }, + { key: "logs/2025/b.json" }, + ]); + }); + + test("applies ItemSelector to listObjectsV2 metadata items", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemSelector: { + key: "{% $states.context.Map.Item.Value.Key %}", + bytes: "{% $states.context.Map.Item.Value.Size %}", + class: "{% $states.context.Map.Item.Value.StorageClass %}", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "selector-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { + key: "logs/2024/a.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + { + key: "logs/2024/b.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + ]); + }); + + test("fails when ItemReader is configured on INLINE map", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { requestId: "inline-item-reader" }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader bucket is missing", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader key is not found in S3", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Bucket: "source-bucket", + Key: "missing.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.json", body: JSON.stringify([1, 2]) }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("returns an empty array when ItemReader listObjectsV2 prefix has no matching objects", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "missing-prefix/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "logs/a.json", body: "{}" }], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([]); + }); + + test("fails with States.ItemReaderFailed when ItemReader InputType is unsupported", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "CSV", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.csv", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.csv", body: "a,b,c" }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader ItemsPointer is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "payload/records", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify({ payload: { records: [1, 2] } }), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader getObject payload is not an array", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "source-bucket", + Key: "not-array.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "not-array.json", body: JSON.stringify({ id: "x" }) }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + }); + + describe("ItemBatcher", () => { + test("batches items with ItemBatcher before child workflow execution", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + source: "{% $states.input.requestId %}", + }, + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Pass", + Output: { + source: "{% $states.input.BatchInput.source %}", + batchCount: "{% $count($states.input.Items) %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const input = { + requestId: "req-77", + items: [{ id: 1 }, { id: 2 }, { id: 3 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext() + ); + + expect(result).toEqual([ + { source: "req-77", batchCount: 2 }, + { source: "req-77", batchCount: 1 }, + ]); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is zero", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 0, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is not an integer", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 1.5, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher BatchInput JSONata expression is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: "{% $notAFunction( %}", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toBeDefined(); + }); + + test("returns empty output when ItemBatcher is configured and there are no items", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, []), + getSimulatorContext() + ); + + expect(result).toEqual([]); + }); + }); + + describe("ToleratedFailure*", () => { + test("fails with States.ExceedToleratedFailureThreshold when tolerated failures are exceeded", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when tolerated failure percentage is exceeded", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailurePercentage: 40, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: true }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("returns failed iteration error payloads when failures are tolerated", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + Output: "{% $states.result.Payload %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { id: input.id, ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: false }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ id: 1, ok: true }); + expect(result[1]).toMatchObject({ + Error: "States.TaskFailed", + Cause: expect.any(String), + }); + expect(result[1].Cause).toContain("boom"); + expect(result[2]).toEqual({ id: 3, ok: true }); + }); + + test("throws original task failure when items fail and no tolerated failure threshold is configured", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.TaskFailed" }); + }); + + test("fails when ToleratedFailureCount is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: -1, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailurePercentage is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailurePercentage: 120, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailureCount expression resolves to undefined", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ToleratedFailureCount: "{% $states.input.missingThreshold %}", + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { items: [1] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("does not exceed tolerated failure count when failed item count equals threshold", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + Output: "{% $states.result.Payload %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.fail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [{ fail: true }, { fail: false }]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ Error: "States.TaskFailed" }); + expect(result[1]).toEqual({ ok: true }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when failure count is above threshold", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + }); + + // Statelint (@wmfs/statelint) does not yet recognise QueryLanguage, + // ItemReader, ItemBatcher, ProcessorConfig, or ToleratedFailure*. Until it + // does, callers must use validateDefinition: false to opt out. + describe("load() integration with validateDefinition: false", () => { + test("end-to-end Map with ItemReader, ItemBatcher and ToleratedFailure*", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { Bucket: "input-bucket", Key: "items.json" }, + ReaderConfig: { InputType: "JSON" }, + }, + ItemBatcher: { MaxItemsPerBatch: 2 }, + ToleratedFailureCount: 0, + ToleratedFailurePercentage: 100, + ItemProcessor: { + QueryLanguage: "JSONata", + StartAt: "EchoBatch", + ProcessorConfig: { Mode: "DISTRIBUTED", ExecutionType: "STANDARD" }, + States: { + EchoBatch: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "input-bucket", + objects: [ + { key: "items.json", body: JSON.stringify([{ id: 1 }, { id: 2 }, { id: 3 }]) }, + ], + }, + ]; + + const stateMachine = load(definition, resources, { + validateDefinition: false, + }); + const result = await stateMachine.execute({}); + + expect(result).toEqual([ + { Items: [{ id: 1 }, { id: 2 }] }, + { Items: [{ id: 3 }] }, + ]); + }); + }); +}); diff --git a/tests/map-jsonpath.test.js b/tests/map-jsonpath.test.js new file mode 100644 index 0000000..cbbc8df --- /dev/null +++ b/tests/map-jsonpath.test.js @@ -0,0 +1,1833 @@ +import { describe, test, expect } from "vitest"; +import { v4 as uuidV4 } from "uuid"; +import { executeStateMachine } from "../src/executors.js"; +import { load } from "../src/index.js"; +import { defaultOptions } from "../src/options.js"; + +const getVariables = (definition, input) => ({ + states: { + input, + context: { + Execution: { + Id: uuidV4(), + Input: input, + Name: "test-execution", + StartTime: new Date().toISOString(), + }, + State: { + Name: definition.StartAt, + }, + StateMachine: { + Id: uuidV4(), + Name: "test-state-machine", + }, + Task: {}, + }, + }, +}); + +const getSimulatorContext = (overrides = {}) => ({ + resources: [], + options: defaultOptions, + queryLanguage: "JSONPath", + ...overrides, +}); + +describe("Map state (JSONPath)", () => { + describe("Execution and context", () => { + test("reads items using ItemReader before running item processor", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r1" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("keeps parent Execution context for INLINE map iterations", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "Map Step", + States: { + "Map Step": { + Type: "Map", + ItemsPath: "$", + ItemProcessor: { + StartAt: "InspectExecutionInput", + States: { + InspectExecutionInput: { + Type: "Pass", + Parameters: { + "item.$": "$", + "executionInput.$": "$$.Execution.Input", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + { item: 1, executionInput: [1, 2, 3] }, + { item: 2, executionInput: [1, 2, 3] }, + { item: 3, executionInput: [1, 2, 3] }, + ]); + }); + + test("creates per-item Execution context for DISTRIBUTED map iterations", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "My Map Step", + States: { + "My Map Step": { + Type: "Map", + Label: "Custom Label Name", + ItemsPath: "$", + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectDistributedExecution", + States: { + InspectDistributedExecution: { + Type: "Pass", + Parameters: { + "item.$": "$", + "executionInput.$": "$$.Execution.Input", + "executionId.$": "$$.Execution.Id", + "executionName.$": "$$.Execution.Name", + "executionStartTime.$": "$$.Execution.StartTime", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + expect.objectContaining({ + item: 1, + executionInput: 1, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 2, + executionInput: 2, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 3, + executionInput: 3, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + ]); + + for (const entry of result) { + expect(entry.executionId).toEqual(expect.any(String)); + expect(entry.executionStartTime).toEqual(expect.any(String)); + } + + const uniqueIds = new Set(result.map((entry) => entry.executionId)); + expect(uniqueIds.size).toBe(3); + }); + }); + + describe("ItemReader", () => { + test("resolves ItemReader Parameters Bucket and Key from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + "Bucket.$": "$.reader.bucket", + "Key.$": "$.reader.key", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { + key: "dynamic/items.json", + body: JSON.stringify([{ id: "dyn-a" }, { id: "dyn-b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + key: "dynamic/items.json", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "dyn-a" }, { id: "dyn-b" }]); + }); + + test("reads nested items using ItemReader ReaderConfig.ItemsPointer", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "/payload/records", + }, + Parameters: { + Bucket: "source-bucket", + Key: "nested.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "nested.json", + body: JSON.stringify({ + payload: { + records: [{ id: "nested-a" }, { id: "nested-b" }], + }, + }), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r2" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "nested-a" }, { id: "nested-b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItems", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r3" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItemsPath from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItemsPath: "$.limit", + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { limit: 3 }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }, { id: "c" }]); + }); + + test("fails with States.ItemReaderFailed when both MaxItems and MaxItemsPath are set", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + MaxItemsPath: "$.limit", + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { limit: 1 }), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("reads a list of S3 objects from a prefix using ItemReader listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + { key: "images/pic.jpg", body: "binary-bytes" }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "prefix-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result).toEqual([ + expect.objectContaining({ + Key: "logs/2024/a.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + expect.objectContaining({ + Key: "logs/2024/b.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + ]); + expect(result[0].Size).toEqual(expect.any(Number)); + expect(result[1].Size).toEqual(expect.any(Number)); + expect(result[0].LastModified).toEqual(expect.any(String)); + expect(result[1].LastModified).toEqual(expect.any(String)); + }); + + test("resolves ItemReader Parameters Bucket and Prefix from state input for listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + "Bucket.$": "$.reader.bucket", + "Prefix.$": "$.reader.prefix", + }, + }, + ItemSelector: { + "key.$": "$$.Map.Item.Value.Key", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { key: "logs/2025/a.json", body: '{"a":1}' }, + { key: "logs/2025/b.json", body: '{"b":2}' }, + { key: "logs/2024/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + prefix: "logs/2025/", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { key: "logs/2025/a.json" }, + { key: "logs/2025/b.json" }, + ]); + }); + + test("applies ItemSelector to listObjectsV2 metadata items", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemSelector: { + "key.$": "$$.Map.Item.Value.Key", + "bytes.$": "$$.Map.Item.Value.Size", + "class.$": "$$.Map.Item.Value.StorageClass", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "selector-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { + key: "logs/2024/a.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + { + key: "logs/2024/b.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + ]); + }); + + test("passes object item input type to child executions from ItemReader output", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectType", + States: { + InspectType: { + Type: "Task", + Resource: "arn:aws:lambda:::function:inspect-type", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + ], + }, + { + service: "lambda", + name: "inspect-type", + function: (input) => ({ + isObject: + input !== null && + typeof input === "object" && + !Array.isArray(input), + hasKey: Object.prototype.hasOwnProperty.call(input, "Key"), + }), + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "type-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { isObject: true, hasKey: true }, + { isObject: true, hasKey: true }, + ]); + }); + + test("fails when ItemReader is configured on INLINE map", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { requestId: "inline-item-reader" }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader bucket is missing", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader key is not found in S3", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Bucket: "source-bucket", + Key: "missing.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.json", body: JSON.stringify([1, 2]) }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("returns an empty array when ItemReader listObjectsV2 prefix has no matching objects", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "missing-prefix/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "logs/a.json", body: "{}" }], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([]); + }); + + test("fails with States.ItemReaderFailed when ItemReader InputType is unsupported", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "CSV", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.csv", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.csv", body: "a,b,c" }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader ItemsPointer is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "payload/records", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify({ payload: { records: [1, 2] } }), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader getObject payload is not an array", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + Bucket: "source-bucket", + Key: "not-array.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "not-array.json", body: JSON.stringify({ id: "x" }) }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + }); + + describe("ItemBatcher", () => { + test("batches items with ItemBatcher before child workflow execution", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + "source.$": "$.requestId", + }, + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Task", + Resource: "arn:aws:lambda:::function:summarise-batch", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "summarise-batch", + function: (input) => ({ + source: input.BatchInput?.source, + batchCount: input.Items?.length || 0, + }), + }, + ]; + + const input = { + requestId: "req-77", + items: [{ id: 1 }, { id: 2 }, { id: 3 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { source: "req-77", batchCount: 2 }, + { source: "req-77", batchCount: 1 }, + ]); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is zero", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 0, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is not an integer", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 1.5, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher BatchInput intrinsic function is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + "invalid.$": "States.NotAFunction($.value)", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ value: 1 }, { value: 2 }]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.IntrinsicFailure" }); + }); + + test("batches items using ItemBatcher MaxItemsPerBatchPath from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatchPath: "$.batchSize", + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Task", + Resource: "arn:aws:lambda:::function:summarise-batch", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "summarise-batch", + function: (input) => ({ batchCount: input.Items?.length || 0 }), + }, + ]; + + const input = { + batchSize: 2, + items: [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { batchCount: 2 }, + { batchCount: 2 }, + { batchCount: 1 }, + ]); + }); + + test("fails when both ItemBatcher MaxItemsPerBatch and MaxItemsPerBatchPath are set", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatch: 2, + MaxItemsPerBatchPath: "$.batchSize", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { batchSize: 2, items: [1, 2, 3] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("returns empty output when ItemBatcher is configured and there are no items", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 2, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, []), + getSimulatorContext() + ); + + expect(result).toEqual([]); + }); + }); + + describe("ToleratedFailure*", () => { + test("fails with States.ExceedToleratedFailureThreshold when tolerated failures are exceeded", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when tolerated failure percentage is exceeded", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailurePercentage: 40, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: true }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("returns failed iteration error payloads when failures are tolerated", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { id: input.id, ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: false }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ id: 1, ok: true }); + expect(result[1]).toMatchObject({ + Error: "States.TaskFailed", + Cause: expect.any(String), + }); + expect(result[1].Cause).toContain("boom"); + expect(result[2]).toEqual({ id: 3, ok: true }); + }); + + test("throws original task failure when items fail and no tolerated failure threshold is configured", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.TaskFailed" }); + }); + + test("fails when ToleratedFailureCount is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: -1, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailurePercentage is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailurePercentage: 120, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailureCountPath resolves to undefined", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ToleratedFailureCountPath: "$.missingThreshold", + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { items: [1] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("does not exceed tolerated failure count when failed item count equals threshold", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.fail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [{ fail: true }, { fail: false }]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ Error: "States.TaskFailed" }); + expect(result[1]).toEqual({ ok: true }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when failure count is above threshold", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + }); + + // Statelint (@wmfs/statelint) does not yet recognise ItemReader, ItemBatcher, + // ProcessorConfig, or ToleratedFailure*. Until it does, callers must use + // validateDefinition: false to opt out. + describe("load() integration with validateDefinition: false", () => { + test("end-to-end Map with ItemReader, ItemBatcher and ToleratedFailure*", async () => { + const definition = { + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { Bucket: "input-bucket", Key: "items.json" }, + ReaderConfig: { InputType: "JSON" }, + }, + ItemBatcher: { MaxItemsPerBatch: 2 }, + ToleratedFailureCount: 0, + ToleratedFailurePercentage: 100, + ItemProcessor: { + StartAt: "EchoBatch", + ProcessorConfig: { Mode: "DISTRIBUTED", ExecutionType: "STANDARD" }, + States: { + EchoBatch: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "input-bucket", + objects: [ + { key: "items.json", body: JSON.stringify([{ id: 1 }, { id: 2 }, { id: 3 }]) }, + ], + }, + ]; + + const stateMachine = load(definition, resources, { + validateDefinition: false, + }); + const result = await stateMachine.execute({}); + + expect(result).toEqual([ + { Items: [{ id: 1 }, { id: 2 }] }, + { Items: [{ id: 3 }] }, + ]); + }); + }); +});