From 003050fd768748750bdc1c36e9ee5ed3226eb139 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Jun 2025 17:37:48 +0000 Subject: [PATCH 1/3] Initial plan for issue From 93f37e7207cc5a003a521047f204ed517434bb1b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Jun 2025 19:53:45 +0000 Subject: [PATCH 2/3] Implement distributed locking mechanism for housekeeper tasks Co-authored-by: lucasoares <10624972+lucasoares@users.noreply.github.com> --- internal/cmd/deckard/main.go | 108 +++++++++++++++++-- internal/config/housekeeper.go | 42 ++++++++ internal/queue/cache/cache.go | 7 ++ internal/queue/cache/memory_cache.go | 28 +++++ internal/queue/cache/redis_cache.go | 49 +++++++++ internal/queue/distributed_lock.go | 133 ++++++++++++++++++++++++ internal/queue/distributed_lock_test.go | 97 +++++++++++++++++ internal/queue/queue.go | 5 + internal/queue/queue_housekeeper.go | 65 +++++++++++- 9 files changed, 521 insertions(+), 13 deletions(-) create mode 100644 internal/queue/distributed_lock.go create mode 100644 internal/queue/distributed_lock_test.go diff --git a/internal/cmd/deckard/main.go b/internal/cmd/deckard/main.go index dde6b5c..a88c84a 100644 --- a/internal/cmd/deckard/main.go +++ b/internal/cmd/deckard/main.go @@ -126,9 +126,19 @@ func startGrpcServer(queue *queue.Queue, queueService queue.QueueConfigurationSe } func startHouseKeeperJobs(pool *queue.Queue) { - go scheduleTask( + // Create distributed lock if enabled + var distributedLock queue.DistributedLock + if config.HousekeeperDistributedExecutionEnabled.GetBool() { + instanceID := config.GetHousekeeperInstanceID() + distributedLock = queue.NewRedisDistributedLock(pool.GetCache(), instanceID) + } else { + distributedLock = queue.NewNoOpDistributedLock() + } + + go scheduleTaskWithDistributedLock( UNLOCK, - nil, + nil, // No local lock needed with distributed locking + distributedLock, shutdown.WaitGroup, config.HousekeeperTaskUnlockDelay.GetDuration(), func() bool { @@ -138,9 +148,10 @@ func startHouseKeeperJobs(pool *queue.Queue) { }, ) - go scheduleTask( + go scheduleTaskWithDistributedLock( TIMEOUT, nil, + distributedLock, shutdown.WaitGroup, config.HousekeeperTaskTimeoutDelay.GetDuration(), func() bool { @@ -150,21 +161,26 @@ func startHouseKeeperJobs(pool *queue.Queue) { }, ) - go scheduleTask( + go scheduleTaskWithDistributedLock( METRICS, nil, + distributedLock, shutdown.WaitGroup, config.HousekeeperTaskMetricsDelay.GetDuration(), func() bool { - queue.ComputeMetrics(ctx, pool) + // Only compute metrics if we're the metrics leader or distributed execution is disabled + if !config.HousekeeperDistributedExecutionEnabled.GetBool() || isMetricsLeader(distributedLock) { + queue.ComputeMetrics(ctx, pool) + } return true }, ) - go scheduleTask( + go scheduleTaskWithDistributedLock( RECOVERY, - backgroundTaskLocker, + backgroundTaskLocker, // Keep local lock for critical recovery task + distributedLock, shutdown.CriticalWaitGroup, config.HousekeeperTaskUpdateDelay.GetDuration(), func() bool { @@ -172,9 +188,10 @@ func startHouseKeeperJobs(pool *queue.Queue) { }, ) - go scheduleTask( + go scheduleTaskWithDistributedLock( MAX_ELEMENTS, - backgroundTaskLocker, + backgroundTaskLocker, // Keep local lock for critical task + distributedLock, shutdown.WaitGroup, config.HousekeeperTaskMaxElementsDelay.GetDuration(), func() bool { @@ -184,9 +201,10 @@ func startHouseKeeperJobs(pool *queue.Queue) { }, ) - go 
scheduleTask(
+	go scheduleTaskWithDistributedLock(
 		TTL,
-		backgroundTaskLocker,
+		backgroundTaskLocker, // Keep local lock for critical task
+		distributedLock,
 		shutdown.WaitGroup,
 		config.HousekeeperTaskTTLDelay.GetDuration(),
 		func() bool {
@@ -222,6 +240,74 @@ func scheduleTask(taskName string, lock *sync.Mutex, taskWaitGroup *sync.WaitGro
 	}
 }
 
+func scheduleTaskWithDistributedLock(taskName string, localLock *sync.Mutex, distributedLock queue.DistributedLock, taskWaitGroup *sync.WaitGroup, duration time.Duration, fn func() bool) {
+	for {
+		select {
+		case <-time.After(duration):
+			taskWaitGroup.Add(1)
+
+			// Try to acquire distributed lock
+			lockTTL := config.HousekeeperDistributedExecutionLockTTL.GetDuration()
+			acquired, err := distributedLock.TryLock(ctx, taskName, lockTTL)
+			if err != nil {
+				logger.S(ctx).Errorf("Error acquiring distributed lock for task %s: %v", taskName, err)
+				taskWaitGroup.Done()
+				continue
+			}
+
+			if !acquired {
+				logger.S(ctx).Debugf("Skipping task %s - already running on another instance", taskName)
+				taskWaitGroup.Done()
+				continue
+			}
+
+			// Acquire local lock if needed
+			if localLock != nil {
+				localLock.Lock()
+			}
+
+			executeTask(taskName, fn)
+
+			// Release local lock if acquired
+			if localLock != nil {
+				localLock.Unlock()
+			}
+
+			// Release distributed lock
+			err = distributedLock.ReleaseLock(ctx, taskName)
+			if err != nil {
+				logger.S(ctx).Errorf("Error releasing distributed lock for task %s: %v", taskName, err)
+			}
+
+			taskWaitGroup.Done()
+		case <-shutdown.Started:
+			logger.S(ctx).Debug("Stopping ", taskName, " scheduler.")
+			return
+		}
+	}
+}
+
+func isMetricsLeader(distributedLock queue.DistributedLock) bool {
+	// Try to acquire metrics leader lock
+	leaderLockTTL := config.HousekeeperDistributedExecutionLockTTL.GetDuration() * 2 // Longer TTL for leader
+	acquired, err := distributedLock.TryLock(ctx, "metrics_leader", leaderLockTTL)
+	if err != nil {
+		logger.S(ctx).Errorf("Error checking metrics leader lock: %v", err)
+		return false
+	}
+
+	if acquired {
+		return true
+	}
+
+	// The lock already exists, but it may be held by this instance from a
+	// previous cycle. RefreshLock only succeeds for the current holder, so a
+	// successful refresh both confirms and extends our leadership.
+	err = distributedLock.RefreshLock(ctx, "metrics_leader", leaderLockTTL)
+	return err == nil
+}
+
 func executeTask(taskName string, fn func() bool) {
 	now := dtime.Now()
 	var metrify bool
diff --git a/internal/config/housekeeper.go b/internal/config/housekeeper.go
index 952e30a..bd2c848 100644
--- a/internal/config/housekeeper.go
+++ b/internal/config/housekeeper.go
@@ -1,5 +1,11 @@
 package config
 
+import (
+	"fmt"
+	"os"
+	"time"
+)
+
 var HousekeeperEnabled = Create(&ViperConfigKey{
 	Key:     "housekeeper.enabled",
 	Default: true,
@@ -34,3 +40,39 @@ var HousekeeperTaskMetricsDelay = Create(&ViperConfigKey{
 	Key:     "housekeeper.task.metrics.delay",
 	Default: "60s",
 })
+
+var HousekeeperDistributedExecutionEnabled = Create(&ViperConfigKey{
+	Key:     "housekeeper.distributed_execution.enabled",
+	Default: false,
+})
+
+var HousekeeperDistributedExecutionInstanceID = Create(&ViperConfigKey{
+	Key:     "housekeeper.distributed_execution.instance_id",
+	Default: "",
+})
+
+var HousekeeperDistributedExecutionLockTTL = Create(&ViperConfigKey{
+	Key:     "housekeeper.distributed_execution.lock_ttl",
+	Default: "30s",
+})
+
+var HousekeeperUnlockParallelism = Create(&ViperConfigKey{
+	Key:     "housekeeper.unlock.parallelism",
+	Default: 5,
+})
+
+// GetHousekeeperInstanceID returns the instance ID for distributed execution
+// If not provided in config, generates one based on
hostname and timestamp +func GetHousekeeperInstanceID() string { + configuredID := HousekeeperDistributedExecutionInstanceID.Get() + if configuredID != "" { + return configuredID + } + + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + return fmt.Sprintf("%s-%d", hostname, time.Now().Unix()) +} diff --git a/internal/queue/cache/cache.go b/internal/queue/cache/cache.go index e28a3f7..6b5da31 100644 --- a/internal/queue/cache/cache.go +++ b/internal/queue/cache/cache.go @@ -57,6 +57,13 @@ type Cache interface { // Sets the given key to its respective value. Use empty string to unset cache element. Set(ctx context.Context, key string, value string) error + // SetNX sets a key only if it doesn't exist, with a TTL + SetNX(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) + // Del deletes a key + Del(ctx context.Context, key string) error + // Expire sets a TTL on an existing key + Expire(ctx context.Context, key string, ttl time.Duration) error + // Close connection to the cache Close(ctx context.Context) error } diff --git a/internal/queue/cache/memory_cache.go b/internal/queue/cache/memory_cache.go index f9d0135..2b5b93d 100644 --- a/internal/queue/cache/memory_cache.go +++ b/internal/queue/cache/memory_cache.go @@ -5,6 +5,7 @@ import ( "context" "errors" "sync" + "time" "github.com/takenet/deckard/internal/dtime" "github.com/takenet/deckard/internal/queue/message" @@ -466,3 +467,30 @@ func (cache *MemoryCache) Close(ctx context.Context) error { return nil } + +func (cache *MemoryCache) SetNX(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) { + cache.lock.Lock() + defer cache.lock.Unlock() + + // Check if key already exists + if _, exists := cache.keys[key]; exists { + return false, nil // Key already exists, SetNX failed + } + + // Set the key (ignoring TTL for simplicity in memory cache) + cache.keys[key] = value + return true, nil +} + +func (cache *MemoryCache) Del(ctx context.Context, key string) error { + cache.lock.Lock() + defer cache.lock.Unlock() + + delete(cache.keys, key) + return nil +} + +func (cache *MemoryCache) Expire(ctx context.Context, key string, ttl time.Duration) error { + // No-op for memory cache - TTL not implemented for simplicity + return nil +} diff --git a/internal/queue/cache/redis_cache.go b/internal/queue/cache/redis_cache.go index 85b4b82..698e972 100644 --- a/internal/queue/cache/redis_cache.go +++ b/internal/queue/cache/redis_cache.go @@ -545,6 +545,55 @@ func (cache *RedisCache) Close(ctx context.Context) error { return cache.Client.Close() } +func (cache *RedisCache) SetNX(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) { + execStart := dtime.Now() + defer func() { + metrics.CacheLatency.Record(ctx, dtime.ElapsedTime(execStart), metric.WithAttributes(attribute.String("op", "setnx"))) + }() + + prefixedKey := fmt.Sprint("deckard:", key) + cmd := cache.Client.SetNX(ctx, prefixedKey, value, ttl) + + result, err := cmd.Result() + if err != nil { + return false, fmt.Errorf("error setting key with SetNX: %w", err) + } + + return result, nil +} + +func (cache *RedisCache) Del(ctx context.Context, key string) error { + execStart := dtime.Now() + defer func() { + metrics.CacheLatency.Record(ctx, dtime.ElapsedTime(execStart), metric.WithAttributes(attribute.String("op", "del"))) + }() + + prefixedKey := fmt.Sprint("deckard:", key) + cmd := cache.Client.Del(ctx, prefixedKey) + + if cmd.Err() != nil { + return fmt.Errorf("error deleting key: 
%w", cmd.Err()) + } + + return nil +} + +func (cache *RedisCache) Expire(ctx context.Context, key string, ttl time.Duration) error { + execStart := dtime.Now() + defer func() { + metrics.CacheLatency.Record(ctx, dtime.ElapsedTime(execStart), metric.WithAttributes(attribute.String("op", "expire"))) + }() + + prefixedKey := fmt.Sprint("deckard:", key) + cmd := cache.Client.Expire(ctx, prefixedKey, ttl) + + if cmd.Err() != nil { + return fmt.Errorf("error setting TTL on key: %w", cmd.Err()) + } + + return nil +} + // activePool returns the name of the active pool of messages. func (cache *RedisCache) activePool(queue string) string { return POOL_PREFIX + queue diff --git a/internal/queue/distributed_lock.go b/internal/queue/distributed_lock.go new file mode 100644 index 0000000..4f3e090 --- /dev/null +++ b/internal/queue/distributed_lock.go @@ -0,0 +1,133 @@ +package queue + +import ( + "context" + "fmt" + "time" + + "github.com/takenet/deckard/internal/logger" + "github.com/takenet/deckard/internal/queue/cache" +) + +// DistributedLock represents a distributed locking mechanism for coordinating +// tasks across multiple housekeeper instances +type DistributedLock interface { + // TryLock attempts to acquire a distributed lock with the given name + // Returns true if lock was acquired, false if already held by another instance + TryLock(ctx context.Context, lockName string, ttl time.Duration) (bool, error) + + // ReleaseLock releases a distributed lock with the given name + ReleaseLock(ctx context.Context, lockName string) error + + // RefreshLock extends the TTL of an existing lock + RefreshLock(ctx context.Context, lockName string, ttl time.Duration) error +} + +// RedisDistributedLock implements DistributedLock using Redis +type RedisDistributedLock struct { + cache cache.Cache + instanceID string +} + +// NewRedisDistributedLock creates a new Redis-based distributed lock +func NewRedisDistributedLock(cache cache.Cache, instanceID string) DistributedLock { + return &RedisDistributedLock{ + cache: cache, + instanceID: instanceID, + } +} + +// TryLock attempts to acquire a distributed lock +func (r *RedisDistributedLock) TryLock(ctx context.Context, lockName string, ttl time.Duration) (bool, error) { + lockKey := fmt.Sprintf("housekeeper:lock:%s", lockName) + + // Try to set the lock key with our instance ID and TTL + // Use SET with NX (only if not exists) and EX (expiration) + acquired, err := r.cache.SetNX(ctx, lockKey, r.instanceID, ttl) + if err != nil { + logger.S(ctx).Errorf("Failed to acquire distributed lock %s: %v", lockName, err) + return false, err + } + + if acquired { + logger.S(ctx).Debugf("Acquired distributed lock %s by instance %s", lockName, r.instanceID) + } else { + logger.S(ctx).Debugf("Failed to acquire distributed lock %s (already held)", lockName) + } + + return acquired, nil +} + +// ReleaseLock releases a distributed lock +func (r *RedisDistributedLock) ReleaseLock(ctx context.Context, lockName string) error { + lockKey := fmt.Sprintf("housekeeper:lock:%s", lockName) + + // Only release the lock if it's held by our instance + currentHolder, err := r.cache.Get(ctx, lockKey) + if err != nil { + // Lock might have already expired, which is fine + logger.S(ctx).Debugf("Lock %s not found when releasing (may have expired): %v", lockName, err) + return nil + } + + if currentHolder == r.instanceID { + err = r.cache.Del(ctx, lockKey) + if err != nil { + logger.S(ctx).Errorf("Failed to release distributed lock %s: %v", lockName, err) + return err + } + 
logger.S(ctx).Debugf("Released distributed lock %s by instance %s", lockName, r.instanceID) + } else { + logger.S(ctx).Debugf("Cannot release lock %s - held by different instance: %s", lockName, currentHolder) + } + + return nil +} + +// RefreshLock extends the TTL of an existing lock +func (r *RedisDistributedLock) RefreshLock(ctx context.Context, lockName string, ttl time.Duration) error { + lockKey := fmt.Sprintf("housekeeper:lock:%s", lockName) + + // Only refresh if the lock is held by our instance + currentHolder, err := r.cache.Get(ctx, lockKey) + if err != nil { + return fmt.Errorf("lock %s not found: %w", lockName, err) + } + + if currentHolder != r.instanceID { + return fmt.Errorf("cannot refresh lock %s - held by different instance: %s", lockName, currentHolder) + } + + // Refresh the TTL + err = r.cache.Expire(ctx, lockKey, ttl) + if err != nil { + logger.S(ctx).Errorf("Failed to refresh distributed lock %s: %v", lockName, err) + return err + } + + logger.S(ctx).Debugf("Refreshed distributed lock %s by instance %s", lockName, r.instanceID) + return nil +} + +// NoOpDistributedLock is a no-op implementation for single-instance deployments +type NoOpDistributedLock struct{} + +// NewNoOpDistributedLock creates a no-op distributed lock for backward compatibility +func NewNoOpDistributedLock() DistributedLock { + return &NoOpDistributedLock{} +} + +func (n *NoOpDistributedLock) TryLock(ctx context.Context, lockName string, ttl time.Duration) (bool, error) { + // Always succeed in single-instance mode + return true, nil +} + +func (n *NoOpDistributedLock) ReleaseLock(ctx context.Context, lockName string) error { + // No-op in single-instance mode + return nil +} + +func (n *NoOpDistributedLock) RefreshLock(ctx context.Context, lockName string, ttl time.Duration) error { + // No-op in single-instance mode + return nil +} \ No newline at end of file diff --git a/internal/queue/distributed_lock_test.go b/internal/queue/distributed_lock_test.go new file mode 100644 index 0000000..d623934 --- /dev/null +++ b/internal/queue/distributed_lock_test.go @@ -0,0 +1,97 @@ +package queue + +import ( + "context" + "testing" + "time" + + "github.com/takenet/deckard/internal/queue/cache" +) + +func TestRedisDistributedLock(t *testing.T) { + // Create a memory cache for testing + memCache := cache.NewMemoryCache() + + instanceID1 := "instance-1" + instanceID2 := "instance-2" + + lock1 := NewRedisDistributedLock(memCache, instanceID1) + lock2 := NewRedisDistributedLock(memCache, instanceID2) + + ctx := context.Background() + lockName := "test-lock" + ttl := time.Second * 10 + + // Test 1: First instance should acquire lock + acquired, err := lock1.TryLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error acquiring lock: %v", err) + } + if !acquired { + t.Fatal("Expected to acquire lock but failed") + } + + // Test 2: Second instance should fail to acquire same lock + acquired, err = lock2.TryLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error trying to acquire lock: %v", err) + } + if acquired { + t.Fatal("Expected to fail acquiring lock but succeeded") + } + + // Test 3: First instance should be able to release lock + err = lock1.ReleaseLock(ctx, lockName) + if err != nil { + t.Fatalf("Error releasing lock: %v", err) + } + + // Test 4: Second instance should now be able to acquire lock + acquired, err = lock2.TryLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error acquiring lock after release: %v", err) + } + if !acquired { + t.Fatal("Expected to acquire lock after release but 
failed") + } + + // Test 5: Second instance should be able to refresh the lock + err = lock2.RefreshLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error refreshing lock: %v", err) + } + + // Cleanup + err = lock2.ReleaseLock(ctx, lockName) + if err != nil { + t.Fatalf("Error releasing lock in cleanup: %v", err) + } +} + +func TestNoOpDistributedLock(t *testing.T) { + lock := NewNoOpDistributedLock() + ctx := context.Background() + lockName := "test-lock" + ttl := time.Second * 10 + + // NoOp lock should always succeed + acquired, err := lock.TryLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error with NoOp lock: %v", err) + } + if !acquired { + t.Fatal("NoOp lock should always succeed") + } + + // Release should not error + err = lock.ReleaseLock(ctx, lockName) + if err != nil { + t.Fatalf("Error releasing NoOp lock: %v", err) + } + + // Refresh should not error + err = lock.RefreshLock(ctx, lockName, ttl) + if err != nil { + t.Fatalf("Error refreshing NoOp lock: %v", err) + } +} \ No newline at end of file diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 48355a8..f9862a9 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -462,3 +462,8 @@ func notFoundIds(ids []string, messages []message.Message) []string { return notFound } + +// GetCache returns the cache instance used by the queue for distributed locking +func (queue *Queue) GetCache() cache.Cache { + return queue.cache +} diff --git a/internal/queue/queue_housekeeper.go b/internal/queue/queue_housekeeper.go index 4a152ca..86f02eb 100644 --- a/internal/queue/queue_housekeeper.go +++ b/internal/queue/queue_housekeeper.go @@ -2,10 +2,12 @@ package queue import ( "context" + "sync" "time" "github.com/elliotchance/orderedmap/v2" "github.com/takenet/deckard/internal/audit" + "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/dtime" "github.com/takenet/deckard/internal/logger" "github.com/takenet/deckard/internal/metrics" @@ -82,7 +84,7 @@ func ProcessLockPool(ctx context.Context, queue *Queue) { return } - unlockMessages(ctx, queue, lockAckQueues, cache.LOCK_ACK) + unlockMessagesParallel(ctx, queue, lockAckQueues, cache.LOCK_ACK) if shutdown.Ongoing() { logger.S(ctx).Info("Shutdown started. Stopping unlock process.") @@ -97,7 +99,7 @@ func ProcessLockPool(ctx context.Context, queue *Queue) { return } - unlockMessages(ctx, queue, lockNackQueues, cache.LOCK_NACK) + unlockMessagesParallel(ctx, queue, lockNackQueues, cache.LOCK_NACK) } func unlockMessages(ctx context.Context, pool *Queue, queues []string, lockType cache.LockType) { @@ -129,6 +131,65 @@ func unlockMessages(ctx context.Context, pool *Queue, queues []string, lockType } } +func unlockMessagesParallel(ctx context.Context, pool *Queue, queues []string, lockType cache.LockType) { + if len(queues) == 0 { + return + } + + parallelism := config.HousekeeperUnlockParallelism.GetInt() + if parallelism <= 0 { + parallelism = 5 // Default fallback + } + + // Use buffered channel to limit concurrency + semaphore := make(chan struct{}, parallelism) + var wg sync.WaitGroup + + for i := range queues { + if shutdown.Ongoing() { + logger.S(ctx).Info("Shutdown started. 
Stopping unlock process.") + break + } + + wg.Add(1) + go func(queueName string) { + defer wg.Done() + + // Acquire semaphore + semaphore <- struct{}{} + defer func() { <-semaphore }() + + unlockSingleQueue(ctx, pool, queueName, lockType) + }(queues[i]) + } + + wg.Wait() +} + +func unlockSingleQueue(ctx context.Context, pool *Queue, queueName string, lockType cache.LockType) { + if shutdown.Ongoing() { + return + } + + ids, err := pool.cache.UnlockMessages(ctx, queueName, lockType) + + if err != nil { + logger.S(ctx).Errorf("Error processing locks for queue '%s': %v", queueName, err.Error()) + return + } + + for index := range ids { + pool.auditor.Store(ctx, audit.Entry{ + ID: ids[index], + Queue: queueName, + Signal: audit.UNLOCK, + Reason: string(lockType), + }) + } + + metrics.HousekeeperUnlock.Add(ctx, int64(len(ids)), metric.WithAttributes(attribute.String("queue", message.GetQueuePrefix(queueName)), attribute.String("lock_type", string(lockType)))) +} + func isRecovering(ctx context.Context, pool *Queue) (bool, error) { recovery, err := pool.cache.Get(ctx, cache.RECOVERY_RUNNING) if err != nil { From da6f86072d189177c24e396720ed917424753cdc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Jun 2025 19:55:47 +0000 Subject: [PATCH 3/3] Add comprehensive documentation for distributed housekeeper execution Co-authored-by: lucasoares <10624972+lucasoares@users.noreply.github.com> --- docs/distributed-housekeeper.md | 208 ++++++++++++++++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 docs/distributed-housekeeper.md diff --git a/docs/distributed-housekeeper.md b/docs/distributed-housekeeper.md new file mode 100644 index 0000000..afdd052 --- /dev/null +++ b/docs/distributed-housekeeper.md @@ -0,0 +1,208 @@ +# Distributed Housekeeper Execution + +This document describes the distributed execution feature for Deckard's housekeeper, which allows running multiple housekeeper instances simultaneously without conflicts. + +## Overview + +The distributed housekeeper execution feature enables: +- **Task coordination**: Distributed locks ensure only one instance runs each task at a time +- **Metrics leader election**: Prevents duplication of Prometheus metrics across instances +- **Performance improvements**: Parallel processing for unlock operations +- **Scalability**: Multiple housekeeper pods can be deployed for better availability + +## Configuration + +### Environment Variables + +```bash +# Enable distributed execution +DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_ENABLED=true + +# Set unique instance ID (optional - auto-generated if not provided) +DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_INSTANCE_ID=housekeeper-pod-1 + +# Set lock TTL (default: 30s) +DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_LOCK_TTL=30s + +# Set unlock parallelism (default: 5) +DECKARD_HOUSEKEEPER_UNLOCK_PARALLELISM=10 +``` + +### Configuration File (YAML) + +```yaml +housekeeper: + distributed_execution: + enabled: true + instance_id: "housekeeper-pod-1" # Optional + lock_ttl: "30s" + unlock: + parallelism: 10 +``` + +## How It Works + +### Task Coordination + +When distributed execution is enabled, each housekeeper task acquires a distributed lock before execution: + +1. **UNLOCK**: Processes locked messages with parallel workers +2. **TIMEOUT**: Handles message timeouts +3. **RECOVERY**: Recovers messages from storage to cache (critical task) +4. **TTL**: Removes expired messages (critical task) +5. 
**MAX_ELEMENTS**: Removes messages exceeding a queue's configured maximum (critical task)
+6. **METRICS**: Computes queue metrics (leader-only)
+
+### Metrics Leader Election
+
+Only one housekeeper instance computes and exposes metrics to prevent duplication:
+- The metrics leader is elected using a distributed lock
+- Non-leader instances skip metrics computation
+- The leader lock has an extended TTL (twice the configured lock TTL) for stability
+
+### Lock Management
+
+- **Lock Key Format**: `deckard:housekeeper:lock:{task_name}`
+- **Instance Identification**: Each instance has a unique ID, stored as the lock value
+- **TTL Management**: Locks auto-expire to prevent deadlocks if an instance dies
+- **Error Handling**: Failed lock acquisitions are logged and retried on the next scheduler tick
+
+The raw Redis commands behind these locks are sketched in the appendix at the end of this document.
+
+## Deployment Examples
+
+### Kubernetes Deployment
+
+```yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: deckard-housekeeper
+spec:
+  replicas: 3 # Multiple instances for availability
+  selector:
+    matchLabels:
+      app: deckard-housekeeper
+  template:
+    metadata:
+      labels:
+        app: deckard-housekeeper
+    spec:
+      containers:
+        - name: deckard
+          image: deckard:latest
+          env:
+            - name: DECKARD_HOUSEKEEPER_ENABLED
+              value: "true"
+            - name: DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_ENABLED
+              value: "true"
+            - name: DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_INSTANCE_ID
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name # Use pod name as instance ID
+            - name: DECKARD_REDIS_ADDRESS
+              value: "redis-service"
+            - name: DECKARD_REDIS_PORT
+              value: "6379"
+```
+
+### Docker Compose
+
+```yaml
+version: '3.8'
+services:
+  redis:
+    image: redis:7-alpine
+
+  deckard-hk-1:
+    image: deckard:latest
+    environment:
+      - DECKARD_HOUSEKEEPER_ENABLED=true
+      - DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_ENABLED=true
+      - DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_INSTANCE_ID=hk-1
+      - DECKARD_REDIS_ADDRESS=redis
+    depends_on:
+      - redis
+
+  deckard-hk-2:
+    image: deckard:latest
+    environment:
+      - DECKARD_HOUSEKEEPER_ENABLED=true
+      - DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_ENABLED=true
+      - DECKARD_HOUSEKEEPER_DISTRIBUTED_EXECUTION_INSTANCE_ID=hk-2
+      - DECKARD_REDIS_ADDRESS=redis
+    depends_on:
+      - redis
+```
+
+## Monitoring
+
+### Logs
+
+Distributed execution adds debug logs for:
+- Lock acquisition and release
+- Task skipping when locks are held by other instances
+- Metrics leader election
+
+Example log messages:
+```
+[DEBUG] Acquired distributed lock timeout by instance hk-1
+[DEBUG] Skipping task recovery - already running on another instance
+[DEBUG] Refreshed distributed lock metrics_leader by instance hk-1
+```
+
+### Metrics
+
+Existing housekeeper metrics continue to work:
+- Only the metrics leader reports queue metrics
+- Task execution metrics are reported by whichever instance ran the task
+- Lock-related errors are logged but don't affect existing metrics
+
+## Performance Improvements
+
+### Parallel Unlocking
+
+The unlock task now processes queues in parallel:
+- Configurable parallelism level (default: 5 workers)
+- Maintains all existing unlock logic and metrics
+- Graceful shutdown handling
+- Per-queue error isolation
+
+Performance benefits:
+- Faster processing when many queues have locked messages
+- Better resource utilization
+- Locked messages are released closer to their scheduled unlock time
+
+## Backward Compatibility
+
+When distributed execution is disabled (the default):
+- Uses `NoOpDistributedLock` (acquisition always succeeds)
+- Maintains the original single-instance behavior
+- No performance impact
+- All existing functionality preserved
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Redis Connection Issues**
+   - Ensure Redis is accessible from all housekeeper instances
+   - Check Redis authentication and network connectivity
+
+2. **Lock Contention**
+   - Increase the lock TTL if tasks take longer than expected
+   - Monitor logs for lock acquisition failures
+
+3. **Metrics Duplication**
+   - Verify only one instance is elected as metrics leader
+   - Check that distributed execution is enabled on all instances
+
+### Debugging
+
+Enable debug logging to see distributed lock operations:
+```bash
+DECKARD_LOG_LEVEL=debug
+```
+
+Check lock status in Redis (prefer `--scan` over `KEYS` on production instances):
+```bash
+redis-cli --scan --pattern "deckard:housekeeper:lock:*"
+```
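+
+Each lock stores the holder's instance ID as its value, so a specific lock can
+be inspected directly. For example (the task name shown is illustrative):
+```bash
+redis-cli GET "deckard:housekeeper:lock:recovery"   # prints the holding instance ID, e.g. "hk-1"
+redis-cli TTL "deckard:housekeeper:lock:recovery"   # seconds until the lock auto-expires
+```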
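+
+## Appendix: Raw Lock Commands
+
+Lock acquisition uses the cache's `SetNX` operation, which issues a Redis `SET`
+with the `NX` (only if absent) and expiry options. As a rough sketch, the
+equivalent raw commands are shown below; the task name, instance ID, and TTL
+are illustrative:
+```
+SET deckard:housekeeper:lock:ttl hk-1 NX EX 30   # acquire: replies OK, or nil if already held
+GET deckard:housekeeper:lock:ttl                 # returns the holder's instance ID
+EXPIRE deckard:housekeeper:lock:ttl 30           # refresh the TTL, as RefreshLock does
+DEL deckard:housekeeper:lock:ttl                 # release, as ReleaseLock does for the holder
+```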
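+
+## Appendix: Reusing the DistributedLock Interface
+
+New background jobs inside Deckard can reuse the same coordination primitive.
+Below is a minimal sketch, assuming `context` and `time` are imported along
+with the `internal/queue` package; the helper and the task name
+`my_custom_task` are hypothetical, not part of this change:
+```go
+// runExclusively runs fn on at most one instance at a time. It returns false
+// without running fn when another instance already holds the lock.
+func runExclusively(ctx context.Context, lock queue.DistributedLock, fn func()) (bool, error) {
+	acquired, err := lock.TryLock(ctx, "my_custom_task", 30*time.Second) // hypothetical task name
+	if err != nil || !acquired {
+		return false, err
+	}
+	// Best-effort release; the TTL still bounds the lock if this fails.
+	defer func() { _ = lock.ReleaseLock(ctx, "my_custom_task") }()
+
+	fn()
+	return true, nil
+}
+```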