Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Dark mode support across admin and frontend CSS.
- Granular permission tables and seeder on activation.
- "Powered by Escalated" badge with admin toggle.
- Workflow `delay` action — pauses a workflow run for N seconds and resumes the remaining actions via a per-minute WP-Cron sweep. Backed by a new `escalated_deferred_workflow_jobs` table with a composite `(status, run_at)` index for efficient polling. Existing installs need to reactivate the plugin to pick up the new table.

### Changed
- License changed from GPL-2.0-or-later to MIT.
Expand Down
35 changes: 35 additions & 0 deletions includes/Cron/class-deferred-workflow-jobs-check.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Escalated\Cron;

use Escalated\Services\WorkflowExecutorService;

/**
* Runs once per minute via WP-Cron to pick up pending
* DeferredWorkflowJob rows whose wait has elapsed and resume their
* remaining_actions. Flips each row's status to `done` / `failed`
* after execution so it's never re-picked up.
*
* Host apps with a misconfigured scheduler (DISABLE_WP_CRON without a
* system cron replacement) will see delay actions pile up as `pending`
* indefinitely.
*/
class Deferred_Workflow_Jobs_Check
{
/**
* Register the cron hook.
*/
public function register(): void
{
add_action('escalated_run_due_deferred_workflow_jobs', [$this, 'run']);
}

/**
* Dispatch all pending deferred workflow jobs whose run_at has elapsed.
*/
public function run(): void
{
$service = new WorkflowExecutorService;
$service->run_due_deferred_jobs();
}
}
106 changes: 106 additions & 0 deletions includes/Models/DeferredWorkflowJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

namespace Escalated\Models;

use Escalated\Escalated;

/**
* Queue row for a paused workflow run — populated by the `delay`
* workflow action when execution hits a wait clause, consumed by
* Escalated\Cron\Deferred_Workflow_Jobs_Check to resume.
*
* Rows are soft-terminal: the cron handler flips `status` to `done`
* (or `failed`) after running so they don't get re-picked up, and
* retains the row for audit.
*
* Mirrors escalated-nestjs/src/entities/deferred-workflow-job.entity.ts.
*/
class DeferredWorkflowJob
{
/**
* Get the table name.
*
* @return string
*/
public static function table()
{
return Escalated::table('deferred_workflow_jobs');
}

/**
* Find a row by ID.
*
* @param int $id
* @return object|null
*/
public static function find($id)
{
global $wpdb;
$table = static::table();

return $wpdb->get_row(
$wpdb->prepare("SELECT * FROM {$table} WHERE id = %d", $id)
);
}

/**
* Create a new deferred job row. `remaining_actions` is JSON-encoded
* by the caller so consumers can decode + re-dispatch.
*
* @return int|false Inserted ID or false on failure.
*/
public static function create(array $data)
{
global $wpdb;
$table = static::table();
$now = current_time('mysql');

$data['created_at'] = $now;
$data['updated_at'] = $now;
$data['status'] = $data['status'] ?? 'pending';

$result = $wpdb->insert($table, $data);

return $result !== false ? $wpdb->insert_id : false;
}

/**
* Update a row.
*
* @param int $id
* @return bool
*/
public static function update($id, array $data)
{
global $wpdb;
$table = static::table();

$data['updated_at'] = current_time('mysql');

return $wpdb->update($table, $data, ['id' => $id]) !== false;
}

/**
* Fetch every `pending` row whose `run_at` has elapsed.
*
* `run_at` is stored in UTC (written via `gmdate` in
* {@see WorkflowExecutorService::schedule_delay}), so we compare
* against GMT here regardless of WordPress's configured timezone.
*
* @return array<object>
*/
public static function pending(): array
{
global $wpdb;
$table = static::table();
$now_gmt = current_time('mysql', true);

return $wpdb->get_results(
$wpdb->prepare(
"SELECT * FROM {$table} WHERE status = %s AND run_at <= %s ORDER BY run_at ASC",
'pending',
$now_gmt
)
) ?: [];
}
}
88 changes: 85 additions & 3 deletions includes/Services/WorkflowExecutorService.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

namespace Escalated\Services;

use Escalated\Models\DeferredWorkflowJob;
use Escalated\Models\Reply;
use Escalated\Models\Tag;
use Escalated\Models\Ticket;

