Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
root = true

[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true

[*.{ts,js,json,md,yml,yaml}]
indent_style = space
indent_size = 4

[*.md]
trim_trailing_whitespace = false

[Makefile]
indent_style = tab
156 changes: 156 additions & 0 deletions examples/promise-pool-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { PromisePool, PoolTimeoutError } from "bytekit/async";

// ---------------------------------------------------------------------------
// Example 1: Concurrency-limited batch processing
//
// Process a large batch of items with at most 3 concurrent operations.
// Results are returned in the same order as the input array.
// ---------------------------------------------------------------------------
async function example1_concurrencyLimit() {
const ids = Array.from({ length: 10 }, (_, i) => i + 1);
const pool = new PromisePool({ concurrency: 3 });

const results = await pool.run(
ids.map((id) => async () => {
// Simulate variable-length work
await new Promise((r) => setTimeout(r, Math.random() * 100));
return { id, status: "done" };
})
);

console.log("Processed:", results);
// → [{ id: 1, status: "done" }, { id: 2, status: "done" }, ...]
}

// ---------------------------------------------------------------------------
// Example 2: Per-task timeouts with graceful error handling
//
// Each task gets its own independent timeout. When a task times out or throws,
// `onError` is called and the pool continues running the remaining tasks.
// ---------------------------------------------------------------------------
async function example2_timeoutAndOnError() {
const endpoints = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
];

const errors: Array<{ index: number; message: string }> = [];

const pool = new PromisePool({
concurrency: 2,
timeout: 5000, // 5s per task
onError(error, taskIndex) {
if (error instanceof PoolTimeoutError) {
errors.push({ index: taskIndex, message: `Timed out: ${error.message}` });
} else {
errors.push({ index: taskIndex, message: error.message });
}
},
});

const results = await pool.run(
endpoints.map((url) => () => fetch(url).then((r) => r.json()))
);

console.log("Results:", results);
if (errors.length > 0) {
console.warn("Errors encountered:", errors);
}
}

// ---------------------------------------------------------------------------
// Example 3: Pool reuse across multiple batches
//
// PromisePool is stateful and can be reused. The concurrency limit applies
// per `run()` call — useful when tasks arrive in chunks.
// ---------------------------------------------------------------------------
async function example3_poolReuse() {
const pool = new PromisePool({ concurrency: 5 });

async function fetchItem(id: number) {

Check warning on line 71 in examples/promise-pool-example.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Move async function 'fetchItem' to the outer scope.

See more on https://sonarcloud.io/project/issues?id=sebamar88_bytekit&issues=AZ02tvSuwdmNkoppSz4s&open=AZ02tvSuwdmNkoppSz4s&pullRequest=15
const res = await fetch(`https://jsonplaceholder.typicode.com/todos/${id}`);
return res.json() as Promise<{ id: number; title: string }>;
}

// First batch
const batch1 = await pool.run([1, 2, 3].map((id) => () => fetchItem(id)));

// Pool is fully reusable — second batch starts fresh
const batch2 = await pool.run([4, 5, 6].map((id) => () => fetchItem(id)));

console.log("All items:", [...batch1, ...batch2]);
}

// ---------------------------------------------------------------------------
// Example 4: ApiClient integration (pool option)
//
// When `pool` is set in ApiClientConfig, every `request()` call is
// automatically routed through the pool, limiting concurrent HTTP calls.
// ---------------------------------------------------------------------------
async function example4_apiClientIntegration() {
// Uncomment after importing ApiClient:
//
// import { ApiClient } from "bytekit";
//
// const api = new ApiClient({
// baseUrl: "https://api.example.com",
// pool: { concurrency: 2, timeout: 5000 },
// });
//
// // Up to 2 of these requests run at a time, even though we fire all 4
// const [users, posts, comments, tags] = await Promise.all([
// api.request<User[]>("/users"),
// api.request<Post[]>("/posts"),
// api.request<Comment[]>("/comments"),
// api.request<Tag[]>("/tags"),
// ]);

console.log("ApiClient + PromisePool: see comment above for usage.");
}

