diff --git a/src/domain/entities/generic/Future.ts b/src/domain/entities/generic/Future.ts index 2855a0c..923d196 100644 --- a/src/domain/entities/generic/Future.ts +++ b/src/domain/entities/generic/Future.ts @@ -169,7 +169,7 @@ export class Future { static sequentialWithAccumulation( futures: Array>, - options: { stopOnError?: boolean } = {} + options: SequentialWithAccumulationOptions = {} ): Future> { const { stopOnError = false } = options; const processSequentially = ( @@ -203,6 +203,60 @@ export class Future { return processSequentially(futures); } + static parallelWithAccumulation( + futures: Array>, + options: ParallelWithAccumulationOptions = {} + ): Future> { + const { concurrency = 10, stopOnError = true } = options; + + const toParallelResult = (future: Future): Future> => { + return future + .map>(data => ({ type: "success", data })) + .mapError>(error => ({ type: "error", error })) + .flatMapError(errorResult => + Future.success>(errorResult) + ); + }; + + const processInParallel = ( + pendingFutures: Array>, + accumulatedData: D[] = [] + ): Future> => { + if (pendingFutures.length === 0) { + return Future.success({ type: "success", data: accumulatedData }); + } + + const currentBatch = pendingFutures.slice(0, concurrency); + const remainingFutures = pendingFutures.slice(concurrency); + + return Future.parallel(currentBatch.map(toParallelResult), { + concurrency: concurrency, + }).flatMap(batchResults => { + const successfulData = batchResults.flatMap(result => + result.type === "success" ? [result.data] : [] + ); + + const batchErrors = batchResults.flatMap(result => + result.type === "error" ? [result.error] : [] + ); + + const nextAccumulatedData = [...accumulatedData, ...successfulData]; + + if (batchErrors.length > 0 && stopOnError) { + return Future.success({ + type: "error", + errors: batchErrors, + data: nextAccumulatedData, + }); + } + + return processInParallel(remainingFutures, nextAccumulatedData); + }); + }; + + return processInParallel(futures); + } + static fromPromise(promise: Promise): FutureData { return Future.fromComputation((resolve, reject) => { promise.then(resolve).catch(err => reject(err ? err.message : "Unknown error")); @@ -215,6 +269,12 @@ export type SequentialAccumulatedData = | { type: "success"; data: D[] } | { type: "error"; error: E; data: D[] }; +export type ParallelAccumulatedData = + | { type: "success"; data: D[] } + | { type: "error"; errors: E[]; data: D[] }; + +type ParallelResult = { type: "success"; data: D } | { type: "error"; error: E }; + export type Cancel = (() => void) | undefined; interface CaptureAsync { @@ -224,6 +284,10 @@ interface CaptureAsync { type ParallelOptions = { concurrency: number }; +type SequentialWithAccumulationOptions = { stopOnError?: boolean }; + +type ParallelWithAccumulationOptions = { concurrency?: number; stopOnError?: boolean }; + /* Example of how use Future.fromComputation */ export function getJSON(url: string): Future { const abortController = new AbortController(); diff --git a/src/domain/entities/generic/__tests/Future.spec.ts b/src/domain/entities/generic/__tests/Future.spec.ts new file mode 100644 index 0000000..f8726ab --- /dev/null +++ b/src/domain/entities/generic/__tests/Future.spec.ts @@ -0,0 +1,390 @@ +import { describe, expect, test, it, vi, expectTypeOf } from "vitest"; +import { Future, ParallelAccumulatedData, SequentialAccumulatedData } from "../Future"; + +describe("Basic builders", () => { + test("Future.success", async () => { + const value$ = Future.success(10); + + expectTypeOf(value$).toEqualTypeOf>(); + await expectAsync(value$, { toEqual: 10 }); + }); + + test("Future.error", async () => { + const error = new CodedError("message: Error 1", { code: "E001" }); + const value$ = Future.error(error); + + expectTypeOf(value$).toEqualTypeOf>(); + await expectAsync(value$, { toThrow: error }); + }); +}); + +describe("run", () => { + it("calls the sucess branch with the value", async () => { + const success = vi.fn(); + const reject = vi.fn(); + + Future.success(1).run(success, reject); + await nextTick(); + + expect(success).toHaveBeenCalledTimes(1); + expect(success.mock.calls[0]).toEqual([1]); + expect(reject).not.toHaveBeenCalled(); + }); + + it("calls the error branch with the error", async () => { + const success = vi.fn(); + const reject = vi.fn(); + + const async = Future.error({ errorCode: "E12" }); + async.run(success, reject); + await nextTick(); + + expect(success).not.toHaveBeenCalled(); + expect(reject).toHaveBeenCalledTimes(1); + const error = reject.mock.calls[0]?.[0]; + expect(error).toEqual({ errorCode: "E12" }); + }); +}); + +describe("toPromise", () => { + it("converts a successful Async to promise", async () => { + await expect(Future.success(1).toPromise()).resolves.toEqual(1); + }); + + it("converts an error Async to promise", async () => { + await expect(Future.error(new Error("message")).toPromise()).rejects.toThrow( + new Error("message") + ); + }); +}); + +describe("helpers", () => { + test("Future.sleep", async () => { + await expectAsync(Future.sleep(1), { toEqual: 1 }); + }); + + test("Future.void", async () => { + await expectAsync(Future.void(), { toEqual: undefined }); + }); +}); + +describe("Transformations", () => { + test("map", async () => { + const value1$ = Future.success(1); + const value2$ = value1$.map(x => x.toString()); + + await expectAsync(value2$, { toEqual: "1" }); + }); + + test("mapError", async () => { + const value1$ = Future.error(1); + const value2$ = value1$.mapError(x => x.toString()); + expectTypeOf(value2$).toEqualTypeOf>(); + + await expectAsync(value2$, { toThrow: "1" }); + }); + + describe("flatMapError", () => { + it("maps an error to a successful Future", async () => { + const value1$ = Future.error(1); + const value2$ = value1$.flatMapError(x => Future.success(x.toString())); + expectTypeOf(value2$).toEqualTypeOf>(); + + await expectAsync(value2$, { toEqual: "1" }); + }); + + it("maps an error to another error Future", async () => { + const value3$ = Future.error(1); + const value4$ = value3$.flatMapError(x => Future.error(x.toString())); + expectTypeOf(value4$).toEqualTypeOf>(); + + await expectAsync(value4$, { toThrow: "1" }); + }); + }); + + describe("flatMap/chain", () => { + it("builds an async value mapping to another async", async () => { + const value$ = Future.success(1) + .chain(value => Future.success(value + 2)) + .flatMap(value => Future.success(value + 3)) + .flatMap(value => Future.success(value + 4)); + + await expectAsync(value$, { toEqual: 10 }); + }); + }); +}); + +describe("Future.block", () => { + describe("when all awaited values in the block are successful", () => { + it("returns the returned value as an async", async () => { + const result$ = Future.block(async $ => { + const value1 = await $(Future.success(1)); + const value2 = await $(Future.success("2")); + const value3 = await $(Future.success(3)); + return value1 + parseInt(value2) + value3; + }); + + await expectAsync(result$, { toEqual: 6 }); + }); + }); + + describe("when any the awaited values in the block is an error", () => { + it("returns that error as the async result", async () => { + const result$ = Future.block(async $ => { + const value1 = await $(Future.success(1)); + const value2 = await $(Future.error("message") as Future); + const value3 = await $(Future.success(3)); + return value1 + value2 + value3; + }); + + await expectAsync(result$, { toThrow: "message" }); + }); + }); + + describe("when any the awaited values in the block is an error", () => { + it("returns that error as the async result", async () => { + const result$ = Future.block_()(async $ => { + const value1 = await $(Future.success(1)); + const value2 = await $(Future.error("message") as Future); + const value3 = await $(Future.success(3)); + return value1 + value2 + value3; + }); + + await expectAsync(result$, { toThrow: "message" }); + }); + }); + + describe("when the helper $.error is called", () => { + it("returns that async error as the async result", async () => { + const value1 = 1; + const double = vi.fn((x: number) => x); + + const result$ = Future.block_()(async $ => { + if (value1 > 0) $.throw(new Error("message")); + const value = await $(Future.success(double(1))); + return value; + }); + + await expectAsync(result$, { toThrow: new Error("message") }); + expect(double).not.toHaveBeenCalled(); + }); + }); +}); + +describe("fromComputation", () => { + describe("for a successful computation", () => { + it("return a success async", async () => { + const value$ = Future.fromComputation((resolve, _reject) => { + resolve(1); + return () => {}; + }); + + await expectAsync(value$, { toEqual: 1 }); + }); + }); + + describe("for an unsuccessful computation", () => { + it("return an error async", async () => { + const value$ = Future.fromComputation((_resolve, reject) => { + reject("message"); + return () => {}; + }); + + await expectAsync(value$, { toThrow: "message" }); + }); + }); +}); + +describe("cancel", () => { + it("cancels the async and the error branch is not called", async () => { + const success = vi.fn(); + const reject = vi.fn(); + + const cancel = Future.sleep(1).run(success, reject); + cancel?.(); + await nextTick(); + + expect(success).not.toHaveBeenCalled(); + expect(reject).toHaveBeenCalledTimes(0); + }); +}); + +describe("join2", () => { + it("returns a single async with the pair of values", async () => { + const join$ = Future.join2(Future.success(123), Future.success("hello")); + + expectTypeOf(join$).toEqualTypeOf>(); + await expectAsync(join$, { toEqual: [123, "hello"] }); + }); + + it("returns an error if some of the inputs is an error", async () => { + const join$ = Future.join2(Future.success(123), Future.error("Some error")); + + expectTypeOf(join$).toEqualTypeOf>(); + await expectAsync(join$, { toThrow: "Some error" }); + }); +}); + +describe("joinObj", () => { + it("returns an async with the object of values", async () => { + const join$ = Future.joinObj({ + n: Future.success(123), + s: Future.success("hello"), + }); + + await expectAsync(join$, { + toEqual: { n: 123, s: "hello" }, + }); + }); + + it("returns an error if some of the inputs is an error", async () => { + const join$ = Future.joinObj({ + n: Future.success(123) as Future, + s: Future.error("Some error") as Future, + }); + expectTypeOf(join$).toEqualTypeOf>(); + + await expectAsync(join$, { toThrow: "Some error" }); + }); +}); + +describe("sequential", () => { + it("returns an async containing all the values as an array", async () => { + const values$ = Future.sequential([ + Future.success(1), + Future.success(2), + Future.success(3), + ]); + await expectAsync(values$, { toEqual: [1, 2, 3] }); + }); +}); + +describe("parallel", async () => { + test("concurrency smaller than length", async () => { + const asyncs = [Future.sleep(3), Future.sleep(1), Future.sleep(2)]; + const values$ = Future.parallel(asyncs, { concurrency: 2 }); + await expectAsync(values$, { toEqual: [3, 1, 2] }); + }); + + test("concurrency larger than length", async () => { + const asyncs = [Future.sleep(3), Future.sleep(1), Future.sleep(2)]; + const values$ = Future.parallel(asyncs, { concurrency: 4 }); + await expectAsync(values$, { toEqual: [3, 1, 2] }); + }); +}); + +describe("sequentialWithAccumulation", () => { + it("if there is no error, it returns an async containing all the accumulated values as an array", async () => { + const $futuresArray = [Future.success(1), Future.success(2), Future.success(3)]; + + const values$ = Future.sequentialWithAccumulation($futuresArray); + const expected: SequentialAccumulatedData = { + type: "success", + data: [1, 2, 3], + }; + + await expectAsync(values$, { toEqual: expected }); + }); + + it("if there is an error in any Future, it continues and returns an async containing all the other accumulated values as an array", async () => { + const $futuresArray = [Future.success(1), Future.error("error"), Future.success(3)]; + + const values$ = Future.sequentialWithAccumulation($futuresArray); + const expected: SequentialAccumulatedData = { + type: "success", + data: [1, 3], + }; + + await expectAsync(values$, { toEqual: expected }); + }); + + it("if there is an error in some Future and the option stopOnError is enabled, it returns the error and an async containing all accumulated values as an array until the error ocurrs", async () => { + const $futuresArray = [ + Future.success(1), + Future.success(2), + Future.error("error"), + Future.success(4), + ]; + + const values$ = Future.sequentialWithAccumulation($futuresArray, { stopOnError: true }); + const expected: SequentialAccumulatedData = { + type: "error", + data: [1, 2], + error: "error", + }; + + await expectAsync(values$, { toEqual: expected }); + }); +}); + +describe("parallelWithAccumulation", () => { + it("if there is no error, it returns an async containing all the accumulated values as an array", async () => { + const $futuresArray = [Future.success(1), Future.success(2), Future.success(3)]; + + const values$ = Future.parallelWithAccumulation($futuresArray); + const expected: ParallelAccumulatedData = { + type: "success", + data: [1, 2, 3], + }; + + await expectAsync(values$, { toEqual: expected }); + }); + + it("if there is an error in any Future, it continues and returns an async containing all the other accumulated values as an array", async () => { + const $futuresArray = [Future.success(1), Future.error("error"), Future.success(3)]; + + const values$ = Future.parallelWithAccumulation($futuresArray); + const expected: ParallelAccumulatedData = { + type: "error", + data: [1, 3], + errors: ["error"], + }; + + await expectAsync(values$, { toEqual: expected }); + }); + + it("if an error occurs, it completes the current batch, accumulates all successful results from it, and terminates before starting the next batch", async () => { + const $futuresArray = [ + Future.error("error"), + Future.success(2), + Future.success(3), + Future.success(4), + ]; + + const values$ = Future.parallelWithAccumulation($futuresArray, { + stopOnError: true, + concurrency: 2, + }); + const expected: ParallelAccumulatedData = { + type: "error", + data: [2], + errors: ["error"], + }; + + await expectAsync(values$, { toEqual: expected }); + }); +}); + +function nextTick() { + return new Promise(process.nextTick); +} + +export async function expectAsync( + value$: Future, + options: { toEqual: D; toThrow?: undefined } | { toEqual?: undefined; toThrow: E } +): Promise { + if ("toEqual" in options) { + await expect(value$.toPromise()).resolves.toEqual(options.toEqual); + } else { + await expect(value$.toPromise()).rejects.toMatchObject(options.toThrow as any); + } +} + +class CodedError extends Error { + code: string; + + constructor(message: string, data: { code: string }) { + super(message); + this.code = data.code; + } +}