Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion src/domain/entities/generic/Future.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class Future<E, D> {

static sequentialWithAccumulation<E, D>(
futures: Array<Future<E, D>>,
options: { stopOnError?: boolean } = {}
options: SequentialWithAccumulationOptions = {}
): Future<never, SequentialAccumulatedData<E, D>> {
const { stopOnError = false } = options;
const processSequentially = (
Expand Down Expand Up @@ -203,6 +203,60 @@ export class Future<E, D> {
return processSequentially(futures);
}

static parallelWithAccumulation<E, D>(
futures: Array<Future<E, D>>,
options: ParallelWithAccumulationOptions = {}
): Future<never, ParallelAccumulatedData<E, D>> {
const { concurrency = 10, stopOnError = true } = options;

const toParallelResult = (future: Future<E, D>): Future<never, ParallelResult<E, D>> => {
return future
.map<ParallelResult<E, D>>(data => ({ type: "success", data }))
.mapError<ParallelResult<E, D>>(error => ({ type: "error", error }))
.flatMapError(errorResult =>
Future.success<never, ParallelResult<E, D>>(errorResult)
);
};

const processInParallel = (
pendingFutures: Array<Future<E, D>>,
accumulatedData: D[] = []
): Future<never, ParallelAccumulatedData<E, D>> => {
if (pendingFutures.length === 0) {
return Future.success({ type: "success", data: accumulatedData });
}

const currentBatch = pendingFutures.slice(0, concurrency);
const remainingFutures = pendingFutures.slice(concurrency);

return Future.parallel(currentBatch.map(toParallelResult), {
concurrency: concurrency,
}).flatMap(batchResults => {
const successfulData = batchResults.flatMap(result =>
result.type === "success" ? [result.data] : []
);

const batchErrors = batchResults.flatMap(result =>
result.type === "error" ? [result.error] : []
);

const nextAccumulatedData = [...accumulatedData, ...successfulData];

if (batchErrors.length > 0 && stopOnError) {
return Future.success({
type: "error",
errors: batchErrors,
data: nextAccumulatedData,
});
}

return processInParallel(remainingFutures, nextAccumulatedData);
});
};

return processInParallel(futures);
}

static fromPromise<Data>(promise: Promise<Data>): FutureData<Data> {
return Future.fromComputation((resolve, reject) => {
promise.then(resolve).catch(err => reject(err ? err.message : "Unknown error"));
Expand All @@ -215,6 +269,12 @@ export type SequentialAccumulatedData<E, D> =
| { type: "success"; data: D[] }
| { type: "error"; error: E; data: D[] };

export type ParallelAccumulatedData<E, D> =
| { type: "success"; data: D[] }
| { type: "error"; errors: E[]; data: D[] };

type ParallelResult<E, D> = { type: "success"; data: D } | { type: "error"; error: E };

export type Cancel = (() => void) | undefined;

interface CaptureAsync<E> {
Expand All @@ -224,6 +284,10 @@ interface CaptureAsync<E> {

type ParallelOptions = { concurrency: number };

type SequentialWithAccumulationOptions = { stopOnError?: boolean };

type ParallelWithAccumulationOptions = { concurrency?: number; stopOnError?: boolean };

/* Example of how use Future.fromComputation */
export function getJSON<U>(url: string): Future<TypeError | SyntaxError, U> {
const abortController = new AbortController();
Expand Down
Loading
Loading