Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.6
rev: v0.11.8
hooks:
- id: ruff
- id: ruff-format
Expand Down
7 changes: 4 additions & 3 deletions ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
line-length = 110
indent-width = 4

# Assume Python 3.8.
target-version = "py38"
# Assume Python 3.9.
target-version = "py39"

# Enable automatic fixes.
fix = true
Expand All @@ -21,8 +21,9 @@ line-ending = "auto"
# "F": Pyflakes https://docs.astral.sh/ruff/rules/#pyflakes-f
# "E", "W": pycodestyle https://docs.astral.sh/ruff/rules/#pycodestyle-e-w
# "I": isort https://docs.astral.sh/ruff/rules/#isort-i
# "B": flake8-bugbear https://docs.astral.sh/ruff/rules/#flake8-bugbear-b
[lint]
select = ["UP", "F", "E", "W", "I"]
select = ["UP", "F", "E", "W", "I", "B"]
ignore = ["E203", "E501"]

# Isort config.
Expand Down
20 changes: 13 additions & 7 deletions sqsx/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import signal
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, Optional
from types import FrameType
from typing import Any, Callable, Optional

from pydantic import BaseModel, Field, PrivateAttr

Expand Down Expand Up @@ -32,8 +33,8 @@ def consume_messages(
logger.info(f"Starting consuming tasks, queue_url={self.url}")

if enable_signal_to_exit_gracefully:
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
signal.signal(signal.SIGINT, self._exit_gracefully_from_signal)
signal.signal(signal.SIGTERM, self._exit_gracefully_from_signal)

while True:
if self._should_consume_tasks_stop:
Expand Down Expand Up @@ -66,10 +67,13 @@ def consume_messages(
if not run_forever:
break

def exit_gracefully(self, signal_num, current_stack_frame) -> None:
logger.info("Starting graceful shutdown process")
def exit_gracefully(self) -> None:
logger.info(f"Starting graceful shutdown process, queue_url={self.url}")
self._should_consume_tasks_stop = True

def _exit_gracefully_from_signal(self, signal: int, frame: Optional[FrameType]):
self.exit_gracefully()

def _message_ack(self, sqs_message: dict) -> None:
receipt_handle = sqs_message["ReceiptHandle"]
self.sqs_client.delete_message(QueueUrl=self.url, ReceiptHandle=receipt_handle)
Expand All @@ -95,7 +99,7 @@ class Queue(BaseModel, BaseQueueMixin):
sqs_client: Any
min_backoff_seconds: int = Field(default=30)
max_backoff_seconds: int = Field(default=900)
_handlers: Dict[str, Callable] = PrivateAttr(default={})
_handlers: dict[str, Callable] = PrivateAttr(default={})
_should_consume_tasks_stop: bool = PrivateAttr(default=False)

def add_task(self, task_name: str, **task_kwargs) -> dict:
Expand Down Expand Up @@ -165,7 +169,9 @@ class RawQueue(BaseModel, BaseQueueMixin):
max_backoff_seconds: int = Field(default=900)
_should_consume_tasks_stop: bool = PrivateAttr(default=False)

def add_message(self, message_body: str, message_attributes: dict = {}) -> dict:
def add_message(self, message_body: str, message_attributes: Optional[dict] = None) -> dict:
if message_attributes is None:
message_attributes = {}
return self.sqs_client.send_message(
QueueUrl=self.url,
MessageAttributes=message_attributes,
Expand Down