feat: implement Redis queue store and mutex#493
Open
niteshpurohit wants to merge 24 commits into
Open
Conversation
- Introduced a Redis-backed queue store to enhance durability and shared state management across worker processes. - Added `PersistenceMutex` for distributed lock management to ensure safe access to shared state. - Implemented `StateSnapshot` for marshaling queue store state, allowing for efficient state persistence. - Updated backend interfaces to support Redis configuration, including necessary options like URL and namespace. - Created comprehensive tests for Redis queue store functionality, ensuring reliability and correctness. - Enhanced documentation to guide users on configuring and using the Redis backend effectively. closes: #112
There was a problem hiding this comment.
Pull request overview
This PR introduces a Redis-backed backend/queue-store path for core/karya, aiming to persist queue/workflow state in Redis and provide a distributed mutex for cross-process coordination (closing #112).
Changes:
- Added
Karya::QueueStore::Rediswith Redis state persistence, plusInternal::PersistenceMutex+Internal::StateSnapshot. - Added
Karya::Backend::Redisand wired Redis backend/queue-store autoloading and configuration updates. - Added unit + e2e coverage and updated documentation to describe Redis backend selection and configuration.
Reviewed changes
Copilot reviewed 29 out of 34 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/pages/backends.md | Updates backend guidance and adds Redis configuration example. |
| docs/pages/adoption/goodjob.md | Adjusts backend selection wording in adoption docs. |
| docs/index.md | Updates docs index wording for frameworks entry. |
| core/karya/spec/karya/queue_store/redis/internal/persistence_mutex_spec.rb | Adds a unit spec covering distributed lock token allocation failure behavior. |
| core/karya/spec/karya/queue_store/redis_spec.rb | Adds unit coverage for Redis queue store persistence and lock behaviors (via a fake Redis client). |
| core/karya/spec/karya/karya_base_spec.rb | Updates backend option validation expectations (removes queue_store_class special casing, allows queue_store instance option). |
| core/karya/spec/karya/job_spec.rb | Adds Marshal round-trip coverage for Karya::Job lifecycle registry behavior. |
| core/karya/spec/karya/cli_backend_boot_spec.rb | Updates corrupted backend option key test fixture (adapter_class). |
| core/karya/spec/karya/backend/redis_spec.rb | Adds unit tests for Karya::Backend::Redis. |
| core/karya/spec/karya/backend/in_memory_spec.rb | Removes tests related to injected queue store factories for InMemory backend. |
| core/karya/spec/e2e/karya/cli_worker_redis_spec.rb | Adds optional e2e worker boot test for Redis backend (env-gated). |
| core/karya/sig/karya/queue_store/redis/internal/state_snapshot.rbs | Adds RBS for Redis state snapshot marshal module. |
| core/karya/sig/karya/queue_store/redis/internal/persistence_mutex.rbs | Adds RBS for Redis distributed mutex. |
| core/karya/sig/karya/queue_store/redis.rbs | Adds RBS for Redis queue store public/private surface and Redis client interface. |
| core/karya/sig/karya/base.rbs | Updates configure_backend signature to accept generic backend options. |
| core/karya/sig/karya/backend/redis.rbs | Adds RBS for Redis backend wrapper. |
| core/karya/sig/karya/backend/in_memory.rbs | Updates RBS to reflect simplified in-memory backend initialization/build behavior. |
| core/karya/sig/karya/backend.rbs | Changes backend_class typing (currently widened to ::Class). |
| core/karya/lib/karya/queue_store/redis/internal/state_snapshot.rb | Implements Redis persistence payload dump/load (currently using Marshal). |
| core/karya/lib/karya/queue_store/redis/internal/persistence_mutex.rb | Implements cross-process locking around state load/mutate/persist. |
| core/karya/lib/karya/queue_store/redis/internal.rb | Adds internal requires for Redis queue store internals. |
| core/karya/lib/karya/queue_store/redis.rb | Implements Redis-backed queue store subclass that persists InMemory state into Redis. |
| core/karya/lib/karya/queue_store.rb | Autoloads QueueStore::Redis. |
| core/karya/lib/karya/job.rb | Adds Marshal hooks to avoid serializing lifecycle registry in persisted jobs. |
| core/karya/lib/karya/base.rb | Removes queue_store_class special-casing from backend option validation. |
| core/karya/lib/karya/backend/redis.rb | Adds Redis backend wrapper to build Redis queue store and carry options. |
| core/karya/lib/karya/backend/in_memory.rb | Simplifies in-memory backend (removes injected factory + forwarded options). |
| core/karya/lib/karya/backend.rb | Autoloads Backend::Redis. |
| core/karya-activerecord/gemfiles/rails_edge.gemfile.lock | Adds redis (~> 5.4) to lockfile dependency set. |
| core/karya-activerecord/gemfiles/rails_8_1.gemfile.lock | Adds redis (~> 5.4) to lockfile dependency set. |
| core/karya-activerecord/gemfiles/rails_8_0.gemfile.lock | Adds redis (~> 5.4) to lockfile dependency set. |
| core/karya-activerecord/gemfiles/rails_7_2.gemfile.lock | Adds redis (~> 5.4) to lockfile dependency set. |
| core/karya-activerecord/Gemfile.lock | Adds redis (~> 5.4) to lockfile dependency set. |
| core/karya-activerecord/Gemfile | Adds redis (~> 5.4) dependency for the activerecord package bundle. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 186 out of 191 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
core/karya/lib/karya/queue_store/internal/initializer_options.rb:30
InitializerOptions::KeywordReaderusesKarya::QueueStore::Internal::ReferenceQueueStore::DEFAULT_*constants, but this file doesn’t requirekarya/queue_store/internal/reference_queue_store. If someone requireskarya/queue_store/internal/initializer_optionsstandalone and callsexpired_tombstone_limit/completed_batch_retention_limit/max_batch_sizebeforeReferenceQueueStoreis loaded, this will raiseNameError. Consider inlining these defaults here (or moving the shared defaults intoKarya::QueueStore::Internal) so the class is self-contained when loaded directly.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 186 out of 191 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
core/karya/lib/karya/queue_store/redis/internal/state_snapshot.rb:94
- The snapshot codec serializes arbitrary symbols but only allows this fixed set when loading, and the list is missing symbols that the store writes into persisted state. For example, circuit-breaker state uses
:open,:half_open, and:cooldown_until, and stuck-job recovery metadata uses:recovery_count,:last_recovered_at, and:last_recovery_reason; after those paths run, Redis can contain a snapshot that the next operation cannot load. Add every symbol that can appear in persisted store state or reject unsupported state before writing.
| expect(JSON.parse(File.read(marker_file))).to include('account_id' => 42) | ||
| expect(read_runtime_state(state_file).fetch('snapshot').fetch('phase')).to eq('stopped') | ||
| ensure | ||
| Redis.new(url: redis_url).del("#{namespace}:queue_store:state", "#{namespace}:queue_store:lock") |
Comment on lines
+20
to
+54
| # Serializes jobs without persisting the runtime lifecycle registry object. | ||
| module MarshalSupport | ||
| def marshal_dump | ||
| { | ||
| id:, | ||
| queue:, | ||
| handler:, | ||
| arguments:, | ||
| priority:, | ||
| concurrency_scope:, | ||
| rate_limit_scope:, | ||
| retry_policy:, | ||
| execution_timeout:, | ||
| expires_at:, | ||
| idempotency_key:, | ||
| uniqueness_key:, | ||
| uniqueness_scope:, | ||
| state:, | ||
| attempt:, | ||
| created_at:, | ||
| updated_at:, | ||
| next_retry_at:, | ||
| failure_classification:, | ||
| dead_letter_reason:, | ||
| dead_lettered_at:, | ||
| dead_letter_source_state: | ||
| } | ||
| end | ||
|
|
||
| def marshal_load(attributes) | ||
| reloaded_job = self.class.new(**attributes) | ||
|
|
||
| @identity = reloaded_job.send(:identity) | ||
| @scheduling = reloaded_job.send(:scheduling) | ||
| @lifecycle_state = reloaded_job.send(:lifecycle_state) |
| autoload :QueueControlResult, 'karya/queue_store/queue_control_result' | ||
| autoload :RecoveryReport, 'karya/queue_store/recovery_report' | ||
| autoload :Internal, 'karya/queue_store/internal' | ||
| autoload :Redis, 'karya/queue_store/redis' |
Comment on lines
+18
to
+22
| def initialize(**options) | ||
| unless options.empty? | ||
| raise InvalidBackendConfigurationError, | ||
| "Karya::Backend::InMemory does not accept backend options: #{options.keys.map(&:inspect).join(', ')}" | ||
| end |
Comment on lines
+12
to
+22
| type event_payload_value = | ||
| Karya::Job | | ||
| Karya::RetryPolicy | | ||
| Karya::failure_classification | | ||
| String | | ||
| Time | | ||
| Integer | | ||
| Float | | ||
| bool | | ||
| nil | | ||
| Array[String] |
Comment on lines
+20
to
+21
| def lease_duration: () -> Integer? | ||
| def now: () -> Time? |
Comment on lines
+199
to
+202
| def delete_journal_events_through(version) | ||
| (1..version).each do |event_version| | ||
| owner.send(:redis_client).del(owner.send(:event_key, event_version)) | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
PersistenceMutexfor distributed lock management to ensure safe access to shared state.StateSnapshotfor marshaling queue store state, allowing for efficient state persistence.closes: #112