Summary
Extend the job manager to support Redis as a persistent store, enabling multi-instance deployments with distributed job claiming.
Background
The current MemoryStore implementation works well for single-instance deployments. However, for high availability and horizontal scaling, we need a Redis-based store that supports:
- Persistent job state across restarts
- Multiple config-server instances sharing a job queue
- Atomic job claiming to prevent duplicate execution
Requirements
1. RedisStore Implementation
Implement the JobStore interface with a Redis backend:
type RedisStore struct {
	client *redis.Client
	prefix string // key prefix for namespacing
}
Key schema:
- {prefix}:job:{id} - Job data (hash or JSON)
- {prefix}:jobs:pending - Pending job queue (sorted set scored by created_at)
- {prefix}:jobs:running - Running jobs (set)
- {prefix}:jobs:by_type:{type} - Jobs indexed by type (set)
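Building every key name in one place keeps Go commands and Lua scripts from drifting apart. A minimal sketch: the keySchema type and its method names are hypothetical; only the key layout comes from the schema above.

```go
package main

import "fmt"

// keySchema builds the Redis key names from the key schema.
// Hypothetical helper; only the key layout is taken from the issue.
type keySchema struct {
	prefix string // key prefix for namespacing, e.g. "cfgsrv"
}

// Job returns {prefix}:job:{id}, which holds the job data (hash or JSON).
func (k keySchema) Job(id string) string { return fmt.Sprintf("%s:job:%s", k.prefix, id) }

// Pending returns {prefix}:jobs:pending, a sorted set scored by created_at.
func (k keySchema) Pending() string { return k.prefix + ":jobs:pending" }

// Running returns {prefix}:jobs:running, the set of claimed job IDs.
func (k keySchema) Running() string { return k.prefix + ":jobs:running" }

// ByType returns {prefix}:jobs:by_type:{type}, the per-type index set.
func (k keySchema) ByType(jobType string) string {
	return fmt.Sprintf("%s:jobs:by_type:%s", k.prefix, jobType)
}
```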
2. Distributed Job Claiming
Add claim methods to the JobStore interface:
type JobStore interface {
	// Existing methods...

	// ClaimNext atomically claims the next pending job for a worker.
	// It returns a nil *Job if no pending job is available.
	ClaimNext(ctx context.Context, workerID string) (*Job, error)

	// ReleaseClaim releases a job claim (on worker failure/restart).
	ReleaseClaim(ctx context.Context, jobID string) error

	// RefreshClaim extends the claim TTL (heartbeat).
	RefreshClaim(ctx context.Context, jobID string) error
}
Claim mechanism (a single Lua script, for atomicity):
- ZPOPMIN the oldest entry from the pending queue
- SET a claim lock with a TTL
- Add the job ID to the running set
- Return the job data
3. Worker Pool Adaptation
Modify workerPool to use claim-based pulling when a distributed store is configured:
type workerPool struct {
	store       JobStore
	distributed bool   // true for Redis, false for memory
	workerID    string // unique identifier for this instance
}
4. Stale Claim Recovery
A background process recovers jobs from failed workers:
- Monitor claim TTL expiration
- Re-queue jobs whose claims expired without completion
Acceptance Criteria
- RedisStore implements all JobStore interface methods
- Atomic job claiming prevents duplicate execution across instances
- Claim TTL and heartbeat mechanism handles worker failures
- Stale job recovery re-queues orphaned jobs
- Configuration option to choose store type (memory/redis)
- Integration tests with Redis (testcontainers)
- Graceful degradation if Redis is unavailable
Technical Notes
- Use the go-redis v9 client (github.com/redis/go-redis/v9)
- Use Lua scripts for atomic operations
- Consider Redis Cluster compatibility (all keys touched by one Lua script must hash to the same slot)
- Default claim TTL: 5 minutes with 1-minute heartbeat
Related
- Depends on: #3, feat(config-server): Implement generic async Job Manager ✅ Implemented