From 90a3a953b22b68b1d15326e85dfe0d0f912d38d1 Mon Sep 17 00:00:00 2001 From: John McCarthy Date: Sat, 25 Apr 2026 14:58:32 +1000 Subject: [PATCH] Add support for ItemReader, ItemBatcher, ToleratedFailure* in Map states This commit adds support for the following Map state configuration fields: - `ItemReader` specifying a resource or resources in S3 to read items from - `ItemBatcher` to batch map state executions - `ToleratedFailureCount` and `ToleratedFailurePercentage` on map states (and `-Path` variants for JSONpath). It also adds supports and checks for: - `MaxItems` on map states (and `MaxItemsPath` for JSONpath) - `ProcessorConfig.Mode` (`"INLINE"` or `"DISTRIBUTED"`) for validating whether `ItemReader` should be processed - `Label` for adding (more) accurate `context.Execution` information for distributed child map state executions All changes are implemented for both JSONpath and JSONata configuration of Map states. All changes are covered by new unit tests for map states, covering normal input, edge cases, and validating inputs for `ItemReader`, `ItemBatcher`, and `ToleratedFailure*` as well as `INLINE` v.s. `DISTRIBUTED`. This commit does not address the following additional functionalities of AWS map states which are still remaining: - `MaxConcurrency` - `MaxInputBytesPerBatch` (and `MaxInputBytesPerBatchPath` for JSONpath) - `ProcessorConfig.ExecutionType` for distributed map states. - `ResultWriter` - S3 object input types of `JSONL`, `CSV`, `MANIFEST`, or `PARQUET` in map states with `ItemReader` configured to use S3 `getObject` . - `Transformation === "LOAD_AND_FLATTEN"` functionality. --- package-lock.json | 4 +- src/errors.js | 9 +- src/executors.js | 571 ++++++++++- tests/map-jsonata.test.js | 1643 ++++++++++++++++++++++++++++++++ tests/map-jsonpath.test.js | 1833 ++++++++++++++++++++++++++++++++++++ 5 files changed, 4043 insertions(+), 17 deletions(-) create mode 100644 tests/map-jsonata.test.js create mode 100644 tests/map-jsonpath.test.js diff --git a/package-lock.json b/package-lock.json index 8dc971d..0d539d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "sfn-sim", - "version": "1.5.1", + "version": "2.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "sfn-sim", - "version": "1.5.1", + "version": "2.0.0", "license": "ISC", "dependencies": { "@wmfs/statelint": "^1.30.0", diff --git a/src/errors.js b/src/errors.js index 6112dd7..193ae99 100644 --- a/src/errors.js +++ b/src/errors.js @@ -90,8 +90,11 @@ class ExceedToleratedFailureThresholdError extends RuntimeError { } class ItemReaderFailedError extends RuntimeError { - constructor() { - super('A Map state failed to read all items as specified by the "ItemReader" field.'); + constructor(cause) { + const message = cause + ? `A Map state failed to read all items as specified by the "ItemReader" field. ${cause}` + : 'A Map state failed to read all items as specified by the "ItemReader" field.'; + super(message); this.name = 'States.ItemReaderFailed'; } } @@ -113,5 +116,7 @@ export { TaskFailedError, NoChoiceMatchedError, IntrinsicFailureError, + ExceedToleratedFailureThresholdError, + ItemReaderFailedError, ERROR_WILDCARD, }; diff --git a/src/executors.js b/src/executors.js index 4fd16e0..e960df4 100644 --- a/src/executors.js +++ b/src/executors.js @@ -1,7 +1,15 @@ import { runJSONPathChoice, runJSONataChoice } from './choice.js'; -import { RuntimeError, FailError, ERROR_WILDCARD } from './errors.js'; +import { v4 as uuidV4 } from 'uuid'; +import { + RuntimeError, + FailError, + ExceedToleratedFailureThresholdError, + ItemReaderFailedError, + ERROR_WILDCARD, +} from './errors.js'; import runTask from './task.js'; import { getValue, applyPayloadTemplate, getStateResult, wait, evaluateJSONata, getJSONataInput, getJSONataOutput, assign } from './utils.js'; +import { createHash } from 'node:crypto'; /* * JSONata @@ -106,17 +114,260 @@ const executeParallelJSONata = async (state, variables, simulatorContext) => { return [output, next]; }; +const getItemReaderPointerValue = (value, pointer) => { + if (!pointer || pointer === '/') { + return value; + } + + if (!pointer.startsWith('/')) { + throw new RuntimeError('ItemReader ReaderConfig.ItemsPointer must start with "/"'); + } + + const segments = pointer + .split('/') + .slice(1) + .map((segment) => segment.replace(/~1/g, '/').replace(/~0/g, '~')); + + return segments.reduce((current, segment) => { + if (current === undefined || current === null) { + throw new RuntimeError('ItemReader ReaderConfig.ItemsPointer did not match any data'); + } + return current[segment]; + }, value); +}; + +const shapeS3ObjectListItem = (entry, lastModified) => { + if (typeof entry.body !== 'string') { + throw new RuntimeError('S3 object body must be a string'); + } + + return { + ETag: createHash('md5').update(entry.body).digest('hex'), + Key: entry.key, + LastModified: lastModified, + Size: new TextEncoder().encode(entry.body).length, + StorageClass: 'STANDARD', + }; +}; + +const mapSettledResultsToOutput = (settledResults) => settledResults.map((entry) => { + if (entry.status === 'fulfilled') { + return entry.value; + } + + const reason = entry.reason; + if (reason?.toErrorOutput) { + return reason.toErrorOutput(); + } + + return { + Error: reason?.name || 'Error', + Cause: reason?.stack || reason?.message, + }; +}); + +const isFailureThresholdExceeded = ({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, +}) => { + const failedPercentage = totalItems === 0 ? 0 : (failedItems / totalItems) * 100; + const countExceeded = toleratedFailureCount !== null && failedItems > toleratedFailureCount; + const percentageExceeded = toleratedFailurePercentage !== null && failedPercentage > toleratedFailurePercentage; + + return countExceeded || percentageExceeded; +}; + +const getMapChildExecutionContext = (state, variables, itemInput) => { + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + return variables.states.context.Execution; + } + + const mapRunId = uuidV4(); + const parentExecutionName = variables.states.context.Execution?.Name; + const mapStateIdentifier = (state.Label || variables.states.context.State?.Name || 'Map').replace(/\s+/g, ''); + const childExecutionName = parentExecutionName + ? `${parentExecutionName}/${mapStateIdentifier}:${mapRunId}` + : `${mapStateIdentifier}:${mapRunId}`; + + return { + Id: mapRunId, + Input: itemInput, + Name: childExecutionName, + StartTime: new Date().toISOString(), + RedriveCount: 0, + }; +}; + +const getMapItemsFromItemReaderJSONata = async (state, variables, simulatorContext) => { + try { + const { ItemReader } = state; + const readerInput = ItemReader.Arguments + ? await evaluateJSONata(ItemReader.Arguments, variables) + : (ItemReader.Parameters || variables.states.input); + + const s3Resource = simulatorContext.resources.find( + ({ service, name }) => service === 's3' && name === readerInput.Bucket, + ); + + if (!s3Resource) { + throw new RuntimeError('ItemReader failed to find configured S3 bucket resource'); + } + + let items; + const isS3GetObjectReader = [ + 'arn:aws:states:::s3:getObject', + 'arn:aws:states:::aws-sdk:s3:getObject', + ].includes(ItemReader.Resource); + const isS3ListObjectsReader = [ + 'arn:aws:states:::s3:listObjectsV2', + 'arn:aws:states:::aws-sdk:s3:listObjectsV2', + ].includes(ItemReader.Resource); + + if (isS3GetObjectReader) { + // TODO JSONL, CSV, MANIFEST, PARQUET + const inputType = ItemReader.ReaderConfig?.InputType || 'JSON'; + if (inputType !== 'JSON') { + throw new RuntimeError(`Unsupported ItemReader ReaderConfig.InputType [${inputType}]`); + } + + const object = s3Resource.objects?.find((entry) => entry.key === readerInput.Key); + + if (!object) { + throw new RuntimeError('ItemReader failed to load the configured S3 object'); + } + + items = object.body; + if (typeof items === 'string') { + items = JSON.parse(items); + } else { + throw new RuntimeError('ItemReader S3 object body must be a string'); + } + + const itemsPointer = ItemReader.ReaderConfig?.ItemsPointer; + if (itemsPointer) { + items = getItemReaderPointerValue(items, itemsPointer); + } + } else if (isS3ListObjectsReader) { + + const prefix = readerInput.Prefix || ''; + const now = new Date().toISOString(); + + // TODO Support Transformation === "LOAD_AND_FLATTEN" + + items = (s3Resource.objects || []) + .filter((entry) => entry.key.startsWith(prefix)) + .map((entry) => shapeS3ObjectListItem(entry, now)); + } else { + throw new RuntimeError(`Unsupported ItemReader Resource [${ItemReader.Resource}]`); + } + + if (!Array.isArray(items)) { + throw new RuntimeError('ItemReader must resolve to an array of items'); + } + + const maxItems = ItemReader.ReaderConfig?.MaxItems !== undefined + ? await evaluateJSONata(ItemReader.ReaderConfig.MaxItems, variables) + : null; + + if (maxItems !== null && maxItems !== undefined) { + return items.slice(0, maxItems); + } + + return items; + } catch (error) { + if (error instanceof ItemReaderFailedError) { + throw error; + } + throw new ItemReaderFailedError(error?.message || error); + } +}; + +const getMapItemsJSONata = async (state, variables, simulatorContext) => { + if (!state.ItemReader) { + return evaluateJSONata(state.Items, variables); + } + + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + throw new RuntimeError('ItemReader is not supported for INLINE map states'); + } + + const itemReaderItems = await getMapItemsFromItemReaderJSONata(state, variables, simulatorContext); + + if (!state.Items) { + return itemReaderItems; + } + + const itemReaderVariables = { + ...variables, + states: { + ...variables.states, + input: itemReaderItems, + }, + }; + + return evaluateJSONata(state.Items, itemReaderVariables); +}; + +const applyItemBatcherJSONata = async (itemBatcher, items, variables) => { + if (!itemBatcher) { + return items; + } + + // TODO MaxInputBytesPerBatch + + let maxItemsPerBatch = null; + if (itemBatcher.MaxItemsPerBatch !== undefined) { + maxItemsPerBatch = await evaluateJSONata(itemBatcher.MaxItemsPerBatch, variables); + } + + if (maxItemsPerBatch !== null && (!Number.isInteger(maxItemsPerBatch) || maxItemsPerBatch <= 0)) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch must resolve to a positive integer'); + } + + const batchInput = itemBatcher.BatchInput !== undefined + ? await evaluateJSONata(itemBatcher.BatchInput, variables) + : undefined; + + const batches = []; + const batchSize = maxItemsPerBatch || items.length || 1; + for (let index = 0; index < items.length; index += batchSize) { + const batch = { + Items: items.slice(index, index + batchSize), + }; + + if (batchInput !== undefined) { + batch.BatchInput = batchInput; + } + + batches.push(batch); + } + + return batches; +}; + const executeMapJSONata = async (state, variables, simulatorContext) => { - const items = await evaluateJSONata(state.Items, variables); + // Map-level JSONata fields should resolve $states.input to the map state's input. + const parentInput = variables.states.input; + + // Get the items to process, either from Items or via an ItemReader + let items = await getMapItemsJSONata(state, variables, simulatorContext); - // TODO ItemReader, ItemBatcher, ResultWriter, ToleratedFailure + // If there is only a single item returned by the JSONata expression, wrap it in an array. + // This is AWS Step Function behaviour to coerce scalar expressions to single element arrays to facilitate mapping. + if (!Array.isArray(items)) { + items = [items]; + } - const executions = items.map(async (Value, Index) => { + // Resolve ItemSelector against the items to process + const selectedItems = await Promise.all(items.map(async (Value, Index) => { const itemVariables = { ...variables, states: { ...variables.states, - input: Value, + // $states.input is the input to the map state when ItemSelector is resolving the items to process. + input: parentInput, context: { ...variables.states.context, State: { @@ -134,15 +385,85 @@ const executeMapJSONata = async (state, variables, simulatorContext) => { }; if (state.ItemSelector) { - const input = await evaluateJSONata(state.ItemSelector, itemVariables); - itemVariables.states.input = input; - itemVariables.states.context.Execution.Input = input; + return evaluateJSONata(state.ItemSelector, itemVariables); } + return Value; + })); + + // Batch the items if an ItemBatcher is configured + const executionInputs = await applyItemBatcherJSONata(state.ItemBatcher, selectedItems, variables); + + // Execute the child workflows for each item or batch + const executions = executionInputs.map(async (Value, Index) => { + const childExecution = getMapChildExecutionContext(state, variables, Value); + + const itemVariables = { + ...variables, + states: { + ...variables.states, + // $states.input is the input to the child workflow when it is executing. + input: Value, + context: { + ...variables.states.context, + Execution: childExecution, + State: { + ...variables.states.context.State, + Name: state.ItemProcessor.StartAt, + }, + Map: { + Item: { + Index, + Value, + }, + }, + }, + }, + }; return executeStateMachine(state.ItemProcessor, itemVariables, simulatorContext); }); - const result = await Promise.all(executions); + // Check failure thresholds + + // TODO MaxConcurrency + const settledResults = await Promise.allSettled(executions); + const failedResults = settledResults.filter((entry) => entry.status === 'rejected'); + + const toleratedFailureCount = state.ToleratedFailureCount !== undefined + ? await evaluateJSONata(state.ToleratedFailureCount, variables) + : null; + const toleratedFailurePercentage = state.ToleratedFailurePercentage !== undefined + ? await evaluateJSONata(state.ToleratedFailurePercentage, variables) + : null; + + const hasFailureThreshold = toleratedFailureCount !== null || toleratedFailurePercentage !== null; + if (failedResults.length > 0 && !hasFailureThreshold) { + throw failedResults[0].reason; + } + + if (toleratedFailureCount !== null && (!Number.isInteger(toleratedFailureCount) || toleratedFailureCount < 0)) { + throw new RuntimeError('ToleratedFailureCount must resolve to a non-negative integer'); + } + if ( + toleratedFailurePercentage !== null + && (typeof toleratedFailurePercentage !== 'number' || toleratedFailurePercentage < 0 || toleratedFailurePercentage > 100) + ) { + throw new RuntimeError('ToleratedFailurePercentage must resolve to a number between 0 and 100'); + } + + const totalItems = settledResults.length; + const failedItems = failedResults.length; + if (isFailureThresholdExceeded({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, + })) { + throw new ExceedToleratedFailureThresholdError(); + } + + // TODO ResultWriter + const result = mapSettledResultsToOutput(settledResults); variables.states.result = result; @@ -274,14 +595,205 @@ const executeParallelJSONPath = async (state, variables, simulatorContext) => { return [stateOutput, next]; }; + +const getMapItemsFromItemReaderJSONPath = (state, stateInput, context, simulatorContext) => { + try { + const { ItemReader } = state; + const readerInput = ItemReader.Parameters + ? applyPayloadTemplate(stateInput, context, ItemReader.Parameters) + : stateInput; + + const s3Resource = simulatorContext.resources.find( + ({ service, name }) => service === 's3' && name === readerInput.Bucket, + ); + + if (!s3Resource) { + throw new RuntimeError('ItemReader failed to find configured S3 bucket resource'); + } + + let items; + const isS3GetObjectReader = [ + 'arn:aws:states:::s3:getObject', + 'arn:aws:states:::aws-sdk:s3:getObject', + ].includes(ItemReader.Resource); + const isS3ListObjectsReader = [ + 'arn:aws:states:::s3:listObjectsV2', + 'arn:aws:states:::aws-sdk:s3:listObjectsV2', + ].includes(ItemReader.Resource); + + if (isS3GetObjectReader) { + const inputType = ItemReader.ReaderConfig?.InputType || 'JSON'; + if (inputType !== 'JSON') { + throw new RuntimeError(`Unsupported ItemReader ReaderConfig.InputType [${inputType}]`); + } + + const object = s3Resource.objects?.find((entry) => entry.key === readerInput.Key); + if (!object) { + throw new RuntimeError('ItemReader failed to load the configured S3 object'); + } + + items = object.body; + if (typeof items === 'string') { + items = JSON.parse(items); + } else { + throw new RuntimeError('ItemReader S3 object body must be a string'); + } + + const itemsPointer = ItemReader.ReaderConfig?.ItemsPointer; + if (itemsPointer) { + items = getItemReaderPointerValue(items, itemsPointer); + } + } else if (isS3ListObjectsReader) { + const prefix = readerInput.Prefix || ''; + const now = new Date().toISOString(); + items = (s3Resource.objects || []) + .filter((entry) => entry.key.startsWith(prefix)) + .map((entry) => shapeS3ObjectListItem(entry, now)); + } else { + throw new RuntimeError(`Unsupported ItemReader Resource [${ItemReader.Resource}]`); + } + + if (!Array.isArray(items)) { + throw new RuntimeError('ItemReader must resolve to an array of items'); + } + + const maxItems = ItemReader.ReaderConfig?.MaxItems || null; + const maxItemsPath = ItemReader.ReaderConfig?.MaxItemsPath || null; + + if (maxItemsPath !== null && maxItems !== null) { + throw new RuntimeError('ItemReader ReaderConfig.MaxItems and ReaderConfig.MaxItemsPath cannot be used together'); + } + + let maxItemsValue = null; + if (maxItemsPath !== null) { + maxItemsValue = getValue(stateInput, maxItemsPath); + } else if (maxItems !== null) { + if (maxItems !== null && (!Number.isInteger(maxItems) || maxItems <= 0)) { + throw new RuntimeError('ItemReader ReaderConfig.MaxItems must resolve to a positive integer'); + } + maxItemsValue = maxItems; + } + + if (maxItemsValue !== null) { + return items.slice(0, maxItemsValue); + } + + return items; + } catch (error) { + if (error instanceof ItemReaderFailedError) { + throw error; + } + throw new ItemReaderFailedError(error?.message || error); + } +}; + +const getMapItemsJSONPath = (state, stateInput, effectiveInput, context, simulatorContext) => { + if (!state.ItemReader) { + return getValue(effectiveInput, state.ItemsPath); + } + + if (state.ItemProcessor.ProcessorConfig?.Mode !== 'DISTRIBUTED') { + throw new RuntimeError('ItemReader is not supported for INLINE map states'); + } + + const itemReaderItems = getMapItemsFromItemReaderJSONPath(state, stateInput, context, simulatorContext); + + if (!state.ItemsPath) { + return itemReaderItems; + } + + return getValue(itemReaderItems, state.ItemsPath); +}; + +const applyItemBatcherJSONPath = (itemBatcher, items, stateInput, context) => { + if (!itemBatcher) { + return items; + } + + // TODO MaxInputBytesPerBatch, MaxInputBytesPerBatchPath + + const maxItemsPerBatch = itemBatcher.MaxItemsPerBatch ?? null; + const maxItemsPerBatchPath = itemBatcher.MaxItemsPerBatchPath ?? null; + + if (maxItemsPerBatch !== null && maxItemsPerBatchPath !== null) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch and MaxItemsPerBatchPath cannot be used together'); + } + + let maxItemsPerBatchValue; + + if (maxItemsPerBatchPath !== null) { + maxItemsPerBatchValue = getValue(stateInput, maxItemsPerBatchPath); + } else if (maxItemsPerBatch !== null) { + if (maxItemsPerBatch !== null && (!Number.isInteger(maxItemsPerBatch) || maxItemsPerBatch <= 0)) { + throw new RuntimeError('ItemBatcher MaxItemsPerBatch must resolve to a positive integer'); + } + + maxItemsPerBatchValue = maxItemsPerBatch; + } + + const batchInput = itemBatcher.BatchInput !== undefined + ? applyPayloadTemplate(stateInput, context, itemBatcher.BatchInput) + : undefined; + + const batches = []; + const batchSize = maxItemsPerBatchValue || items.length || 1; + for (let index = 0; index < items.length; index += batchSize) { + const batch = { + Items: items.slice(index, index + batchSize), + }; + if (batchInput !== undefined) { + batch.BatchInput = batchInput; + } + batches.push(batch); + } + + return batches; +}; + const executeMapJSONPath = async (state, variables, simulatorContext) => { const rawInput = variables.states.input; const stateInput = getValue(rawInput, state.InputPath); const effectiveInput = applyPayloadTemplate(stateInput, variables.states.context, state.Parameters); + let items = getMapItemsJSONPath(state, stateInput, effectiveInput, variables.states.context, simulatorContext); + + // If there is only a single item returned by the JSONpath evaluation, wrap it in an array. + // This is AWS Step Function behaviour to coerce scalar expressions to single element arrays to facilitate mapping. + if (!Array.isArray(items)) { + items = [items]; + } + + const selectedItems = items.map((Value, Index) => { + if (!state.ItemSelector) { + return Value; + } - const items = getValue(effectiveInput, state.ItemsPath); + const itemContext = { + ...variables.states.context, + State: { + ...variables.states.context.State, + Name: state.ItemProcessor.StartAt, + }, + Map: { + Item: { + Index, + Value, + }, + }, + }; + + return applyPayloadTemplate(effectiveInput, itemContext, state.ItemSelector); + }); + + const executionInputs = applyItemBatcherJSONPath( + state.ItemBatcher, + selectedItems, + effectiveInput, + variables.states.context, + ); + + const executions = executionInputs.map((Value, Index) => { + const childExecution = getMapChildExecutionContext(state, variables, Value); - const executions = items.map((Value, Index) => { const itemVariables = { ...variables, states: { @@ -289,6 +801,7 @@ const executeMapJSONPath = async (state, variables, simulatorContext) => { input: Value, context: { ...variables.states.context, + Execution: childExecution, State: { ...variables.states.context.State, Name: state.ItemProcessor.StartAt, @@ -302,12 +815,44 @@ const executeMapJSONPath = async (state, variables, simulatorContext) => { }, }, }; - return executeStateMachine(state.ItemProcessor, itemVariables, simulatorContext); }); - const result = await Promise.all(executions); + const settledResults = await Promise.allSettled(executions); + const failedResults = settledResults.filter((entry) => entry.status === 'rejected'); + + const toleratedFailureCount = state.ToleratedFailureCount + ?? (state.ToleratedFailureCountPath ? getValue(effectiveInput, state.ToleratedFailureCountPath) : null); + const toleratedFailurePercentage = state.ToleratedFailurePercentage + ?? (state.ToleratedFailurePercentagePath ? getValue(effectiveInput, state.ToleratedFailurePercentagePath) : null); + + const hasFailureThreshold = toleratedFailureCount !== null || toleratedFailurePercentage !== null; + if (failedResults.length > 0 && !hasFailureThreshold) { + throw failedResults[0].reason; + } + + if (toleratedFailureCount !== null && (!Number.isInteger(toleratedFailureCount) || toleratedFailureCount < 0)) { + throw new RuntimeError('ToleratedFailureCount must resolve to a non-negative integer'); + } + if ( + toleratedFailurePercentage !== null + && (typeof toleratedFailurePercentage !== 'number' || toleratedFailurePercentage < 0 || toleratedFailurePercentage > 100) + ) { + throw new RuntimeError('ToleratedFailurePercentage must resolve to a number between 0 and 100'); + } + + const totalItems = settledResults.length; + const failedItems = failedResults.length; + if (isFailureThresholdExceeded({ + totalItems, + failedItems, + toleratedFailureCount, + toleratedFailurePercentage, + })) { + throw new ExceedToleratedFailureThresholdError(); + } + const result = mapSettledResultsToOutput(settledResults); const effectiveResult = applyPayloadTemplate(result, variables.states.context, state.ResultSelector); const stateResult = getStateResult(rawInput, effectiveResult, state.ResultPath); const stateOutput = getValue(stateResult, state.OutputPath); diff --git a/tests/map-jsonata.test.js b/tests/map-jsonata.test.js new file mode 100644 index 0000000..3cbd83e --- /dev/null +++ b/tests/map-jsonata.test.js @@ -0,0 +1,1643 @@ +import { describe, test, expect } from "vitest"; +import { v4 as uuidV4 } from "uuid"; +import { executeStateMachine } from "../src/executors.js"; +import { load } from "../src/index.js"; +import { defaultOptions } from "../src/options.js"; + +const getVariables = (definition, input) => ({ + states: { + input, + context: { + Execution: { + Id: uuidV4(), + Input: input, + Name: "test-execution", + StartTime: new Date().toISOString(), + }, + State: { + Name: definition.StartAt, + }, + StateMachine: { + Id: uuidV4(), + Name: "test-state-machine", + }, + Task: {}, + }, + }, +}); + +const getSimulatorContext = (overrides = {}) => ({ + resources: [], + options: defaultOptions, + queryLanguage: "JSONata", + ...overrides, +}); + +describe("Map state (JSONata)", () => { + describe("Execution and context", () => { + test("uses parent map input for $states.input and map item value via context", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ItemSelector: { + requestId: "{% $states.input.requestId %}", + number: "{% $states.context.Map.Item.Value.number %}", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const input = { + requestId: "req-123", + items: [{ number: 1 }, { number: 2 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext() + ); + + expect(result).toEqual([ + { requestId: "req-123", number: 1 }, + { requestId: "req-123", number: 2 }, + ]); + }); + + test("keeps parent Execution context for INLINE map iterations", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "Map Step", + States: { + "Map Step": { + Type: "Map", + Items: "{% $states.input %}", + ItemProcessor: { + StartAt: "InspectExecutionInput", + States: { + InspectExecutionInput: { + Type: "Pass", + Output: { + item: "{% $states.context.Map.Item.Value %}", + executionInput: "{% $states.context.Execution.Input %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + { item: 1, executionInput: [1, 2, 3] }, + { item: 2, executionInput: [1, 2, 3] }, + { item: 3, executionInput: [1, 2, 3] }, + ]); + }); + + test("creates per-item Execution context for DISTRIBUTED map iterations", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "My Map Step", + States: { + "My Map Step": { + Type: "Map", + Label: "Custom Label Name", + Items: "{% $states.input %}", + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectDistributedExecution", + States: { + InspectDistributedExecution: { + Type: "Task", + Resource: + "arn:aws:lambda:::function:inspect-distributed-execution", + Output: { + item: "{% $states.input %}", + executionInput: "{% $states.context.Execution.Input %}", + executionId: "{% $states.context.Execution.Id %}", + executionName: "{% $states.context.Execution.Name %}", + executionStartTime: + "{% $states.context.Execution.StartTime %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "inspect-distributed-execution", + function: async (input) => { + const delayMs = (4 - input) * 10; + await new Promise((resolve) => setTimeout(resolve, delayMs)); + return input; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result).toEqual([ + expect.objectContaining({ + item: 1, + executionInput: 1, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 2, + executionInput: 2, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 3, + executionInput: 3, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + ]); + + for (const entry of result) { + expect(entry.executionId).toEqual(expect.any(String)); + expect(entry.executionStartTime).toEqual(expect.any(String)); + } + + const uniqueIds = new Set(result.map((entry) => entry.executionId)); + expect(uniqueIds.size).toBe(3); + }); + }); + + describe("ItemReader", () => { + test("reads items using ItemReader before running item processor", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r1" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("resolves ItemReader Arguments Bucket and Key from state input", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "{% $states.input.reader.bucket %}", + Key: "{% $states.input.reader.key %}", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { + key: "dynamic/items.json", + body: JSON.stringify([{ id: "dyn-a" }, { id: "dyn-b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + key: "dynamic/items.json", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "dyn-a" }, { id: "dyn-b" }]); + }); + + test("reads nested items using ItemReader ReaderConfig.ItemsPointer", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "/payload/records", + }, + Arguments: { + Bucket: "source-bucket", + Key: "nested.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "nested.json", + body: JSON.stringify({ + payload: { + records: [{ id: "nested-a" }, { id: "nested-b" }], + }, + }), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r2" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "nested-a" }, { id: "nested-b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItems", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + }, + Arguments: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Output: { + id: "{% $states.context.Map.Item.Value.id %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r3" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("reads a list of S3 objects from a prefix using ItemReader listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + { key: "images/pic.jpg", body: "binary-bytes" }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "prefix-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result).toEqual([ + expect.objectContaining({ + Key: "logs/2024/a.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + expect.objectContaining({ + Key: "logs/2024/b.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + ]); + expect(result[0].Size).toEqual(expect.any(Number)); + expect(result[1].Size).toEqual(expect.any(Number)); + expect(result[0].LastModified).toEqual(expect.any(String)); + expect(result[1].LastModified).toEqual(expect.any(String)); + }); + + test("resolves ItemReader Arguments Bucket and Prefix from state input for listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "{% $states.input.reader.bucket %}", + Prefix: "{% $states.input.reader.prefix %}", + }, + }, + ItemSelector: { + key: "{% $states.context.Map.Item.Value.Key %}", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { key: "logs/2025/a.json", body: '{"a":1}' }, + { key: "logs/2025/b.json", body: '{"b":2}' }, + { key: "logs/2024/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + prefix: "logs/2025/", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { key: "logs/2025/a.json" }, + { key: "logs/2025/b.json" }, + ]); + }); + + test("applies ItemSelector to listObjectsV2 metadata items", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemSelector: { + key: "{% $states.context.Map.Item.Value.Key %}", + bytes: "{% $states.context.Map.Item.Value.Size %}", + class: "{% $states.context.Map.Item.Value.StorageClass %}", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "selector-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { + key: "logs/2024/a.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + { + key: "logs/2024/b.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + ]); + }); + + test("fails when ItemReader is configured on INLINE map", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { requestId: "inline-item-reader" }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader bucket is missing", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader key is not found in S3", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { + Bucket: "source-bucket", + Key: "missing.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.json", body: JSON.stringify([1, 2]) }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("returns an empty array when ItemReader listObjectsV2 prefix has no matching objects", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Arguments: { + Bucket: "source-bucket", + Prefix: "missing-prefix/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "logs/a.json", body: "{}" }], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([]); + }); + + test("fails with States.ItemReaderFailed when ItemReader InputType is unsupported", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "CSV", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.csv", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.csv", body: "a,b,c" }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader ItemsPointer is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "payload/records", + }, + Arguments: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify({ payload: { records: [1, 2] } }), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader getObject payload is not an array", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Arguments: { + Bucket: "source-bucket", + Key: "not-array.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "not-array.json", body: JSON.stringify({ id: "x" }) }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + }); + + describe("ItemBatcher", () => { + test("batches items with ItemBatcher before child workflow execution", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + source: "{% $states.input.requestId %}", + }, + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Pass", + Output: { + source: "{% $states.input.BatchInput.source %}", + batchCount: "{% $count($states.input.Items) %}", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const input = { + requestId: "req-77", + items: [{ id: 1 }, { id: 2 }, { id: 3 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext() + ); + + expect(result).toEqual([ + { source: "req-77", batchCount: 2 }, + { source: "req-77", batchCount: 1 }, + ]); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is zero", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 0, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is not an integer", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 1.5, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher BatchInput JSONata expression is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: "{% $notAFunction( %}", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toBeDefined(); + }); + + test("returns empty output when ItemBatcher is configured and there are no items", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemBatcher: { + MaxItemsPerBatch: 2, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, []), + getSimulatorContext() + ); + + expect(result).toEqual([]); + }); + }); + + describe("ToleratedFailure*", () => { + test("fails with States.ExceedToleratedFailureThreshold when tolerated failures are exceeded", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when tolerated failure percentage is exceeded", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailurePercentage: 40, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: true }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("returns failed iteration error payloads when failures are tolerated", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + Output: "{% $states.result.Payload %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { id: input.id, ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: false }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ id: 1, ok: true }); + expect(result[1]).toMatchObject({ + Error: "States.TaskFailed", + Cause: expect.any(String), + }); + expect(result[1].Cause).toContain("boom"); + expect(result[2]).toEqual({ id: 3, ok: true }); + }); + + test("throws original task failure when items fail and no tolerated failure threshold is configured", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.TaskFailed" }); + }); + + test("fails when ToleratedFailureCount is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: -1, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailurePercentage is invalid", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailurePercentage: 120, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailureCount expression resolves to undefined", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input.items %}", + ToleratedFailureCount: "{% $states.input.missingThreshold %}", + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { items: [1] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("does not exceed tolerated failure count when failed item count equals threshold", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + Output: "{% $states.result.Payload %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.fail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [{ fail: true }, { fail: false }]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ Error: "States.TaskFailed" }); + expect(result[1]).toEqual({ ok: true }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when failure count is above threshold", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + Items: "{% $states.input %}", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + }); + + // Statelint (@wmfs/statelint) does not yet recognise QueryLanguage, + // ItemReader, ItemBatcher, ProcessorConfig, or ToleratedFailure*. Until it + // does, callers must use validateDefinition: false to opt out. + describe("load() integration with validateDefinition: false", () => { + test("end-to-end Map with ItemReader, ItemBatcher and ToleratedFailure*", async () => { + const definition = { + QueryLanguage: "JSONata", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Arguments: { Bucket: "input-bucket", Key: "items.json" }, + ReaderConfig: { InputType: "JSON" }, + }, + ItemBatcher: { MaxItemsPerBatch: 2 }, + ToleratedFailureCount: 0, + ToleratedFailurePercentage: 100, + ItemProcessor: { + QueryLanguage: "JSONata", + StartAt: "EchoBatch", + ProcessorConfig: { Mode: "DISTRIBUTED", ExecutionType: "STANDARD" }, + States: { + EchoBatch: { + Type: "Pass", + Output: "{% $states.input %}", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "input-bucket", + objects: [ + { key: "items.json", body: JSON.stringify([{ id: 1 }, { id: 2 }, { id: 3 }]) }, + ], + }, + ]; + + const stateMachine = load(definition, resources, { + validateDefinition: false, + }); + const result = await stateMachine.execute({}); + + expect(result).toEqual([ + { Items: [{ id: 1 }, { id: 2 }] }, + { Items: [{ id: 3 }] }, + ]); + }); + }); +}); diff --git a/tests/map-jsonpath.test.js b/tests/map-jsonpath.test.js new file mode 100644 index 0000000..cbbc8df --- /dev/null +++ b/tests/map-jsonpath.test.js @@ -0,0 +1,1833 @@ +import { describe, test, expect } from "vitest"; +import { v4 as uuidV4 } from "uuid"; +import { executeStateMachine } from "../src/executors.js"; +import { load } from "../src/index.js"; +import { defaultOptions } from "../src/options.js"; + +const getVariables = (definition, input) => ({ + states: { + input, + context: { + Execution: { + Id: uuidV4(), + Input: input, + Name: "test-execution", + StartTime: new Date().toISOString(), + }, + State: { + Name: definition.StartAt, + }, + StateMachine: { + Id: uuidV4(), + Name: "test-state-machine", + }, + Task: {}, + }, + }, +}); + +const getSimulatorContext = (overrides = {}) => ({ + resources: [], + options: defaultOptions, + queryLanguage: "JSONPath", + ...overrides, +}); + +describe("Map state (JSONPath)", () => { + describe("Execution and context", () => { + test("reads items using ItemReader before running item processor", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r1" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("keeps parent Execution context for INLINE map iterations", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "Map Step", + States: { + "Map Step": { + Type: "Map", + ItemsPath: "$", + ItemProcessor: { + StartAt: "InspectExecutionInput", + States: { + InspectExecutionInput: { + Type: "Pass", + Parameters: { + "item.$": "$", + "executionInput.$": "$$.Execution.Input", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + { item: 1, executionInput: [1, 2, 3] }, + { item: 2, executionInput: [1, 2, 3] }, + { item: 3, executionInput: [1, 2, 3] }, + ]); + }); + + test("creates per-item Execution context for DISTRIBUTED map iterations", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "My Map Step", + States: { + "My Map Step": { + Type: "Map", + Label: "Custom Label Name", + ItemsPath: "$", + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectDistributedExecution", + States: { + InspectDistributedExecution: { + Type: "Pass", + Parameters: { + "item.$": "$", + "executionInput.$": "$$.Execution.Input", + "executionId.$": "$$.Execution.Id", + "executionName.$": "$$.Execution.Name", + "executionStartTime.$": "$$.Execution.StartTime", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, [1, 2, 3]), + getSimulatorContext() + ); + + expect(result).toEqual([ + expect.objectContaining({ + item: 1, + executionInput: 1, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 2, + executionInput: 2, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + expect.objectContaining({ + item: 3, + executionInput: 3, + executionName: expect.stringMatching( + /^test-execution\/CustomLabelName:[0-9a-f-]{36}$/ + ), + }), + ]); + + for (const entry of result) { + expect(entry.executionId).toEqual(expect.any(String)); + expect(entry.executionStartTime).toEqual(expect.any(String)); + } + + const uniqueIds = new Set(result.map((entry) => entry.executionId)); + expect(uniqueIds.size).toBe(3); + }); + }); + + describe("ItemReader", () => { + test("resolves ItemReader Parameters Bucket and Key from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + "Bucket.$": "$.reader.bucket", + "Key.$": "$.reader.key", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { + key: "dynamic/items.json", + body: JSON.stringify([{ id: "dyn-a" }, { id: "dyn-b" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + key: "dynamic/items.json", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "dyn-a" }, { id: "dyn-b" }]); + }); + + test("reads nested items using ItemReader ReaderConfig.ItemsPointer", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "/payload/records", + }, + Parameters: { + Bucket: "source-bucket", + Key: "nested.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "nested.json", + body: JSON.stringify({ + payload: { + records: [{ id: "nested-a" }, { id: "nested-b" }], + }, + }), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r2" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "nested-a" }, { id: "nested-b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItems", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { requestId: "r3" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }]); + }); + + test("limits item count using ItemReader ReaderConfig.MaxItemsPath from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItemsPath: "$.limit", + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { + Type: "Pass", + Parameters: { + "id.$": "$$.Map.Item.Value.id", + }, + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }]), + }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { limit: 3 }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([{ id: "a" }, { id: "b" }, { id: "c" }]); + }); + + test("fails with States.ItemReaderFailed when both MaxItems and MaxItemsPath are set", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + MaxItems: 2, + MaxItemsPath: "$.limit", + }, + Parameters: { + Bucket: "source-bucket", + Key: "many-items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ProjectItem", + States: { + ProjectItem: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "many-items.json", + body: JSON.stringify([{ id: "a" }, { id: "b" }, { id: "c" }]), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { limit: 1 }), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("reads a list of S3 objects from a prefix using ItemReader listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + { key: "images/pic.jpg", body: "binary-bytes" }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "prefix-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result).toEqual([ + expect.objectContaining({ + Key: "logs/2024/a.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + expect.objectContaining({ + Key: "logs/2024/b.json", + ETag: expect.any(String), + StorageClass: "STANDARD", + }), + ]); + expect(result[0].Size).toEqual(expect.any(Number)); + expect(result[1].Size).toEqual(expect.any(Number)); + expect(result[0].LastModified).toEqual(expect.any(String)); + expect(result[1].LastModified).toEqual(expect.any(String)); + }); + + test("resolves ItemReader Parameters Bucket and Prefix from state input for listObjectsV2", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + "Bucket.$": "$.reader.bucket", + "Prefix.$": "$.reader.prefix", + }, + }, + ItemSelector: { + "key.$": "$$.Map.Item.Value.Key", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "dynamic-source-bucket", + objects: [ + { key: "logs/2025/a.json", body: '{"a":1}' }, + { key: "logs/2025/b.json", body: '{"b":2}' }, + { key: "logs/2024/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { + reader: { + bucket: "dynamic-source-bucket", + prefix: "logs/2025/", + }, + }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { key: "logs/2025/a.json" }, + { key: "logs/2025/b.json" }, + ]); + }); + + test("applies ItemSelector to listObjectsV2 metadata items", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemSelector: { + "key.$": "$$.Map.Item.Value.Key", + "bytes.$": "$$.Map.Item.Value.Size", + "class.$": "$$.Map.Item.Value.StorageClass", + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + { key: "logs/2023/c.json", body: '{"c":3}' }, + ], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "selector-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { + key: "logs/2024/a.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + { + key: "logs/2024/b.json", + bytes: expect.any(Number), + class: "STANDARD", + }, + ]); + }); + + test("passes object item input type to child executions from ItemReader output", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "logs/2024/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "InspectType", + States: { + InspectType: { + Type: "Task", + Resource: "arn:aws:lambda:::function:inspect-type", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "logs/2024/a.json", body: '{"a":1}' }, + { key: "logs/2024/b.json", body: '{"b":2}' }, + ], + }, + { + service: "lambda", + name: "inspect-type", + function: (input) => ({ + isObject: + input !== null && + typeof input === "object" && + !Array.isArray(input), + hasKey: Object.prototype.hasOwnProperty.call(input, "Key"), + }), + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, { runId: "type-read" }), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { isObject: true, hasKey: true }, + { isObject: true, hasKey: true }, + ]); + }); + + test("fails when ItemReader is configured on INLINE map", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { requestId: "inline-item-reader" }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader bucket is missing", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader key is not found in S3", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { + Bucket: "source-bucket", + Key: "missing.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.json", body: JSON.stringify([1, 2]) }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("returns an empty array when ItemReader listObjectsV2 prefix has no matching objects", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:listObjectsV2", + Parameters: { + Bucket: "source-bucket", + Prefix: "missing-prefix/", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "logs/a.json", body: "{}" }], + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([]); + }); + + test("fails with States.ItemReaderFailed when ItemReader InputType is unsupported", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "CSV", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.csv", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [{ key: "items.csv", body: "a,b,c" }], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader ItemsPointer is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + ItemsPointer: "payload/records", + }, + Parameters: { + Bucket: "source-bucket", + Key: "items.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { + key: "items.json", + body: JSON.stringify({ payload: { records: [1, 2] } }), + }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + + test("fails with States.ItemReaderFailed when ItemReader getObject payload is not an array", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "JSON", + }, + Parameters: { + Bucket: "source-bucket", + Key: "not-array.json", + }, + }, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + }, + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "source-bucket", + objects: [ + { key: "not-array.json", body: JSON.stringify({ id: "x" }) }, + ], + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, {}), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.ItemReaderFailed" }); + }); + }); + + describe("ItemBatcher", () => { + test("batches items with ItemBatcher before child workflow execution", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + "source.$": "$.requestId", + }, + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Task", + Resource: "arn:aws:lambda:::function:summarise-batch", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "summarise-batch", + function: (input) => ({ + source: input.BatchInput?.source, + batchCount: input.Items?.length || 0, + }), + }, + ]; + + const input = { + requestId: "req-77", + items: [{ id: 1 }, { id: 2 }, { id: 3 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { source: "req-77", batchCount: 2 }, + { source: "req-77", batchCount: 1 }, + ]); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is zero", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 0, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher MaxItemsPerBatch is not an integer", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 1.5, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ItemBatcher BatchInput intrinsic function is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 2, + BatchInput: { + "invalid.$": "States.NotAFunction($.value)", + }, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ value: 1 }, { value: 2 }]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "States.IntrinsicFailure" }); + }); + + test("batches items using ItemBatcher MaxItemsPerBatchPath from state input", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatchPath: "$.batchSize", + }, + ItemProcessor: { + StartAt: "SummariseBatch", + States: { + SummariseBatch: { + Type: "Task", + Resource: "arn:aws:lambda:::function:summarise-batch", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "summarise-batch", + function: (input) => ({ batchCount: input.Items?.length || 0 }), + }, + ]; + + const input = { + batchSize: 2, + items: [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }], + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, input), + getSimulatorContext({ resources }) + ); + + expect(result).toEqual([ + { batchCount: 2 }, + { batchCount: 2 }, + { batchCount: 1 }, + ]); + }); + + test("fails when both ItemBatcher MaxItemsPerBatch and MaxItemsPerBatchPath are set", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ItemBatcher: { + MaxItemsPerBatch: 2, + MaxItemsPerBatchPath: "$.batchSize", + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { batchSize: 2, items: [1, 2, 3] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("returns empty output when ItemBatcher is configured and there are no items", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemBatcher: { + MaxItemsPerBatch: 2, + }, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const result = await executeStateMachine( + definition, + getVariables(definition, []), + getSimulatorContext() + ); + + expect(result).toEqual([]); + }); + }); + + describe("ToleratedFailure*", () => { + test("fails with States.ExceedToleratedFailureThreshold when tolerated failures are exceeded", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when tolerated failure percentage is exceeded", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailurePercentage: 40, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: true }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + + test("returns failed iteration error payloads when failures are tolerated", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.shouldFail) { + throw new Error("boom"); + } + return { id: input.id, ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [ + { id: 1, shouldFail: false }, + { id: 2, shouldFail: true }, + { id: 3, shouldFail: false }, + ]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ id: 1, ok: true }); + expect(result[1]).toMatchObject({ + Error: "States.TaskFailed", + Cause: expect.any(String), + }); + expect(result[1].Cause).toContain("boom"); + expect(result[2]).toEqual({ id: 3, ok: true }); + }); + + test("throws original task failure when items fail and no tolerated failure threshold is configured", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1, 2]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ name: "States.TaskFailed" }); + }); + + test("fails when ToleratedFailureCount is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: -1, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailurePercentage is invalid", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailurePercentage: 120, + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [1]), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("fails when ToleratedFailureCountPath resolves to undefined", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$.items", + ToleratedFailureCountPath: "$.missingThreshold", + ItemProcessor: { + StartAt: "ReturnInput", + States: { + ReturnInput: { + Type: "Pass", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, { items: [1] }), + getSimulatorContext() + ) + ).rejects.toMatchObject({ name: "RuntimeError" }); + }); + + test("does not exceed tolerated failure count when failed item count equals threshold", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "MaybeFail", + States: { + MaybeFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:maybe-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "maybe-fail", + function: (input) => { + if (input.fail) { + throw new Error("boom"); + } + return { ok: true }; + }, + }, + ]; + + const result = await executeStateMachine( + definition, + getVariables(definition, [{ fail: true }, { fail: false }]), + getSimulatorContext({ resources }) + ); + + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ Error: "States.TaskFailed" }); + expect(result[1]).toEqual({ ok: true }); + }); + + test("fails with States.ExceedToleratedFailureThreshold when failure count is above threshold", async () => { + const definition = { + QueryLanguage: "JSONPath", + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemsPath: "$", + ToleratedFailureCount: 1, + ItemProcessor: { + StartAt: "AlwaysFail", + States: { + AlwaysFail: { + Type: "Task", + Resource: "arn:aws:lambda:::function:always-fail", + End: true, + }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "lambda", + name: "always-fail", + function: () => { + throw new Error("boom"); + }, + }, + ]; + + await expect(() => + executeStateMachine( + definition, + getVariables(definition, [{ id: 1 }, { id: 2 }]), + getSimulatorContext({ resources }) + ) + ).rejects.toMatchObject({ + name: "States.ExceedToleratedFailureThreshold", + }); + }); + }); + + // Statelint (@wmfs/statelint) does not yet recognise ItemReader, ItemBatcher, + // ProcessorConfig, or ToleratedFailure*. Until it does, callers must use + // validateDefinition: false to opt out. + describe("load() integration with validateDefinition: false", () => { + test("end-to-end Map with ItemReader, ItemBatcher and ToleratedFailure*", async () => { + const definition = { + StartAt: "MapStep", + States: { + MapStep: { + Type: "Map", + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + Parameters: { Bucket: "input-bucket", Key: "items.json" }, + ReaderConfig: { InputType: "JSON" }, + }, + ItemBatcher: { MaxItemsPerBatch: 2 }, + ToleratedFailureCount: 0, + ToleratedFailurePercentage: 100, + ItemProcessor: { + StartAt: "EchoBatch", + ProcessorConfig: { Mode: "DISTRIBUTED", ExecutionType: "STANDARD" }, + States: { + EchoBatch: { Type: "Pass", End: true }, + }, + }, + End: true, + }, + }, + }; + + const resources = [ + { + service: "s3", + name: "input-bucket", + objects: [ + { key: "items.json", body: JSON.stringify([{ id: 1 }, { id: 2 }, { id: 3 }]) }, + ], + }, + ]; + + const stateMachine = load(definition, resources, { + validateDefinition: false, + }); + const result = await stateMachine.execute({}); + + expect(result).toEqual([ + { Items: [{ id: 1 }, { id: 2 }] }, + { Items: [{ id: 3 }] }, + ]); + }); + }); +});