// ---------------------------------------------------------------------------
// Example 5: PromisePool vs parallel() — when to use each
//
// Use `parallel()` for one-shot lists.
// Use `PromisePool` when you need reuse, per-task timeouts, or onError hooks.
// ---------------------------------------------------------------------------
async function example5_vsParallel() {
// parallel() — simple, functional, one-shot:
// const results = await parallel(tasks, { concurrency: 3 });

// PromisePool — class-based, reusable, richer options:
const pool = new PromisePool({
concurrency: 3,
timeout: 2000,
onError: (err, idx) => console.warn(`Task ${idx} failed:`, err.message),
});

const tasks = [1, 2, 3, 4, 5].map((n) => async () => n * 2);
const results = await pool.run(tasks);
console.log("PromisePool results:", results); // [2, 4, 6, 8, 10]

// Pool is ready to run another batch immediately
const moreResults = await pool.run([6, 7, 8].map((n) => async () => n * 2));
console.log("Second batch:", moreResults); // [12, 14, 16]
}

// ---------------------------------------------------------------------------
// Run all examples
// ---------------------------------------------------------------------------
(async () => {
console.log("=== Example 1: Concurrency Limit ===");
await example1_concurrencyLimit();

console.log("\n=== Example 2: Timeout + onError ===");
await example2_timeoutAndOnError();

console.log("\n=== Example 3: Pool Reuse ===");
await example3_poolReuse();

console.log("\n=== Example 4: ApiClient Integration ===");
await example4_apiClientIntegration();

console.log("\n=== Example 5: PromisePool vs parallel() ===");
await example5_vsParallel();
})().catch(console.error);

Check warning on line 156 in examples/promise-pool-example.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer top-level await over using a promise chain.

See more on https://sonarcloud.io/project/issues?id=sebamar88_bytekit&issues=AZ02tvSuwdmNkoppSz4t&open=AZ02tvSuwdmNkoppSz4t&pullRequest=15
15 changes: 15 additions & 0 deletions specs/002-parallel-queue/spec.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Parallel Queue

> **⚠️ Merged**: Esta spec fue fusionada con el Batching System.
> Ver especificación completa en [004-request-queue-batching](../004-batching-system/spec.md).

## Razón de la Fusión

La cola paralela de requests y el batching system comparten el mismo dominio (gestión de requests HTTP), infraestructura (concurrencia, priorización) y punto de integración (ApiClient). Mantenerlos separados habría generado duplicación de lógica y APIs inconsistentes.

La spec fusionada cubre:

- ✅ Cola con concurrencia controlada (Parallel Queue)
- ✅ Priorización y cancelación de requests
- ✅ Agrupación inteligente por URL (Batching)
- ✅ Integración unificada con ApiClient
101 changes: 101 additions & 0 deletions specs/003-promise-pool/contracts/promise-pool.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Contract: PromisePool Public API

**Module**: `bytekit/async`
**Export**: `PromisePool`
**Phase**: 1 — Contracts
**Date**: 28 de marzo de 2026

## TypeScript Interface

```typescript
/**
* Options for configuring a PromisePool instance.
*/
export interface PromisePoolOptions {
/**
* Maximum number of tasks that can run concurrently.
* @minimum 1
*/
concurrency: number;

/**
* Optional timeout in milliseconds for each individual task.
* If a task exceeds this duration, it rejects with a PoolTimeoutError.
*/
timeout?: number;

/**
* Optional callback invoked when a task fails.
* Does NOT stop the pool — remaining tasks continue executing.
* @param error The error thrown by the failing task.
* @param index Zero-based index of the failing task in the original array.
*/
onError?: (error: Error, index: number) => void;
}

/**
* Error thrown when a task exceeds the configured timeout.
*/
export class PoolTimeoutError extends Error {
constructor(timeoutMs: number) {
super(`Task timed out after ${timeoutMs}ms`);
this.name = "PoolTimeoutError";
}
}

/**
* Executes an array of async tasks with a configurable concurrency limit.
*
* Unlike `parallel()`, PromisePool:
* - Is stateful and reusable across multiple `run()` calls.
* - Does NOT fail fast: individual task errors are isolated via `onError`.
* - Supports per-task timeouts.
*
* @example
* ```typescript
* import { PromisePool } from "bytekit/async";
*
* const pool = new PromisePool({ concurrency: 3, timeout: 5000 });
*
* const results = await pool.run([
* () => fetch("/api/1").then(r => r.json()),
* () => fetch("/api/2").then(r => r.json()),
* () => fetch("/api/3").then(r => r.json()),
* ]);
* ```
*/
export class PromisePool {
constructor(options: PromisePoolOptions);

/**
* Runs an array of task factory functions with concurrency control.
* Tasks are lazy — they are not started until the pool has a free slot.
*
* @param tasks Array of functions that return Promises.
* @returns Promise resolving to an array of results in original order.
* If any task rejects and no `onError` is provided, the
* returned Promise rejects with that error.
* @throws TypeError If `tasks` is not an array.
*/
run<T>(tasks: Array<() => Promise<T>>): Promise<T[]>;
}
```

