Implement a Redis Delayed Queue to support sorted sets#599
Implement a Redis Delayed Queue to support sorted sets#599
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #599 +/- ##
==========================================
- Coverage 81.94% 81.57% -0.38%
==========================================
Files 16 16
Lines 2703 2746 +43
==========================================
+ Hits 2215 2240 +25
- Misses 488 506 +18
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This PR implements support for a Redis Delayed Queue using sorted sets by introducing a new numeric weight for prioritizing items. Key changes include adding and propagating the weight attribute in triggers and payloads, refactoring push/pop methods to use sorted sets, and incorporating new tests for validating weight-based ordering.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_trigger.py | Updated trigger tests to include weight extraction in notifications |
| tests/test_sync.py | Modified tests for slot truncation and payload flushing with weights |
| tests/test_redisqueue.py | Added tests to verify correct ordering and score calculations in RedisQueue |
| pgsync/trigger.py | Integrated weight handling in trigger logic |
| pgsync/sync.py | Revised payload flushing and notification channel filtering |
| pgsync/redisqueue.py | Reworked queue push/pop to utilize sorted sets with weight-based scoring |
| pgsync/base.py | Extended payload structure with the weight property |
| examples/book/benchmark.py | Added a new CLI option to set weight for benchmarking operations |
| if payloads: | ||
| self.redis.push(payloads) | ||
| payloads = [] | ||
| self._flush_payloads(payloads) |
There was a problem hiding this comment.
After flushing payloads, the payloads list is not cleared. This may lead to duplicate processing of already flushed items; consider resetting the payloads list (e.g., payloads = []) after calling _flush_payloads.
| self._flush_payloads(payloads) | |
| self._flush_payloads(payloads) | |
| payloads = [] |
| Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) | ||
| session = Session() | ||
|
|
||
| if weight: |
There was a problem hiding this comment.
[nitpick] The condition 'if weight:' does not execute when weight is 0 (the default), which may be unintended. Consider explicitly checking for None or always setting the weight configuration regardless of its value.
| if weight: | |
| if weight is not None: |
No description provided.