diff --git a/README.md b/README.md index 9bac3e5..471ee9b 100644 --- a/README.md +++ b/README.md @@ -58,12 +58,13 @@ const consumer = new SQSConsumer({ | destroySigner | boolean | false | Whether to destroy the signer when the consumer is destroyed. | ### ConsumerOptions -| Option | Type | Default | Description | -|-------------------------|----------|---------|------------------------------------------------------------| -| visibilityTimeout | number | 30 | The visibility timeout for the messages in seconds | -| waitTimeSeconds | number | 20 | The wait time for the receiveMessage call in seconds | -| itemsPerRequest | number | 10 | The maximum number of messages to be received at once | -| messageAttributeNames | string[] | [] | The message attribute names to be included in the response | +| Option | Type | Default | Description | +|-----------------------|----------|---------|------------------------------------------------------------| +| visibilityTimeout | number | 30 | The visibility timeout for the messages in seconds | +| waitTimeSeconds | number | 20 | The wait time for the receiveMessage call in seconds | +| itemsPerRequest | number | 10 | The maximum number of messages to be received at once | +| messageAttributeNames | string[] | [] | The message attribute names to be included in the response | +| attributeNames | string[] | [] | The attribute names to be included in the response | ### Hooks | Option | Type | Description | diff --git a/src/hooks.ts b/src/hooks.ts index 758d89b..0870273 100644 --- a/src/hooks.ts +++ b/src/hooks.ts @@ -131,7 +131,11 @@ export class Hooks { ...args: any[] ) { for (const fn of this.hooks[hookSymbol.S]) { - message = await fn(message, ...args); + const returnMessage = await fn(message, ...args); + + if (returnMessage) { + message = returnMessage; + } } return message; } @@ -143,7 +147,7 @@ export class Hooks { ) { for (const fn of this.hooks[hookSymbol.S]) { const continueLoop = await fn(...args); - if (!continueLoop) { + if (continueLoop === false) { return false; } } diff --git a/src/index.ts b/src/index.ts index 1bcfee9..3bd2965 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,20 +31,40 @@ type ConsumerOptions = { waitTimeSeconds?: number; itemsPerRequest?: number; messageAttributeNames?: string[]; + attributeNames?: string[]; }; -type HooksOptions = { - onPoll?: (messages: Message[]) => Promise; - onMessage?: (message: Message) => Promise; - onHandlerSuccess?: (message: Message) => Promise; - onHandlerTimeout?: (message: Message) => Promise; - onHandlerError?: (message: Message, error: Error) => Promise; - onSuccess?: (message: Message) => Promise; +type hookReturnMessages = + | Promise + | Message[] + | Promise + | void + | undefined; +type hookReturnMessage = + | Promise + | Message + | Promise + | void + | undefined; +type hookReturnBoolean = + | Promise + | boolean + | Promise + | void + | undefined; + +export type HooksOptions = { + onPoll?: (messages: Message[]) => hookReturnMessages; + onMessage?: (message: Message) => hookReturnMessage; + onHandlerSuccess?: (message: Message) => hookReturnMessage; + onHandlerTimeout?: (message: Message) => hookReturnBoolean; + onHandlerError?: (message: Message, error: Error) => hookReturnBoolean; + onSuccess?: (message: Message) => hookReturnBoolean; onError?: ( hook: HookName, message: Message, error: Error, - ) => Promise; + ) => hookReturnBoolean; onSQSError?: (error: Error, message?: Message) => Promise; }; @@ -99,6 +119,7 @@ export class SQSConsumer { itemsPerRequest: options.consumerOptions?.itemsPerRequest ?? 10, messageAttributeNames: options.consumerOptions?.messageAttributeNames ?? [], + attributeNames: options.consumerOptions?.attributeNames ?? [], }; this.sqsClient = options.clientOptions?.sqsClient ?? @@ -158,7 +179,7 @@ export class SQSConsumer { } else { handlerResult = await handler(message); } - await Promise.all([this.hooks.runHook("onHandlerSuccess", message)]); + await this.hooks.runHook("onHandlerSuccess", message); return handlerResult; } catch (error) { if (error instanceof TimeoutError) { @@ -237,6 +258,7 @@ export class SQSConsumer { MaxNumberOfMessages: this.consumerOptions.itemsPerRequest, VisibilityTimeout: visibilityTimeout, MessageAttributeNames: this.consumerOptions.messageAttributeNames, + AttributeNames: this.consumerOptions.attributeNames, }, ); /* c8 ignore next 1 */ @@ -303,42 +325,46 @@ export class SQSConsumer { } } + /* c8 ignore next 3 */ + public get isRunning() { + return this.running; + } + public addHook( hookName: "onPoll", value: (messages: Message[]) => Promise, - ): void; + ): this; public addHook( hookName: "onMessage", value: (message: Message) => Promise, - ): void; - // eslint-disable-next-line @typescript-eslint/unified-signatures + ): this; public addHook( hookName: "onHandlerSuccess", value: (message: Message) => Promise, - ): void; + ): this; public addHook( hookName: "onHandlerTimeout", value: (message: Message) => Promise, - ): void; + ): this; public addHook( hookName: "onHandlerError", value: (message: Message, error: Error) => Promise, - ): void; - // eslint-disable-next-line @typescript-eslint/unified-signatures + ): this; public addHook( hookName: "onSuccess", value: (message: Message) => Promise, - ): void; + ): this; public addHook( hookName: "onError", value: (hook: HookName, message: Message, error: Error) => Promise, - ): void; + ): this; public addHook( hookName: "onSQSError", value: (error: Error, message?: Message) => Promise, - ): void; - public addHook(hookName: HookName, value: Function): void; + ): this; + public addHook(hookName: HookName, value: Function): this; public addHook(hookName: HookName, value: Function) { - return this.hooks.addHook(hookName, value); + this.hooks.addHook(hookName, value); + return this; } } diff --git a/test/test/error.ts b/test/test/error.ts index 51f7bf8..9bf93cc 100644 --- a/test/test/error.ts +++ b/test/test/error.ts @@ -1,4 +1,5 @@ -// @ts-expect-error +// biome-ignore lint/suspicious/noTsIgnore: is a Test file +// @ts-ignore import "../helpers/localtest"; import { setTimeout } from "node:timers/promises"; import { Signer } from "@fgiova/aws-signature"; @@ -159,7 +160,7 @@ test("sqs-consumer class Errors", { only: true }, async (t) => { consumer.addHook("onSQSError", async (error: Error) => { try { resolve(JSON.parse(error.message).message); - } catch (e) { + } catch (_e) { resolve(error.message); } diff --git a/test/test/index.ts b/test/test/index.ts index ecd86ce..f964c96 100644 --- a/test/test/index.ts +++ b/test/test/index.ts @@ -1,4 +1,5 @@ -// @ts-expect-error +// biome-ignore lint/suspicious/noTsIgnore: is a Test file +// @ts-ignore import "../helpers/localtest"; import { clearInterval } from "node:timers"; import { setTimeout } from "node:timers/promises"; @@ -6,7 +7,8 @@ import { Signer, SignerSingleton } from "@fgiova/aws-signature"; import { type Message, MiniSQSClient } from "@fgiova/mini-sqs-client"; import { teardown, test } from "tap"; import { SQSConsumer } from "../../src/index"; -// @ts-expect-error +// biome-ignore lint/suspicious/noTsIgnore: is a Test file +// @ts-ignore import { sqsPurge } from "../helpers/sqsMessage"; const queueARN = "arn:aws:sqs:eu-central-1:000000000000:test-queue"; diff --git a/test/test/queue-hooks.ts b/test/test/queue-hooks.ts index 19f1de4..b06a0cd 100644 --- a/test/test/queue-hooks.ts +++ b/test/test/queue-hooks.ts @@ -1,4 +1,5 @@ -// @ts-expect-error +// biome-ignore lint/suspicious/noTsIgnore: is a Test file +// @ts-ignore import "../helpers/localtest"; import { setTimeout } from "node:timers/promises"; import { SignerSingleton } from "@fgiova/aws-signature"; @@ -6,7 +7,8 @@ import { type Message, MiniSQSClient } from "@fgiova/mini-sqs-client"; import { teardown, test } from "tap"; import { SQSConsumer } from "../../src"; import type { HookName } from "../../src/hooks"; -// @ts-expect-error +// biome-ignore lint/suspicious/noTsIgnore: is a Test file +// @ts-ignore import { sqsPurge } from "../helpers/sqsMessage"; const queueARN = "arn:aws:sqs:eu-central-1:000000000000:test-queue-hooks";