diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d8a9a8..b9c57b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - Dark mode support across admin and frontend CSS. - Granular permission tables and seeder on activation. - "Powered by Escalated" badge with admin toggle. +- Workflow `delay` action — pauses a workflow run for N seconds and resumes the remaining actions via a per-minute WP-Cron sweep. Backed by a new `escalated_deferred_workflow_jobs` table with a composite `(status, run_at)` index for efficient polling. Existing installs need to reactivate the plugin to pick up the new table. ### Changed - License changed from GPL-2.0-or-later to MIT. diff --git a/includes/Cron/class-deferred-workflow-jobs-check.php b/includes/Cron/class-deferred-workflow-jobs-check.php new file mode 100644 index 0000000..ead7e64 --- /dev/null +++ b/includes/Cron/class-deferred-workflow-jobs-check.php @@ -0,0 +1,35 @@ +run_due_deferred_jobs(); + } +} diff --git a/includes/Models/DeferredWorkflowJob.php b/includes/Models/DeferredWorkflowJob.php new file mode 100644 index 0000000..83f710e --- /dev/null +++ b/includes/Models/DeferredWorkflowJob.php @@ -0,0 +1,106 @@ +get_row( + $wpdb->prepare("SELECT * FROM {$table} WHERE id = %d", $id) + ); + } + + /** + * Create a new deferred job row. `remaining_actions` is JSON-encoded + * by the caller so consumers can decode + re-dispatch. + * + * @return int|false Inserted ID or false on failure. + */ + public static function create(array $data) + { + global $wpdb; + $table = static::table(); + $now = current_time('mysql'); + + $data['created_at'] = $now; + $data['updated_at'] = $now; + $data['status'] = $data['status'] ?? 'pending'; + + $result = $wpdb->insert($table, $data); + + return $result !== false ? $wpdb->insert_id : false; + } + + /** + * Update a row. + * + * @param int $id + * @return bool + */ + public static function update($id, array $data) + { + global $wpdb; + $table = static::table(); + + $data['updated_at'] = current_time('mysql'); + + return $wpdb->update($table, $data, ['id' => $id]) !== false; + } + + /** + * Fetch every `pending` row whose `run_at` has elapsed. + * + * `run_at` is stored in UTC (written via `gmdate` in + * {@see WorkflowExecutorService::schedule_delay}), so we compare + * against GMT here regardless of WordPress's configured timezone. + * + * @return array + */ + public static function pending(): array + { + global $wpdb; + $table = static::table(); + $now_gmt = current_time('mysql', true); + + return $wpdb->get_results( + $wpdb->prepare( + "SELECT * FROM {$table} WHERE status = %s AND run_at <= %s ORDER BY run_at ASC", + 'pending', + $now_gmt + ) + ) ?: []; + } +} diff --git a/includes/Services/WorkflowExecutorService.php b/includes/Services/WorkflowExecutorService.php index ecd532a..1d32d99 100644 --- a/includes/Services/WorkflowExecutorService.php +++ b/includes/Services/WorkflowExecutorService.php @@ -2,8 +2,10 @@ namespace Escalated\Services; +use Escalated\Models\DeferredWorkflowJob; use Escalated\Models\Reply; use Escalated\Models\Tag; +use Escalated\Models\Ticket; /** * Performs the side-effects dictated by a matched Workflow. @@ -13,10 +15,15 @@ * dispatches each entry to the relevant service. * * Action catalog: change_priority, change_status, assign_agent, - * set_department, add_tag, remove_tag, add_note, insert_canned_reply. - * Mirrors the NestJS reference impl in + * set_department, add_tag, remove_tag, add_note, insert_canned_reply, + * delay. Mirrors the NestJS reference impl in * escalated-nestjs/src/services/workflow-executor.service.ts. * + * `delay` splits a run into two halves: everything before the delay + * runs inline, everything after is persisted as a DeferredWorkflowJob + * row and picked up by Escalated\Cron\Deferred_Workflow_Jobs_Check + * once the wait expires. + * * One failing action never halts the others — matches NestJS. Unknown * action types warn-log (via error_log when WP_DEBUG) and skip. */ @@ -46,7 +53,15 @@ public function __construct( public function execute(object $ticket, ?string $actions_json): array { $actions = $this->parse_actions($actions_json); - foreach ($actions as $action) { + $count = count($actions); + for ($i = 0; $i < $count; $i++) { + $action = $actions[$i]; + if (($action['type'] ?? '') === 'delay') { + $remaining = array_slice($actions, $i + 1); + $this->schedule_delay($ticket, (string) ($action['value'] ?? ''), $remaining); + + return $actions; + } try { $this->dispatch($ticket, $action); } catch (\Throwable $e) { @@ -57,6 +72,73 @@ public function execute(object $ticket, ?string $actions_json): array return $actions; } + /** + * Persist remaining actions to the deferred-jobs queue with + * run_at = now + $seconds. Logs a warning + skips when the value + * isn't a positive integer. Mirrors NestJS scheduleDelay. + * + * @param array> $remaining + */ + protected function schedule_delay(object $ticket, string $value, array $remaining): void + { + $seconds = (int) $value; + if (! ctype_digit($value) || $seconds <= 0) { + $this->log_debug(sprintf( + 'delay: invalid seconds value "%s", skipping remaining actions', + $value + )); + + return; + } + $run_at = gmdate('Y-m-d H:i:s', time() + $seconds); + DeferredWorkflowJob::create([ + 'ticket_id' => (int) $ticket->id, + 'remaining_actions' => wp_json_encode($remaining), + 'run_at' => $run_at, + 'status' => 'pending', + ]); + } + + /** + * Dispatch every pending deferred job whose `run_at` has elapsed. + * + * For each row, re-loads the ticket, re-invokes execute() with the + * stored remaining_actions JSON, and flips status to `done` / + * `failed`. Called by Cron\Deferred_Workflow_Jobs_Check. + * + * @return array{processed:int,failed:int} + */ + public function run_due_deferred_jobs(): array + { + $processed = 0; + $failed = 0; + foreach (DeferredWorkflowJob::pending() as $job) { + try { + $ticket = Ticket::find((int) $job->ticket_id); + if (! $ticket) { + DeferredWorkflowJob::update((int) $job->id, [ + 'status' => 'failed', + 'last_error' => sprintf('Ticket #%d not found', (int) $job->ticket_id), + ]); + $failed++; + + continue; + } + $this->execute($ticket, $job->remaining_actions); + DeferredWorkflowJob::update((int) $job->id, ['status' => 'done']); + $processed++; + } catch (\Throwable $e) { + DeferredWorkflowJob::update((int) $job->id, [ + 'status' => 'failed', + 'last_error' => $e->getMessage(), + ]); + $failed++; + } + } + + return ['processed' => $processed, 'failed' => $failed]; + } + protected function parse_actions(?string $actions_json): array { if (empty($actions_json)) { diff --git a/includes/class-activator.php b/includes/class-activator.php index c7ffa64..467c363 100644 --- a/includes/class-activator.php +++ b/includes/class-activator.php @@ -390,6 +390,23 @@ private static function create_tables(): void KEY priority (priority) ) $charset_collate;"; dbDelta($sql); + + // escalated_deferred_workflow_jobs — queue row for the `delay` + // workflow action. Cron\Deferred_Workflow_Jobs_Check polls for + // status=pending + run_at <= now and re-dispatches remaining_actions. + $sql = "CREATE TABLE {$prefix}deferred_workflow_jobs ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + ticket_id BIGINT UNSIGNED NOT NULL, + remaining_actions LONGTEXT NOT NULL, + run_at DATETIME NOT NULL, + status VARCHAR(16) NOT NULL DEFAULT 'pending', + last_error TEXT NULL, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL, + PRIMARY KEY (id), + KEY status_runat (status, run_at) + ) $charset_collate;"; + dbDelta($sql); } /** @@ -785,6 +802,10 @@ private static function schedule_cron_events(): void if (! wp_next_scheduled('escalated_chat_cleanup')) { wp_schedule_event(time(), 'escalated_every_minute', 'escalated_chat_cleanup'); } + + if (! wp_next_scheduled('escalated_run_due_deferred_workflow_jobs')) { + wp_schedule_event(time(), 'escalated_every_minute', 'escalated_run_due_deferred_workflow_jobs'); + } } /** diff --git a/includes/class-escalated.php b/includes/class-escalated.php index 0658589..0aa5040 100644 --- a/includes/class-escalated.php +++ b/includes/class-escalated.php @@ -39,6 +39,7 @@ public function boot(): void (new Services\BroadcastService)->register(); (new Cron\Snooze_Check)->register(); (new Cron\Chat_Cleanup)->register(); + (new Cron\Deferred_Workflow_Jobs_Check)->register(); Cli\AutomationCommand::register(); } diff --git a/tests/Test_Workflow_Executor_Service.php b/tests/Test_Workflow_Executor_Service.php index 81322a0..9b72348 100644 --- a/tests/Test_Workflow_Executor_Service.php +++ b/tests/Test_Workflow_Executor_Service.php @@ -9,6 +9,7 @@ * the service boundary. */ +use Escalated\Models\DeferredWorkflowJob; use Escalated\Models\Reply; use Escalated\Models\Tag; use Escalated\Models\Ticket; @@ -272,4 +273,141 @@ public function test_execute_returns_parsed_action_list(): void $this->assertEquals('change_priority', $result[0]['type']); $this->assertEquals('add_note', $result[1]['type']); } + + // --- delay action --- + + public function test_execute_delay_pauses_and_persists_remaining(): void + { + global $wpdb; + $ticket = $this->make_ticket(['priority' => 'low']); + $before = time(); + + $this->executor->execute( + $ticket, + wp_json_encode([ + ['type' => 'change_priority', 'value' => 'high'], + ['type' => 'delay', 'value' => '60'], + ['type' => 'add_note', 'value' => 'after wait'], + ]) + ); + + // Pre-delay action ran. + $fresh = Ticket::find($ticket->id); + $this->assertEquals('high', $fresh->priority); + + // Post-delay action did NOT run: no note reply inserted yet. + $reply_table = Reply::table(); + $notes = (int) $wpdb->get_var( + $wpdb->prepare( + "SELECT COUNT(*) FROM {$reply_table} WHERE ticket_id = %d AND type = %s", + $ticket->id, + 'note' + ) + ); + $this->assertEquals(0, $notes); + + // One DeferredWorkflowJob row was persisted with the remaining tail. + $jobs_table = DeferredWorkflowJob::table(); + $row = $wpdb->get_row( + $wpdb->prepare("SELECT * FROM {$jobs_table} WHERE ticket_id = %d", $ticket->id) + ); + $this->assertNotNull($row); + $this->assertEquals('pending', $row->status); + $remaining = json_decode($row->remaining_actions, true); + $this->assertCount(1, $remaining); + $this->assertEquals('add_note', $remaining[0]['type']); + $this->assertGreaterThanOrEqual($before + 59, strtotime($row->run_at . ' UTC')); + } + + public function test_execute_delay_invalid_value_skips_remaining(): void + { + global $wpdb; + $ticket = $this->make_ticket(['priority' => 'low']); + + $this->executor->execute( + $ticket, + wp_json_encode([ + ['type' => 'delay', 'value' => 'nonsense'], + ['type' => 'change_priority', 'value' => 'urgent'], + ]) + ); + + // Priority unchanged — post-delay action did not run. + $fresh = Ticket::find($ticket->id); + $this->assertEquals('low', $fresh->priority); + + // No deferred job row was created. + $jobs_table = DeferredWorkflowJob::table(); + $count = (int) $wpdb->get_var( + $wpdb->prepare("SELECT COUNT(*) FROM {$jobs_table} WHERE ticket_id = %d", $ticket->id) + ); + $this->assertEquals(0, $count); + } + + public function test_run_due_deferred_jobs_resumes_and_marks_done(): void + { + global $wpdb; + $ticket = $this->make_ticket(['priority' => 'low']); + + // Seed a job whose run_at has already elapsed. + $id = DeferredWorkflowJob::create([ + 'ticket_id' => $ticket->id, + 'remaining_actions' => wp_json_encode([ + ['type' => 'change_priority', 'value' => 'urgent'], + ]), + 'run_at' => gmdate('Y-m-d H:i:s', time() - 60), + 'status' => 'pending', + ]); + $this->assertNotFalse($id); + + $result = $this->executor->run_due_deferred_jobs(); + + $this->assertEquals(1, $result['processed']); + $this->assertEquals(0, $result['failed']); + + // Ticket priority was updated. + $fresh = Ticket::find($ticket->id); + $this->assertEquals('urgent', $fresh->priority); + + // Row flipped to done. + $row = DeferredWorkflowJob::find($id); + $this->assertEquals('done', $row->status); + } + + public function test_run_due_deferred_jobs_marks_failed_when_ticket_missing(): void + { + $id = DeferredWorkflowJob::create([ + 'ticket_id' => 9_999_999, + 'remaining_actions' => wp_json_encode([]), + 'run_at' => gmdate('Y-m-d H:i:s', time() - 60), + 'status' => 'pending', + ]); + + $result = $this->executor->run_due_deferred_jobs(); + + $this->assertEquals(0, $result['processed']); + $this->assertEquals(1, $result['failed']); + + $row = DeferredWorkflowJob::find($id); + $this->assertEquals('failed', $row->status); + $this->assertStringContainsString('9999999', $row->last_error); + } + + public function test_run_due_deferred_jobs_skips_rows_not_yet_due(): void + { + $id = DeferredWorkflowJob::create([ + 'ticket_id' => $this->make_ticket()->id, + 'remaining_actions' => wp_json_encode([]), + 'run_at' => gmdate('Y-m-d H:i:s', time() + 3600), + 'status' => 'pending', + ]); + + $result = $this->executor->run_due_deferred_jobs(); + + $this->assertEquals(0, $result['processed']); + $this->assertEquals(0, $result['failed']); + + $row = DeferredWorkflowJob::find($id); + $this->assertEquals('pending', $row->status); + } }