## Behaviour Contracts

| Scenario | Expected Behaviour |
| -------- | ------------------ |
| Empty `tasks` array | Resolves immediately with `[]` |
| `concurrency = 1` | Tasks execute sequentially |
| `concurrency >= tasks.length` | Equivalent to `Promise.all` |
| Task throws/rejects | `onError` called; promise for that task rejects; pool continues |
| Task exceeds `timeout` | Rejects with `PoolTimeoutError`; pool continues |
| `concurrency < 1` in constructor | Throws `TypeError` synchronously |
| Non-array passed to `run()` | Throws `TypeError` synchronously |

## Exported from `bytekit/async`

```typescript
export { PromisePool, PoolTimeoutError } from "./promise-pool.js";
export type { PromisePoolOptions } from "./promise-pool.js";
```
67 changes: 67 additions & 0 deletions specs/003-promise-pool/data-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Data Model: Promise Pool

**Feature**: `003-promise-pool`
**Phase**: 1 — Design & Contracts
**Date**: 28 de marzo de 2026

## Entities

### `PromisePoolOptions`

Configuración de la instancia del pool.

| Campo | Tipo | Requerido | Descripción |
| ----- | ---- | --------- | ----------- |
| `concurrency` | `number` | ✅ | Máximo de tareas simultáneas. Mínimo: 1. |
| `timeout` | `number` | ❌ | Timeout en ms por task. Si se excede, la task falla con `PoolTimeoutError`. |
| `onError` | `(error: Error, taskIndex: number) => void` | ❌ | Callback invocado cuando una task falla. No detiene el pool. |

**Validation rules**:

- `concurrency < 1` → `TypeError("concurrency must be at least 1")`
- `timeout <= 0` → `TypeError("timeout must be a positive number")`

---

### `QueueItem<T>` *(interno)*

Representa una task encolada esperando ejecución.

| Campo | Tipo | Descripción |
| ----- | ---- | ----------- |
| `task` | `() => Promise<unknown>` | Factory function de la tarea. |
| `resolve` | `(value: unknown) => void` | Resolver de la promesa de control. |
| `reject` | `(reason: unknown) => void` | Rejecter de la promesa de control. |
| `index` | `number` | Índice original en el array de input (para mantener orden). |

---

### `PromisePool` *(clase)*

Gestor del pool con estado.

| Propiedad | Tipo | Descripción |
| --------- | ---- | ----------- |
| `options` | `PromisePoolOptions` | Configuración inmutable de la instancia. |
| `running` | `number` | Contador de tasks actualmente en ejecución. |
| `queue` | `QueueItem[]` | Cola FIFO de tasks pendientes. |

**State transitions**:

```text
IDLE ──── run(tasks) ──── PROCESSING ──── all tasks done ──── IDLE
task starts │ task ends
running++ → running-- → dequeue next
```

---

## Error Types

| Error | Cuándo | Tipo |
| ----- | ------ | ---- |
| `TypeError` | `concurrency < 1` o `tasks` no es array | Construcción / `run()` |
| `PoolTimeoutError extends Error` | Task supera `timeout` ms | Durante ejecución |
| Error original de la task | Task rechaza | Re-lanzado via `onError` + reject individual |
Loading
Loading