/**
* Performs the side-effects dictated by a matched Workflow.
Expand All @@ -13,10 +15,15 @@
* dispatches each entry to the relevant service.
*
* Action catalog: change_priority, change_status, assign_agent,
* set_department, add_tag, remove_tag, add_note, insert_canned_reply.
* Mirrors the NestJS reference impl in
* set_department, add_tag, remove_tag, add_note, insert_canned_reply,
* delay. Mirrors the NestJS reference impl in
* escalated-nestjs/src/services/workflow-executor.service.ts.
*
* `delay` splits a run into two halves: everything before the delay
* runs inline, everything after is persisted as a DeferredWorkflowJob
* row and picked up by Escalated\Cron\Deferred_Workflow_Jobs_Check
* once the wait expires.
*
* One failing action never halts the others — matches NestJS. Unknown
* action types warn-log (via error_log when WP_DEBUG) and skip.
*/
Expand Down Expand Up @@ -46,7 +53,15 @@ public function __construct(
public function execute(object $ticket, ?string $actions_json): array
{
$actions = $this->parse_actions($actions_json);
foreach ($actions as $action) {
$count = count($actions);
for ($i = 0; $i < $count; $i++) {
$action = $actions[$i];
if (($action['type'] ?? '') === 'delay') {
$remaining = array_slice($actions, $i + 1);
$this->schedule_delay($ticket, (string) ($action['value'] ?? ''), $remaining);

return $actions;
}
try {
$this->dispatch($ticket, $action);
} catch (\Throwable $e) {
Expand All @@ -57,6 +72,73 @@ public function execute(object $ticket, ?string $actions_json): array
return $actions;
}

/**
* Persist remaining actions to the deferred-jobs queue with
* run_at = now + $seconds. Logs a warning + skips when the value
* isn't a positive integer. Mirrors NestJS scheduleDelay.
*
* @param array<int,array<string,mixed>> $remaining
*/
protected function schedule_delay(object $ticket, string $value, array $remaining): void
{
$seconds = (int) $value;
if (! ctype_digit($value) || $seconds <= 0) {
$this->log_debug(sprintf(
'delay: invalid seconds value "%s", skipping remaining actions',
$value
));

return;
}
$run_at = gmdate('Y-m-d H:i:s', time() + $seconds);
DeferredWorkflowJob::create([
'ticket_id' => (int) $ticket->id,
'remaining_actions' => wp_json_encode($remaining),
'run_at' => $run_at,
'status' => 'pending',
]);
}

/**
* Dispatch every pending deferred job whose `run_at` has elapsed.
*
* For each row, re-loads the ticket, re-invokes execute() with the
* stored remaining_actions JSON, and flips status to `done` /
* `failed`. Called by Cron\Deferred_Workflow_Jobs_Check.
*
* @return array{processed:int,failed:int}
*/
public function run_due_deferred_jobs(): array
{
$processed = 0;
$failed = 0;
foreach (DeferredWorkflowJob::pending() as $job) {
try {
$ticket = Ticket::find((int) $job->ticket_id);
if (! $ticket) {
DeferredWorkflowJob::update((int) $job->id, [
'status' => 'failed',
'last_error' => sprintf('Ticket #%d not found', (int) $job->ticket_id),
]);
$failed++;

continue;
}
$this->execute($ticket, $job->remaining_actions);
DeferredWorkflowJob::update((int) $job->id, ['status' => 'done']);
$processed++;
} catch (\Throwable $e) {
DeferredWorkflowJob::update((int) $job->id, [
'status' => 'failed',
'last_error' => $e->getMessage(),
]);
$failed++;
}
}

return ['processed' => $processed, 'failed' => $failed];
}

protected function parse_actions(?string $actions_json): array
{
if (empty($actions_json)) {
Expand Down
21 changes: 21 additions & 0 deletions includes/class-activator.php
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,23 @@ private static function create_tables(): void
KEY priority (priority)
) $charset_collate;";
dbDelta($sql);

// escalated_deferred_workflow_jobs — queue row for the `delay`
// workflow action. Cron\Deferred_Workflow_Jobs_Check polls for
// status=pending + run_at <= now and re-dispatches remaining_actions.
$sql = "CREATE TABLE {$prefix}deferred_workflow_jobs (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
ticket_id BIGINT UNSIGNED NOT NULL,
remaining_actions LONGTEXT NOT NULL,
run_at DATETIME NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'pending',
last_error TEXT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
PRIMARY KEY (id),
KEY status_runat (status, run_at)
) $charset_collate;";
dbDelta($sql);
}

/**
Expand Down Expand Up @@ -785,6 +802,10 @@ private static function schedule_cron_events(): void
if (! wp_next_scheduled('escalated_chat_cleanup')) {
wp_schedule_event(time(), 'escalated_every_minute', 'escalated_chat_cleanup');
}

if (! wp_next_scheduled('escalated_run_due_deferred_workflow_jobs')) {
wp_schedule_event(time(), 'escalated_every_minute', 'escalated_run_due_deferred_workflow_jobs');
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions includes/class-escalated.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public function boot(): void
(new Services\BroadcastService)->register();
(new Cron\Snooze_Check)->register();
(new Cron\Chat_Cleanup)->register();
(new Cron\Deferred_Workflow_Jobs_Check)->register();

Cli\AutomationCommand::register();
}
Expand Down
Loading