From 7e45708f48c80e2064826bf35fbb5d14299a67bd Mon Sep 17 00:00:00 2001 From: YaroslavStryhun Date: Tue, 9 Sep 2025 13:00:11 +0200 Subject: [PATCH 1/3] Add reties delay logic --- src/Queues/DTO/BackoffStrategyDTO.php | 46 +++++++++++++++++++ src/Queues/Queue.php | 8 +++- src/qless-core/qless.lua | 66 +++++++++++++++++++++------ 3 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 src/Queues/DTO/BackoffStrategyDTO.php diff --git a/src/Queues/DTO/BackoffStrategyDTO.php b/src/Queues/DTO/BackoffStrategyDTO.php new file mode 100644 index 0000000..71eec8a --- /dev/null +++ b/src/Queues/DTO/BackoffStrategyDTO.php @@ -0,0 +1,46 @@ +initialDelay = $initialDelay; + $this->factor = $factor; + } + + /** + * @return int + */ + public function getFactor(): int + { + return $this->factor; + } + + /** + * @return int + */ + public function getInitialDelay(): int + { + return $this->initialDelay; + } + + public function toArray(): array + { + return [ + 'factor' => $this->getFactor(), + 'initial_delay' => $this->getInitialDelay(), + ]; + } +} diff --git a/src/Queues/Queue.php b/src/Queues/Queue.php index e13b55d..b828184 100644 --- a/src/Queues/Queue.php +++ b/src/Queues/Queue.php @@ -12,6 +12,7 @@ use Qless\Exceptions\UnknownPropertyException; use Qless\Jobs\BaseJob; use Qless\Jobs\JobData; +use Qless\Queues\DTO\BackoffStrategyDTO; use Qless\Support\PropertyAccessor; use Ramsey\Uuid\Uuid; @@ -85,7 +86,8 @@ public function put( ?int $retries = null, ?int $priority = null, ?array $tags = null, - ?array $depends = null + ?array $depends = null, + ?BackoffStrategyDTO $backoffStrategyDTO = null ): string { try { $jid = $jid ?: str_replace('-', '', Uuid::uuid4()->toString()); @@ -120,7 +122,9 @@ public function put( 'retries', is_null($retries) ? 5 : $retries, 'depends', - json_encode($depends ?: [], JSON_UNESCAPED_SLASHES) + json_encode($depends ?: [], JSON_UNESCAPED_SLASHES), + 'backoff', + json_encode($backoffStrategyDTO ? $backoffStrategyDTO->toArray() : [], JSON_UNESCAPED_SLASHES) ); $this->getEventsManager()->fire(new QueueEvent\AfterEnqueue($this, $jid, $data->toArray(), $className)); diff --git a/src/qless-core/qless.lua b/src/qless-core/qless.lua index 6feec74..15b026e 100644 --- a/src/qless-core/qless.lua +++ b/src/qless-core/qless.lua @@ -385,7 +385,10 @@ Qless.config.defaults = { ['histogram-history'] = 7, ['jobs-history-count'] = 50000, ['jobs-history'] = 604800, - ['jobs-failed-history'] = 604800 + ['jobs-failed-history'] = 604800, + -- retries logic + ['backoff-initial-delay'] = 0, -- Default delay in seconds. 0 means disabled. + ['backoff-factor'] = 3 -- Exponential factor. } Qless.config.get = function(key, default) @@ -1584,19 +1587,25 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed' , -1) end - redis.call('hmset', QlessJob.ns .. jid, - 'jid' , jid, - 'klass' , klass, - 'data' , raw_data, - 'priority' , priority, - 'tags' , cjson.encode(tags), - 'state' , ((delay > 0) and 'scheduled') or 'waiting', - 'worker' , '', - 'expires' , 0, - 'queue' , self.name, - 'retries' , retries, - 'remaining', retries, - 'time' , string.format("%.20f", now)) + local job_fields = { + 'jid' , jid, + 'klass' , klass, + 'data' , raw_data, + 'priority' , priority, + 'tags' , cjson.encode(tags), + 'state' , ((delay > 0) and 'scheduled') or 'waiting', + 'worker' , '', + 'expires' , 0, + 'queue' , self.name, + 'retries' , retries, + 'remaining', retries, + 'time' , string.format("%.20f", now) + } + if options['backoff'] then + table.insert(job_fields, 'backoff') + table.insert(job_fields, cjson.encode(options['backoff'])) + end + redis.call('hmset', QlessJob.ns .. jid, job_fields) for i, j in ipairs(depends) do local state = redis.call('hget', QlessJob.ns .. j, 'state') @@ -1954,7 +1963,34 @@ function QlessQueue:invalidate_locks(now, count) redis.call('zadd', 'ql:failed-jobs-list', now, jid) clearOldFailedJobs(now) else - table.insert(jids, jid) + local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff') + local backoff_config = {} + if backoff_json then + backoff_config = cjson.decode(backoff_json) + end + + local initial_delay = tonumber(backoff_config['initial_delay']) + local backoff_factor = tonumber(backoff_config['factor']) + if initial_delay == nil then + initial_delay = tonumber(Qless.config.get('backoff-initial-delay', 0)) + end + if backoff_factor == nil then + backoff_factor = tonumber(Qless.config.get('backoff-factor', 3)) + end + if initial_delay == 0 then + table.insert(jids, jid) + else + local job = Qless.job(jid) + local job_history = job:history() + local retry_count = #job_history - 1 + if retry_count < 0 then retry_count = 0 end + + local delay = initial_delay * (backoff_factor ^ retry_count) + + self.locks.remove(jid) + self.scheduled.add(now + delay, jid) + redis.call('hset', QlessJob.ns .. jid, 'state', 'scheduled') + end end end end From 71ba6aa1788cdc9c3bc07f98477b0b567efd2e49 Mon Sep 17 00:00:00 2001 From: YaroslavStryhun Date: Tue, 9 Sep 2025 13:46:24 +0200 Subject: [PATCH 2/3] fix --- src/qless-core/qless.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/qless-core/qless.lua b/src/qless-core/qless.lua index 15b026e..11a5120 100644 --- a/src/qless-core/qless.lua +++ b/src/qless-core/qless.lua @@ -1605,7 +1605,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) table.insert(job_fields, 'backoff') table.insert(job_fields, cjson.encode(options['backoff'])) end - redis.call('hmset', QlessJob.ns .. jid, job_fields) + redis.call('hmset', QlessJob.ns .. jid, unpack(job_fields)) for i, j in ipairs(depends) do local state = redis.call('hget', QlessJob.ns .. j, 'state') From 987019c571e224a943e9f9e6d0ddd264a48afe00 Mon Sep 17 00:00:00 2001 From: YaroslavStryhun Date: Tue, 9 Sep 2025 13:54:09 +0200 Subject: [PATCH 3/3] fix --- src/qless-core/qless.lua | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/qless-core/qless.lua b/src/qless-core/qless.lua index 11a5120..2f32364 100644 --- a/src/qless-core/qless.lua +++ b/src/qless-core/qless.lua @@ -1603,7 +1603,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) } if options['backoff'] then table.insert(job_fields, 'backoff') - table.insert(job_fields, cjson.encode(options['backoff'])) + table.insert(job_fields, options['backoff']) end redis.call('hmset', QlessJob.ns .. jid, unpack(job_fields)) @@ -1963,7 +1963,7 @@ function QlessQueue:invalidate_locks(now, count) redis.call('zadd', 'ql:failed-jobs-list', now, jid) clearOldFailedJobs(now) else - local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff') + local backoff_json = redis.call('hget', QlessJob.ns .. jid, 'backoff') local backoff_config = {} if backoff_json then backoff_config = cjson.decode(backoff_json) @@ -1980,12 +1980,11 @@ function QlessQueue:invalidate_locks(now, count) if initial_delay == 0 then table.insert(jids, jid) else - local job = Qless.job(jid) - local job_history = job:history() - local retry_count = #job_history - 1 - if retry_count < 0 then retry_count = 0 end + local total_retries = tonumber(redis.call('hget', QlessJob.ns .. jid, 'retries') or 5) + local retries_left = tonumber(redis.call('hget', QlessJob.ns .. jid, 'remaining') or total_retries) - local delay = initial_delay * (backoff_factor ^ retry_count) + local attempt_index = total_retries - retries_left + local delay = initial_delay * (backoff_factor ^ attempt_index) self.locks.remove(jid) self.scheduled.add(now + delay, jid)