Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ const consumer = new SQSConsumer({
| destroySigner | boolean | false | Whether to destroy the signer when the consumer is destroyed. |

### ConsumerOptions
| Option | Type | Default | Description |
|-------------------------|----------|---------|------------------------------------------------------------|
| visibilityTimeout | number | 30 | The visibility timeout for the messages in seconds |
| waitTimeSeconds | number | 20 | The wait time for the receiveMessage call in seconds |
| itemsPerRequest | number | 10 | The maximum number of messages to be received at once |
| messageAttributeNames | string[] | [] | The message attribute names to be included in the response |
| Option | Type | Default | Description |
|-----------------------|----------|---------|------------------------------------------------------------|
| visibilityTimeout | number | 30 | The visibility timeout for the messages in seconds |
| waitTimeSeconds | number | 20 | The wait time for the receiveMessage call in seconds |
| itemsPerRequest | number | 10 | The maximum number of messages to be received at once |
| messageAttributeNames | string[] | [] | The message attribute names to be included in the response |
| attributeNames | string[] | [] | The attribute names to be included in the response |

### Hooks
| Option | Type | Description |
Expand Down
8 changes: 6 additions & 2 deletions src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ export class Hooks {
...args: any[]
) {
for (const fn of this.hooks[hookSymbol.S]) {
message = await fn(message, ...args);
const returnMessage = await fn(message, ...args);

if (returnMessage) {
Comment thread
fgiova marked this conversation as resolved.
message = returnMessage;
}
}
return message;
}
Expand All @@ -143,7 +147,7 @@ export class Hooks {
) {
for (const fn of this.hooks[hookSymbol.S]) {
const continueLoop = await fn(...args);
if (!continueLoop) {
if (continueLoop === false) {
return false;
}
}
Expand Down
68 changes: 47 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,40 @@ type ConsumerOptions = {
waitTimeSeconds?: number;
itemsPerRequest?: number;
messageAttributeNames?: string[];
attributeNames?: string[];
};

type HooksOptions = {
onPoll?: (messages: Message[]) => Promise<Message[]>;
onMessage?: (message: Message) => Promise<Message>;
onHandlerSuccess?: (message: Message) => Promise<Message>;
onHandlerTimeout?: (message: Message) => Promise<boolean>;
onHandlerError?: (message: Message, error: Error) => Promise<boolean>;
onSuccess?: (message: Message) => Promise<boolean>;
type hookReturnMessages =
| Promise<Message[]>
| Message[]
| Promise<void>
| void
| undefined;
type hookReturnMessage =
| Promise<Message>
| Message
| Promise<void>
| void
| undefined;
type hookReturnBoolean =
| Promise<boolean>
| boolean
| Promise<void>
| void
| undefined;

export type HooksOptions = {
onPoll?: (messages: Message[]) => hookReturnMessages;
onMessage?: (message: Message) => hookReturnMessage;
onHandlerSuccess?: (message: Message) => hookReturnMessage;
onHandlerTimeout?: (message: Message) => hookReturnBoolean;
onHandlerError?: (message: Message, error: Error) => hookReturnBoolean;
onSuccess?: (message: Message) => hookReturnBoolean;
onError?: (
hook: HookName,
message: Message,
error: Error,
) => Promise<boolean>;
) => hookReturnBoolean;
onSQSError?: (error: Error, message?: Message) => Promise<void>;
};

Expand Down Expand Up @@ -99,6 +119,7 @@ export class SQSConsumer {
itemsPerRequest: options.consumerOptions?.itemsPerRequest ?? 10,
messageAttributeNames:
options.consumerOptions?.messageAttributeNames ?? [],
attributeNames: options.consumerOptions?.attributeNames ?? [],
};
this.sqsClient =
options.clientOptions?.sqsClient ??
Expand Down Expand Up @@ -158,7 +179,7 @@ export class SQSConsumer {
} else {
handlerResult = await handler(message);
}
await Promise.all([this.hooks.runHook("onHandlerSuccess", message)]);
await this.hooks.runHook("onHandlerSuccess", message);
return handlerResult;
} catch (error) {
if (error instanceof TimeoutError) {
Expand Down Expand Up @@ -237,6 +258,7 @@ export class SQSConsumer {
MaxNumberOfMessages: this.consumerOptions.itemsPerRequest,
VisibilityTimeout: visibilityTimeout,
MessageAttributeNames: this.consumerOptions.messageAttributeNames,
AttributeNames: this.consumerOptions.attributeNames,
},
);
/* c8 ignore next 1 */
Expand Down Expand Up @@ -303,42 +325,46 @@ export class SQSConsumer {
}
}

/* c8 ignore next 3 */
public get isRunning() {
return this.running;
}

public addHook(
hookName: "onPoll",
value: (messages: Message[]) => Promise<Message[]>,
): void;
): this;
public addHook(
hookName: "onMessage",
value: (message: Message) => Promise<Message>,
): void;
// eslint-disable-next-line @typescript-eslint/unified-signatures
): this;
public addHook(
hookName: "onHandlerSuccess",
value: (message: Message) => Promise<Message>,
): void;
): this;
public addHook(
hookName: "onHandlerTimeout",
value: (message: Message) => Promise<boolean>,
): void;
): this;
public addHook(
hookName: "onHandlerError",
value: (message: Message, error: Error) => Promise<boolean>,
): void;
// eslint-disable-next-line @typescript-eslint/unified-signatures
): this;
public addHook(
hookName: "onSuccess",
value: (message: Message) => Promise<boolean>,
): void;
): this;
public addHook(
hookName: "onError",
value: (hook: HookName, message: Message, error: Error) => Promise<boolean>,
): void;
): this;
public addHook(
hookName: "onSQSError",
value: (error: Error, message?: Message) => Promise<void>,
): void;
public addHook(hookName: HookName, value: Function): void;
): this;
public addHook(hookName: HookName, value: Function): this;
public addHook(hookName: HookName, value: Function) {
return this.hooks.addHook(hookName, value);
this.hooks.addHook(hookName, value);
return this;
}
}
5 changes: 3 additions & 2 deletions test/test/error.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @ts-expect-error
// biome-ignore lint/suspicious/noTsIgnore: is a Test file
// @ts-ignore
import "../helpers/localtest";
import { setTimeout } from "node:timers/promises";
import { Signer } from "@fgiova/aws-signature";
Expand Down Expand Up @@ -159,7 +160,7 @@ test("sqs-consumer class Errors", { only: true }, async (t) => {
consumer.addHook("onSQSError", async (error: Error) => {
try {
resolve(JSON.parse(error.message).message);
} catch (e) {
} catch (_e) {
resolve(error.message);
}

Expand Down
6 changes: 4 additions & 2 deletions test/test/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// @ts-expect-error
// biome-ignore lint/suspicious/noTsIgnore: is a Test file
// @ts-ignore
import "../helpers/localtest";
import { clearInterval } from "node:timers";
import { setTimeout } from "node:timers/promises";
import { Signer, SignerSingleton } from "@fgiova/aws-signature";
import { type Message, MiniSQSClient } from "@fgiova/mini-sqs-client";
import { teardown, test } from "tap";
import { SQSConsumer } from "../../src/index";
// @ts-expect-error
// biome-ignore lint/suspicious/noTsIgnore: is a Test file
// @ts-ignore
import { sqsPurge } from "../helpers/sqsMessage";

const queueARN = "arn:aws:sqs:eu-central-1:000000000000:test-queue";
Expand Down
6 changes: 4 additions & 2 deletions test/test/queue-hooks.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// @ts-expect-error
// biome-ignore lint/suspicious/noTsIgnore: is a Test file
// @ts-ignore
import "../helpers/localtest";
import { setTimeout } from "node:timers/promises";
import { SignerSingleton } from "@fgiova/aws-signature";
import { type Message, MiniSQSClient } from "@fgiova/mini-sqs-client";
import { teardown, test } from "tap";
import { SQSConsumer } from "../../src";
import type { HookName } from "../../src/hooks";
// @ts-expect-error
// biome-ignore lint/suspicious/noTsIgnore: is a Test file
// @ts-ignore
import { sqsPurge } from "../helpers/sqsMessage";

const queueARN = "arn:aws:sqs:eu-central-1:000000000000:test-queue-hooks";
Expand Down