diff --git a/.claude/skills/int-evolution-go/scripts/evolution_go_client.py b/.claude/skills/int-evolution-go/scripts/evolution_go_client.py
index 67362cce..776f7110 100755
--- a/.claude/skills/int-evolution-go/scripts/evolution_go_client.py
+++ b/.claude/skills/int-evolution-go/scripts/evolution_go_client.py
@@ -6,7 +6,11 @@
import argparse
import json
import os
+import random
+import socket
import sys
+import time
+import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
@@ -43,38 +47,125 @@ def get_config():
return url.rstrip("/"), key
+def get_webshare_config():
+ """Return proxy dict from WEBSHARE_* env vars, or None if not configured."""
+ host = os.environ.get("WEBSHARE_PROXY_HOST")
+ port = os.environ.get("WEBSHARE_PROXY_PORT")
+ username = os.environ.get("WEBSHARE_PROXY_USERNAME")
+ password = os.environ.get("WEBSHARE_PROXY_PASSWORD")
+ if not all([host, port, username, password]):
+ return None
+ return {
+ "host": host,
+ "port": int(port),
+ "protocol": "http",
+ "username": username,
+ "password": password,
+ }
+
+
+def _retry_http_call_client(do_call, max_attempts=3, base_delay=2.0, max_delay=8.0):
+ """Exponential backoff + jitter for Evolution Go API calls.
+
+ Retries on HTTP 5xx, urllib.error.URLError, and socket.timeout (transient).
+ NEVER retries on HTTP 4xx (deterministic client errors).
+
+ Returns the result of do_call() on success.
+ Raises the last exception after max_attempts are exhausted.
+ Raises immediately on HTTP 4xx (no retry).
+ """
+ last_exc = None
+ for attempt in range(max_attempts):
+ try:
+ return do_call()
+ except urllib.error.HTTPError as e:
+ if e.code < 500:
+ # 4xx — deterministic, raise immediately (caller decides sys.exit vs raise)
+ raise
+ last_exc = e
+ if attempt < max_attempts - 1:
+ delay = min(base_delay ** attempt + random.uniform(0, 0.5), max_delay)
+ print(
+ json.dumps({
+ "evt": "api_request_retry",
+ "attempt": attempt + 1,
+ "max_attempts": max_attempts,
+ "http_status": e.code,
+ "delay_s": round(delay, 2),
+ })
+ )
+ time.sleep(delay)
+ else:
+ print(
+ json.dumps({
+ "evt": "api_request_failed",
+ "attempt": attempt + 1,
+ "max_attempts": max_attempts,
+ "http_status": e.code,
+ "category": "transient",
+ })
+ )
+ except (urllib.error.URLError, socket.timeout) as e:
+ last_exc = e
+ if attempt < max_attempts - 1:
+ delay = min(base_delay ** attempt + random.uniform(0, 0.5), max_delay)
+ print(
+ json.dumps({
+ "evt": "api_request_retry",
+ "attempt": attempt + 1,
+ "max_attempts": max_attempts,
+ "error": str(e),
+ "delay_s": round(delay, 2),
+ })
+ )
+ time.sleep(delay)
+ else:
+ print(
+ json.dumps({
+ "evt": "api_request_failed",
+ "attempt": attempt + 1,
+ "max_attempts": max_attempts,
+ "error": str(e),
+ "category": "transient",
+ })
+ )
+ raise last_exc
+
+
def api_request(method, path, data=None):
- """Make an HTTP request to the Evolution Go API."""
+ """Make an HTTP request to the Evolution Go API.
+
+ /send/* endpoints require the instance token (EVOLUTION_GO_INSTANCE_TOKEN).
+ Management endpoints (/instance/*) use the global API key (EVOLUTION_GO_KEY).
+
+ Applies exponential backoff + jitter on HTTP 5xx / network errors (up to 3 attempts).
+ On HTTP 4xx: raises urllib.error.HTTPError immediately (no retry, deterministic error).
+ On persistent failure after retries: raises the last exception instead of sys.exit(1),
+ allowing library callers to handle it; CLI __main__ catches and sys.exit(1) as before.
+ """
base_url, api_key = get_config()
url = f"{base_url}{path}"
body = json.dumps(data).encode("utf-8") if data else None
- req = urllib.request.Request(
- url,
- data=body,
- method=method,
- headers={
- "apikey": api_key,
- "Content-Type": "application/json",
- },
- )
- try:
+ def _do_call():
+ req = urllib.request.Request(
+ url,
+ data=body,
+ method=method,
+ headers={
+ "apikey": api_key,
+ "Content-Type": "application/json",
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
+ },
+ )
with urllib.request.urlopen(req) as resp:
raw = resp.read()
if raw:
return json.loads(raw)
return {"message": "success"}
- except urllib.error.HTTPError as e:
- try:
- error_body = json.loads(e.read())
- except Exception:
- error_body = {"error": str(e)}
- print(json.dumps({"error": f"HTTP {e.code}", "details": error_body}, indent=2))
- sys.exit(1)
- except urllib.error.URLError as e:
- print(json.dumps({"error": f"Connection failed: {e.reason}"}))
- sys.exit(1)
+
+ return _retry_http_call_client(_do_call)
def to_jid(number):
@@ -523,12 +614,23 @@ def main():
}
handler = commands.get(args.command)
- if handler:
- handler(args)
- else:
+ if not handler:
print(json.dumps({"error": f"Unknown command: {args.command}"}))
sys.exit(1)
+ try:
+ handler(args)
+ except urllib.error.HTTPError as e:
+ try:
+ error_body = json.loads(e.read())
+ except Exception:
+ error_body = {"error": str(e)}
+ print(json.dumps({"error": f"HTTP {e.code}", "details": error_body}, indent=2))
+ sys.exit(1)
+ except (urllib.error.URLError, socket.timeout) as e:
+ print(json.dumps({"error": f"Connection failed: {e}"}))
+ sys.exit(1)
+
if __name__ == "__main__":
main()
diff --git a/ADWs/runner.py b/ADWs/runner.py
index 5a2cda4b..a44c715a 100644
--- a/ADWs/runner.py
+++ b/ADWs/runner.py
@@ -7,6 +7,9 @@
import os
import sys
import json
+import random
+import time
+import urllib.error
from datetime import datetime
from pathlib import Path
@@ -444,6 +447,199 @@ def summary(results: list, title: str = "Completed"):
))
+def send_whatsapp_file(filepath: str, caption: str = "", phone: str = None, expires_in: int = 3600) -> bool:
+ """Upload a file to Cloudflare R2 and send it via WhatsApp (Evolution Go).
+
+ Uploads to R2 under "tmp/-", generates a presigned URL
+ valid for `expires_in` seconds, then calls /send/media.
+ Files in tmp/ are NOT auto-deleted — run periodic cleanup or use backup.py prune.
+
+ Requires: boto3, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL,
+ BACKUP_S3_BUCKET, EVOLUTION_GO_URL, EVOLUTION_GO_INSTANCE_TOKEN.
+ """
+ import boto3
+ import urllib.request
+ from pathlib import Path
+
+ filepath = Path(filepath)
+ if not filepath.exists():
+ console.print(f" [error]✗ Arquivo não encontrado: {filepath}[/error]")
+ return False
+
+ bucket = os.environ.get("BACKUP_S3_BUCKET", "")
+ endpoint_url = os.environ.get("AWS_ENDPOINT_URL", "")
+ if not bucket or not endpoint_url:
+ console.print(" [warning]⚠ R2 não configurado (BACKUP_S3_BUCKET ou AWS_ENDPOINT_URL ausente)[/warning]")
+ return False
+
+ base_url = os.environ.get("EVOLUTION_GO_URL", "").rstrip("/")
+ token = os.environ.get("EVOLUTION_GO_INSTANCE_TOKEN", "")
+ to_phone = phone or os.environ.get("NOTIFY_WHATSAPP_PHONE", "")
+ if not base_url or not token or not to_phone:
+ console.print(" [warning]⚠ Evolution Go não configurado[/warning]")
+ return False
+
+ # Upload to R2 — timestamp prefix avoids name collisions
+ s3_key = f"tmp/{datetime.now().strftime('%Y%m%d-%H%M%S')}-{filepath.name}"
+ console.print(f" [step]▶[/step] Upload R2: {filepath.name}", end="")
+ try:
+ s3 = boto3.client("s3", endpoint_url=endpoint_url)
+ s3.upload_file(str(filepath), bucket, s3_key)
+ presigned_url = s3.generate_presigned_url(
+ "get_object",
+ Params={"Bucket": bucket, "Key": s3_key},
+ ExpiresIn=expires_in,
+ )
+ console.print(f"\r [success]✓[/success] Upload R2: {filepath.name}")
+ except Exception as e:
+ console.print(f"\r [error]✗[/error] Upload R2 falhou: {e}")
+ return False
+
+ # Detect media type
+ suffix = filepath.suffix.lower()
+ if suffix in (".jpg", ".jpeg", ".png", ".gif", ".webp"):
+ mediatype = "image"
+ elif suffix in (".mp4", ".mov", ".avi"):
+ mediatype = "video"
+ elif suffix in (".mp3", ".ogg", ".m4a", ".wav"):
+ mediatype = "audio"
+ else:
+ mediatype = "document"
+
+ # Send via Evolution Go
+ jid = f"{to_phone}@s.whatsapp.net" if "@" not in to_phone else to_phone
+ payload = json.dumps({
+ "number": jid,
+ "url": presigned_url,
+ "type": mediatype,
+ "fileName": filepath.name,
+ "caption": caption,
+ }).encode("utf-8")
+ req = urllib.request.Request(
+ f"{base_url}/send/media",
+ data=payload,
+ method="POST",
+ headers={
+ "apikey": token,
+ "Content-Type": "application/json",
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
+ },
+ )
+ try:
+ with urllib.request.urlopen(req, timeout=15) as resp:
+ ok = resp.status == 200
+ if ok:
+ console.print(f" [success]✓[/success] WhatsApp arquivo enviado: {filepath.name}")
+ else:
+ console.print(f" [warning]⚠ WhatsApp status {resp.status}[/warning]")
+ return ok
+ except Exception as e:
+ console.print(f" [warning]⚠ WhatsApp erro ao enviar arquivo: {e}[/warning]")
+ return False
+
+
+def _retry_http_call(do_call, max_attempts=3, base_delay=2.0, max_delay=8.0):
+ """Generic retry wrapper with exponential backoff + jitter.
+
+ do_call() must return True on success, raise urllib.error.HTTPError or
+ urllib.error.URLError / socket.timeout on failure.
+
+ Retries only on HTTP 5xx, URLError, and socket.timeout (transient).
+ NEVER retries on HTTP 4xx (deterministic client errors).
+
+ Returns (ok: bool, attempts: int, error_category: str | None).
+ Worst-case latency (3 attempts, all 5xx):
+ sleep 0 + sleep ~2.5 + sleep ~4.5 ≈ 7 s total.
+ """
+ import socket
+
+ last_error_category = None
+ for attempt in range(max_attempts):
+ try:
+ result = do_call()
+ return result, attempt + 1, None
+ except urllib.error.HTTPError as e:
+ if e.code < 500:
+ # 4xx — deterministic client error, no retry
+ console.print(
+ f" [warning]⚠ WhatsApp HTTP {e.code} (client error, no retry)[/warning]"
+ )
+ return False, attempt + 1, "permanent"
+ last_error_category = "transient"
+ if attempt < max_attempts - 1:
+ delay = min(base_delay ** attempt + random.uniform(0, 0.5), max_delay)
+ console.print(
+ f" [warning]⚠ WhatsApp HTTP {e.code} (attempt {attempt + 1}/{max_attempts},"
+ f" retry in {delay:.1f}s)[/warning]"
+ )
+ time.sleep(delay)
+ else:
+ console.print(
+ f" [warning]⚠ WhatsApp HTTP {e.code} (attempt {attempt + 1}/{max_attempts},"
+ f" giving up)[/warning]"
+ )
+ except (urllib.error.URLError, socket.timeout) as e:
+ last_error_category = "transient"
+ if attempt < max_attempts - 1:
+ delay = min(base_delay ** attempt + random.uniform(0, 0.5), max_delay)
+ console.print(
+ f" [warning]⚠ WhatsApp network error: {e}"
+ f" (attempt {attempt + 1}/{max_attempts}, retry in {delay:.1f}s)[/warning]"
+ )
+ time.sleep(delay)
+ else:
+ console.print(
+ f" [warning]⚠ WhatsApp network error: {e}"
+ f" (attempt {attempt + 1}/{max_attempts}, giving up)[/warning]"
+ )
+ return False, max_attempts, last_error_category
+
+
+def send_whatsapp(text: str, phone: str = None) -> bool:
+ """Send a WhatsApp message via Evolution Go (no MCP dependency).
+
+ Uses the EvoNexus instance token (EVOLUTION_GO_INSTANCE_TOKEN) which
+ authenticates /send/* endpoints — different from the global EVOLUTION_GO_KEY.
+ Reads EVOLUTION_GO_URL, EVOLUTION_GO_INSTANCE_TOKEN, NOTIFY_WHATSAPP_PHONE from env.
+ Applies exponential backoff + jitter on HTTP 5xx / network errors (up to 3 attempts).
+ Returns True if sent successfully, False otherwise.
+ """
+ import urllib.request
+
+ base_url = os.environ.get("EVOLUTION_GO_URL", "").rstrip("/")
+ token = os.environ.get("EVOLUTION_GO_INSTANCE_TOKEN", "")
+ to_phone = phone or os.environ.get("NOTIFY_WHATSAPP_PHONE", "")
+
+ if not base_url or not token or not to_phone:
+ console.print(" [warning]⚠ WhatsApp not configured (missing EVOLUTION_GO_URL, INSTANCE_TOKEN or NOTIFY_PHONE)[/warning]")
+ return False
+
+ jid = f"{to_phone}@s.whatsapp.net" if "@" not in to_phone else to_phone
+ payload = json.dumps({"number": jid, "text": text}).encode("utf-8")
+
+ def _do_call():
+ req = urllib.request.Request(
+ f"{base_url}/send/text",
+ data=payload,
+ method="POST",
+ headers={
+ "apikey": token,
+ "Content-Type": "application/json",
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
+ },
+ )
+ with urllib.request.urlopen(req, timeout=10) as resp:
+ return resp.status == 200
+
+ ok, attempts, category = _retry_http_call(_do_call)
+ console.print(
+ f" {'[success]✓[/success] WhatsApp enviado' if ok else '[warning]⚠ WhatsApp falhou[/warning]'}"
+ f" action=send_whatsapp attempts={attempts} final_status={'ok' if ok else 'fail'}"
+ f" category={category or 'none'}"
+ )
+ return ok
+
+
def send_telegram(text: str, chat_id: str = None) -> bool:
"""Send a Telegram message via bot API (no MCP dependency).
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index f4fd5855..8070b35b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -12,7 +12,7 @@ Harassment, discrimination, or abusive behavior will not be tolerated.
### Reporting Bugs
-1. Check existing [issues](https://github.com/EvolutionAPI/evo-nexus/issues)
+1. Check existing [issues](https://github.com/evolution-foundation/evo-nexus/issues)
to avoid duplicates
2. Open a new issue with:
- Clear, descriptive title
diff --git a/README.md b/README.md
index 73e204c7..70be4e38 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
-
+
@@ -46,7 +46,7 @@ It turns a single CLI installation into a team of **38 specialized agents** orga
## Part of the Evolution Foundation ecosystem
-EvoNexus is one of the projects maintained by Evolution Foundation. It is the operating layer that orchestrates the Foundation's own work — including the development of [Evo CRM Community](https://github.com/EvolutionAPI/evo-crm-community), [Evolution API](https://github.com/EvolutionAPI/evolution-api) and [Evolution Go](https://github.com/EvolutionAPI/evolution-go).
+EvoNexus is one of the projects maintained by Evolution Foundation. It is the operating layer that orchestrates the Foundation's own work — including the development of [Evo CRM Community](https://github.com/evolution-foundation/evo-crm-community), [Evolution API](https://github.com/evolution-foundation/evolution-api) and [Evolution Go](https://github.com/evolution-foundation/evolution-go).
### Why EvoNexus?
@@ -101,7 +101,7 @@ EvoNexus is one of the projects maintained by Evolution Foundation. It is the op
### Method 1 — Docker (no setup, runs anywhere)
```bash
-curl -O https://raw.githubusercontent.com/EvolutionAPI/evo-nexus/main/docker-compose.hub.yml
+curl -O https://raw.githubusercontent.com/evolution-foundation/evo-nexus/main/docker-compose.hub.yml
docker compose -f docker-compose.hub.yml up -d
open http://localhost:8080
```
@@ -117,7 +117,7 @@ npx @evoapi/evo-nexus
### Method 3 — Manual clone (developers / contributors)
```bash
-git clone --depth 1 https://github.com/EvolutionAPI/evo-nexus.git
+git clone --depth 1 https://github.com/evolution-foundation/evo-nexus.git
cd evo-nexus
# Interactive setup wizard
diff --git a/dashboard/backend/app.py b/dashboard/backend/app.py
index 2dccb597..7b3e1acb 100644
--- a/dashboard/backend/app.py
+++ b/dashboard/backend/app.py
@@ -613,6 +613,43 @@ def _cors_allowed_origins():
except Exception:
pass
_conn.commit()
+
+ # --- WhatsApp retry pattern: idempotency_key + error_category + last_replay_at (PR-1 2026-05-11) ---
+ # Rollback: DROP INDEX uq_trigger_idem; DROP INDEX ix_trigger_executions_idem_key
+ # Columns are nullable — old code ignores them without breaking.
+ _te_cols = {row[1] for row in _cur.execute("PRAGMA table_info(trigger_executions)").fetchall()}
+ if "idempotency_key" not in _te_cols:
+ _cur.execute("ALTER TABLE trigger_executions ADD COLUMN idempotency_key TEXT")
+ _conn.commit()
+ if "error_category" not in _te_cols:
+ _cur.execute("ALTER TABLE trigger_executions ADD COLUMN error_category TEXT")
+ _conn.commit()
+ if "last_replay_at" not in _te_cols:
+ _cur.execute("ALTER TABLE trigger_executions ADD COLUMN last_replay_at TIMESTAMP")
+ _conn.commit()
+ # Basic index for idempotency lookups by key alone
+ try:
+ _cur.execute(
+ "CREATE INDEX IF NOT EXISTS ix_trigger_executions_idem_key "
+ "ON trigger_executions (idempotency_key)"
+ )
+ _conn.commit()
+ except Exception:
+ pass
+ # Partial unique index: enforces (trigger_id, idempotency_key) uniqueness only when key IS NOT NULL.
+ # SQLite >= 3.8 supports partial indices natively; our runtime is 3.51 (confirmed).
+ # This is the DB-level guard against race-condition duplicates (Step 2 handles app-level dedup).
+ try:
+ _cur.execute(
+ "CREATE UNIQUE INDEX IF NOT EXISTS uq_trigger_idem "
+ "ON trigger_executions (trigger_id, idempotency_key) "
+ "WHERE idempotency_key IS NOT NULL"
+ )
+ _conn.commit()
+ except Exception:
+ pass
+ # --- End WhatsApp retry pattern migration ---
+
_conn.close()
# --- End auto-migrate ---
diff --git a/dashboard/backend/models.py b/dashboard/backend/models.py
index a151bcd8..2dcff6f1 100644
--- a/dashboard/backend/models.py
+++ b/dashboard/backend/models.py
@@ -333,12 +333,17 @@ class TriggerExecution(db.Model):
id = db.Column(db.Integer, primary_key=True)
trigger_id = db.Column(db.Integer, db.ForeignKey("triggers.id", ondelete="CASCADE"), nullable=False)
event_data = db.Column(db.Text, nullable=True, default="{}") # JSON payload received
- status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed
+ status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed, failed_retryable
result_summary = db.Column(db.Text, nullable=True)
error = db.Column(db.Text, nullable=True)
duration_seconds = db.Column(db.Float, nullable=True)
started_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
completed_at = db.Column(db.DateTime, nullable=True)
+ # WhatsApp retry pattern (PR-1: migration 2026-05-11)
+ # rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code
+ idempotency_key = db.Column(db.String(255), nullable=True, index=True) # messageId WPP or other source dedup key
+ error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown
+ last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution
@property
def event_data_dict(self) -> dict:
@@ -358,6 +363,9 @@ def to_dict(self):
"duration_seconds": self.duration_seconds,
"started_at": self.started_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.started_at else None,
"completed_at": self.completed_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.completed_at else None,
+ "idempotency_key": self.idempotency_key,
+ "error_category": self.error_category,
+ "last_replay_at": self.last_replay_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.last_replay_at else None,
}
diff --git a/dashboard/backend/routes/triggers.py b/dashboard/backend/routes/triggers.py
index 50bc5336..89d1c928 100644
--- a/dashboard/backend/routes/triggers.py
+++ b/dashboard/backend/routes/triggers.py
@@ -12,9 +12,10 @@
import time
from pathlib import Path
from datetime import datetime, timezone
-from flask import Blueprint, jsonify, request
+from flask import Blueprint, jsonify, request, current_app
from flask_login import current_user
from models import db, Trigger, TriggerExecution, has_permission, audit
+from sqlalchemy.exc import IntegrityError
bp = Blueprint("triggers", __name__)
@@ -23,6 +24,40 @@
VALID_SOURCES = ("github", "linear", "telegram", "discord", "stripe", "custom")
VALID_ACTION_TYPES = ("skill", "prompt", "script")
+# --- WhatsApp retry pattern: DLQ error classification (PR-3 2026-05-11) ---
+# Markers that indicate a transient (retriable) failure vs a permanent one.
+# Permanent errors are deterministic (bad config, missing script, etc.)
+# and must NOT be retried without human intervention.
+_TRANSIENT_MARKERS = (
+ "HTTP 5", # HTTP 5xx from Evolution Go or any subprocess
+ "timed out",
+ "timeout",
+ "Timeout",
+ "Connection refused",
+ "Connection reset",
+ "Network is unreachable",
+ "URLError",
+ "RemoteDisconnected",
+ "BrokenPipeError",
+)
+
+
+def _classify_error(err_msg: str, exc: Exception | None = None) -> str:
+ """Return 'transient' or 'permanent' based on the exception and error text.
+
+ transient → worth retrying (HTTP 5xx, network, timeout)
+ permanent → deterministic failure, replay only on operator decision
+ """
+ if isinstance(exc, subprocess.TimeoutExpired):
+ return "transient"
+ if isinstance(exc, (ValueError, FileNotFoundError, KeyError, TypeError, AttributeError)):
+ return "permanent"
+ msg = err_msg or ""
+ if any(m in msg for m in _TRANSIENT_MARKERS):
+ return "transient"
+ return "permanent"
+# --- End DLQ classification ---
+
# Cache python command at module load time (F3)
_PYTHON_CMD = shutil.which("uv")
PYTHON_CMD = "uv run python" if _PYTHON_CMD else "python3"
@@ -245,7 +280,6 @@ def test_trigger(trigger_id):
trigger_id_int = trigger.id
trigger_name = trigger.name
- from flask import current_app
app = current_app._get_current_object()
def _run():
@@ -318,14 +352,51 @@ def webhook_receiver(trigger_id):
if not _matches_filter(event_data, trigger.event_filter_dict):
return jsonify({"status": "ok"}), 200
+ # --- WhatsApp retry pattern: idempotency key extraction (PR-1 2026-05-11) ---
+ # WPP channel: N8N forwards messageId as idempotency_key or data.messageId.
+ # Other sources (GitHub, Stripe, Linear): no key → idem_key=None → check is skipped.
+ idem_key = None
+ if isinstance(event_data, dict):
+ _data = event_data.get("data") or {}
+ idem_key = (
+ event_data.get("idempotency_key")
+ or event_data.get("messageId")
+ or _data.get("idempotency_key")
+ or _data.get("messageId")
+ or None
+ )
+
+ # Silent dedup (F6 pattern): second POST with same key returns 200 OK without re-executing.
+ if idem_key:
+ existing = TriggerExecution.query.filter_by(
+ trigger_id=trigger.id, idempotency_key=idem_key
+ ).first()
+ if existing:
+ current_app.logger.info(
+ f"evt=idempotent_replay trigger_id={trigger.id} key={idem_key} existing_exec_id={existing.id}"
+ )
+ return jsonify({"status": "ok"}), 200
+ # --- End idempotency dedup ---
+
# Create execution and run async
execution = TriggerExecution(
trigger_id=trigger.id,
event_data=json.dumps(event_data),
status="pending",
+ idempotency_key=idem_key,
)
db.session.add(execution)
- db.session.commit()
+ try:
+ db.session.commit()
+ except IntegrityError:
+ # Race condition: two simultaneous POSTs with same idempotency_key;
+ # the DB partial unique index rejected the second INSERT.
+ # Silent dedup — return 200 OK (F6) without re-executing.
+ db.session.rollback()
+ current_app.logger.info(
+ f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}"
+ )
+ return jsonify({"status": "ok"}), 200
# Capture IDs BEFORE handing off to the worker thread (see test_trigger
# for the same DetachedInstanceError issue) — accessing ``execution.id``
@@ -333,7 +404,6 @@ def webhook_receiver(trigger_id):
execution_id = execution.id
trigger_id_int = trigger.id
- from flask import current_app
app = current_app._get_current_object()
def _run():
@@ -343,9 +413,201 @@ def _run():
thread = threading.Thread(target=_run, daemon=True)
thread.start()
+ current_app.logger.info(
+ f"evt=trigger_webhook trigger_id={trigger_id_int} source={trigger.source}"
+ f" idem_key={idem_key!r} exec_id={execution_id}"
+ )
+
return jsonify({"status": "ok"}), 200
+# ── Replay Endpoint (Step 5 — PR-3 2026-05-11) ────────────────────────────
+
+
+@bp.route("/api/triggers/executions//replay", methods=["POST"])
+def replay_execution(exec_id: int):
+ """Replay a failed execution. Requires session auth (not a public endpoint).
+
+ Rate-limit: 60s between replays of the same execution.
+ Creates a new TriggerExecution row; marks the original as 'replayed'.
+ Returns: {"status": "ok", "new_execution_id": int} or {"error": str} + 4xx.
+ """
+ from flask_login import current_user as _cu
+ if not _cu.is_authenticated:
+ return jsonify({"error": "Forbidden"}), 403
+
+ ex = TriggerExecution.query.get(exec_id)
+ if not ex:
+ return jsonify({"error": "not_found"}), 404
+
+ if ex.status not in ("failed_retryable", "failed"):
+ return jsonify({"error": "not_replayable", "current_status": ex.status}), 400
+
+ # Rate-limit: max 1 replay per execution per 60 seconds
+ if ex.last_replay_at is not None:
+ elapsed = (datetime.now(timezone.utc) - ex.last_replay_at).total_seconds()
+ if elapsed < 60:
+ retry_after = int(60 - elapsed)
+ return jsonify({"error": "rate_limited", "retry_after_seconds": retry_after}), 429
+
+ trigger = Trigger.query.get(ex.trigger_id)
+ if not trigger:
+ return jsonify({"error": "trigger_not_found"}), 404
+
+ # Preserve original event_data (and idempotency_key) so the dedup layer
+ # protects against double-execution if the original had already partially run.
+ try:
+ original_event_data = json.loads(ex.event_data) if ex.event_data else {}
+ except (json.JSONDecodeError, TypeError):
+ original_event_data = {}
+
+ new_ex = TriggerExecution(
+ trigger_id=ex.trigger_id,
+ event_data=ex.event_data,
+ idempotency_key=ex.idempotency_key,
+ status="pending",
+ )
+ db.session.add(new_ex)
+
+ # Mark original as replayed and stamp rate-limit timestamp
+ ex.last_replay_at = datetime.now(timezone.utc)
+ ex.status = "replayed"
+
+ try:
+ db.session.commit()
+ except IntegrityError:
+ # idempotency_key already has a successful execution — silent ok
+ db.session.rollback()
+ return jsonify({"status": "ok", "note": "idempotent_skip"}), 200
+
+ new_execution_id = new_ex.id
+ trigger_id_int = trigger.id
+
+ app = current_app._get_current_object()
+
+ def _run():
+ with app.app_context():
+ _execute_trigger(trigger_id_int, new_execution_id, original_event_data)
+
+ threading.Thread(target=_run, daemon=True).start()
+
+ current_app.logger.info(
+ f"evt=trigger_replay original_exec={exec_id} new_exec={new_execution_id}"
+ f" trigger_id={trigger_id_int}"
+ )
+
+ return jsonify({"status": "ok", "new_execution_id": new_execution_id}), 200
+
+
+# ── Stats Endpoint (Step 6 — PR-3 2026-05-11) ─────────────────────────────
+
+
+@bp.route("/api/triggers/stats", methods=["GET"])
+def trigger_stats():
+ """Return operational metrics for the trigger execution pipeline.
+
+ Query param: ?days=N (default 1, max 30).
+ Used by the /triggers UI badge and the watermark CB check.
+
+ Watermark: when wpp_command_count > 50 OR distinct_users > 1 in the window,
+ circuit_breaker_watermark_hit is set to True and a WARNING is logged.
+ """
+ try:
+ days = max(1, min(30, int(request.args.get("days", 1))))
+ except (TypeError, ValueError):
+ days = 1
+
+ # Use raw SQL via SQLAlchemy text for aggregate queries (no ORM overhead)
+ from sqlalchemy import text as _text
+
+ since_clause = f"datetime('now', '-{days} days')"
+
+ # 1. Total executions + by_status breakdown
+ rows = db.session.execute(
+ _text(
+ f"SELECT status, COUNT(*) as cnt FROM trigger_executions "
+ f"WHERE started_at >= {since_clause} GROUP BY status"
+ )
+ ).fetchall()
+ by_status: dict = {}
+ total_executions = 0
+ for row in rows:
+ by_status[row[0]] = row[1]
+ total_executions += row[1]
+
+ # 2. DLQ size: failed_retryable rows (unreplayed — status is still failed_retryable)
+ dlq_size = by_status.get("failed_retryable", 0)
+
+ # 3. Idempotent replays: count rows whose status was set via dedup log
+ # Approximation: TriggerExecutions with status='replayed' created in window
+ # (exact log parsing is fragile; replayed status is precise enough for watermark)
+ idempotent_replays = db.session.execute(
+ _text(
+ f"SELECT COUNT(*) FROM trigger_executions "
+ f"WHERE status = 'replayed' AND started_at >= {since_clause}"
+ )
+ ).scalar() or 0
+
+ # 4. WPP command count: triggers whose source = 'whatsapp' OR slug contains 'wpp'
+ # Also count executions referencing those triggers
+ wpp_trigger_ids = db.session.execute(
+ _text(
+ "SELECT id FROM triggers WHERE source = 'whatsapp' OR slug LIKE 'wpp%' OR name LIKE 'wpp%' OR name LIKE 'WhatsApp%'"
+ )
+ ).fetchall()
+ wpp_ids = tuple(r[0] for r in wpp_trigger_ids)
+
+ if wpp_ids:
+ # Build IN clause using string interpolation (IDs are integers — safe)
+ id_list = ",".join(str(i) for i in wpp_ids)
+ try:
+ wpp_command_count = db.session.execute(
+ _text(
+ f"SELECT COUNT(*) FROM trigger_executions "
+ f"WHERE trigger_id IN ({id_list}) "
+ f"AND started_at >= {since_clause}"
+ )
+ ).scalar() or 0
+ except Exception:
+ wpp_command_count = 0
+ else:
+ wpp_command_count = 0
+
+ # 5. Distinct users in last 7 days (static 1 for single-user workspace)
+ # When multi-user support arrives this becomes a real query.
+ user_count = 1 # MTA digital is single-operator; update when multi-user lands
+
+ # 6. Retries observed: executions where result_summary or error contains retry evidence
+ # (Step 3 backoff logs "attempts" in the summary)
+ retries_observed = db.session.execute(
+ _text(
+ f"SELECT COUNT(*) FROM trigger_executions "
+ f"WHERE (result_summary LIKE '%\"attempts\"%' OR error LIKE '%attempts%') "
+ f"AND started_at >= {since_clause}"
+ )
+ ).scalar() or 0
+
+ # 7. Watermark check
+ watermark_hit = wpp_command_count > 50 or user_count > 1
+ if watermark_hit:
+ current_app.logger.warning(
+ f"evt=circuit_breaker_watermark_hit wpp_command_count={wpp_command_count}"
+ f" user_count={user_count} window_days={days}"
+ " — review Circuit Breaker (see [C]adr-retry-pattern.md)"
+ )
+
+ return jsonify({
+ "window_days": days,
+ "total_executions": total_executions,
+ "by_status": by_status,
+ "retries_observed": retries_observed,
+ "idempotent_replays": idempotent_replays,
+ "dlq_size": dlq_size,
+ "wpp_command_count": wpp_command_count,
+ "circuit_breaker_watermark_hit": watermark_hit,
+ }), 200
+
+
# ── Webhook Validation & Parsing ───────────────────────────────────────────
@@ -562,17 +824,42 @@ def _execute_trigger(trigger_id: int, execution_id: int, event_data: dict):
else:
raise ValueError(f"Unknown action_type: {trigger.action_type}")
- execution.status = "completed" if result.get("success") else "failed"
+ # --- Step 4: classify subprocess result (PR-3 2026-05-11) ---
+ if result.get("success"):
+ execution.status = "completed"
+ execution.error_category = None
+ else:
+ stderr = (result.get("stderr") or "")[:2000]
+ category = _classify_error(stderr, None)
+ execution.status = "failed_retryable" if category == "transient" else "failed"
+ execution.error_category = category
+ execution.error = stderr
execution.result_summary = (result.get("stdout", "") or "")[:5000]
- if not result.get("success"):
- execution.error = (result.get("stderr", "") or "")[:2000]
+ current_app.logger.info(
+ f"evt=trigger_execute trigger_id={trigger_id} exec_id={execution_id}"
+ f" status={execution.status} category={execution.error_category}"
+ )
+ # --- End Step 4 result classification ---
except subprocess.TimeoutExpired:
- execution.status = "failed"
+ # Transient: timeout is retriable (infrastructure issue, not logic failure)
+ execution.status = "failed_retryable"
execution.error = "Timeout (11 min)"
+ execution.error_category = "transient"
+ current_app.logger.warning(
+ f"evt=trigger_execute trigger_id={trigger_id} exec_id={execution_id}"
+ f" status=failed_retryable category=transient reason=TimeoutExpired"
+ )
except Exception as e:
- execution.status = "failed"
- execution.error = str(e)[:2000]
+ err = str(e)[:2000]
+ category = _classify_error(err, e)
+ execution.status = "failed_retryable" if category == "transient" else "failed"
+ execution.error = err
+ execution.error_category = category
+ current_app.logger.warning(
+ f"evt=trigger_execute trigger_id={trigger_id} exec_id={execution_id}"
+ f" status={execution.status} category={category} error={err[:200]!r}"
+ )
end_time = datetime.now(timezone.utc)
execution.duration_seconds = (end_time - start_time).total_seconds()
diff --git a/dashboard/frontend/src/pages/Triggers.tsx b/dashboard/frontend/src/pages/Triggers.tsx
index 8e97f977..f0c5aafc 100644
--- a/dashboard/frontend/src/pages/Triggers.tsx
+++ b/dashboard/frontend/src/pages/Triggers.tsx
@@ -1,7 +1,7 @@
import { useEffect, useState } from 'react'
import { useToast } from '../components/Toast'
import { useConfirm } from '../components/ConfirmDialog'
-import { Plus, Pencil, Trash2, X, Play, Copy, RefreshCw, KeyRound } from 'lucide-react'
+import { Plus, Pencil, Trash2, X, Play, Copy, RefreshCw, KeyRound, RotateCcw, AlertTriangle } from 'lucide-react'
import { api } from '../lib/api'
import { useAuth } from '../context/AuthContext'
@@ -29,9 +29,30 @@ interface Execution {
status: string
result_summary: string | null
error: string | null
+ error_category: string | null
duration_seconds: number | null
started_at: string
completed_at: string | null
+ idempotency_key: string | null
+ last_replay_at: string | null
+}
+
+interface Stats {
+ window_days: number
+ total_executions: number
+ by_status: Record
+ retries_observed: number
+ idempotent_replays: number
+ dlq_size: number
+ wpp_command_count: number
+ circuit_breaker_watermark_hit: boolean
+}
+
+interface ReplayPreview {
+ execId: number
+ recipient: string
+ command: string
+ timestamp: string
}
const SOURCES = ['github', 'stripe', 'linear', 'telegram', 'discord', 'custom'] as const
@@ -55,6 +76,8 @@ const STATUS_COLORS: Record = {
running: 'bg-blue-500/10 text-blue-400',
completed: 'bg-green-500/10 text-green-400',
failed: 'bg-red-500/10 text-red-400',
+ failed_retryable: 'bg-orange-500/10 text-orange-400',
+ replayed: 'bg-purple-500/10 text-purple-400',
}
const emptyForm = {
@@ -79,6 +102,11 @@ export default function Triggers() {
const [executions, setExecutions] = useState([])
const [execLoading, setExecLoading] = useState(false)
const [newSecret, setNewSecret] = useState<{ id: number; secret: string } | null>(null)
+ // Replay modal state (Step 5 — PR-3)
+ const [replayPreview, setReplayPreview] = useState(null)
+ const [replaying, setReplaying] = useState(false)
+ // Stats (Step 6 — PR-3)
+ const [stats, setStats] = useState(null)
const fetchTriggers = () => {
let url = '/triggers'
@@ -105,6 +133,9 @@ export default function Triggers() {
.then((data: { triggers: TriggerItem[] }) => setTriggers(data.triggers || []))
.catch(() => setTriggers([]))
.finally(() => setLoading(false))
+
+ // Load operational stats badge (Step 6 — PR-3)
+ api.get('/triggers/stats?days=1').then((d: Stats) => setStats(d)).catch(() => {})
}, [filter])
const openCreate = () => {
@@ -197,6 +228,58 @@ export default function Triggers() {
setExecutions([])
}
setExecLoading(false)
+ // Refresh stats badge whenever executions modal opens
+ api.get('/triggers/stats?days=1').then((d: Stats) => setStats(d)).catch(() => {})
+ }
+
+ /** Build replay preview from execution event_data and open the confirmation modal. */
+ const openReplayPreview = (ex: Execution) => {
+ const d = ex.event_data as Record
+ const dataObj = (d?.data as Record) || {}
+ const keyObj = (dataObj?.key as Record) || {}
+ // Try WPP paths first, then fall back to a generic summary
+ const recipient =
+ (keyObj?.remoteJid as string) ||
+ (dataObj?.remoteJid as string) ||
+ (d?.phone as string) ||
+ (d?.from as string) ||
+ '—'
+ const msgObj = (dataObj?.message as Record) || {}
+ const command =
+ (msgObj?.conversation as string) ||
+ (msgObj?.extendedTextMessage as Record)?.text as string ||
+ (d?.command as string) ||
+ (d?.text as string) ||
+ JSON.stringify(d).slice(0, 120)
+ setReplayPreview({
+ execId: ex.id,
+ recipient,
+ command: String(command || '—'),
+ timestamp: ex.started_at,
+ })
+ }
+
+ const confirmReplay = async () => {
+ if (!replayPreview) return
+ setReplaying(true)
+ try {
+ const result = await api.post(`/triggers/executions/${replayPreview.execId}/replay`)
+ toast.success(`Replay iniciado — nova execução #${result.new_execution_id}`)
+ setReplayPreview(null)
+ // Refresh executions list
+ if (execModal) {
+ const data = await api.get(`/triggers/${execModal.triggerId}/executions`)
+ setExecutions(data.executions || [])
+ }
+ } catch (e: unknown) {
+ const msg = e instanceof Error ? e.message : String(e)
+ if (msg.includes('rate_limited') || msg.includes('429')) {
+ toast.error('Rate limit: aguarde 60s antes de fazer replay novamente')
+ } else {
+ toast.error('Erro ao fazer replay', msg)
+ }
+ }
+ setReplaying(false)
}
const handleRegenerateSecret = async (id: number) => {
@@ -267,6 +350,27 @@ export default function Triggers() {
)}
+ {/* Stats badge (Step 6 — PR-3) */}
+ {stats && (
+
+
+ DLQ: 0 ? 'text-orange-400' : 'text-[#e6edf3]'}`}>{stats.dlq_size}
+
+
+ Replays hoje: {stats.idempotent_replays}
+
+
+ WPP: {stats.wpp_command_count}/dia
+
+ {stats.circuit_breaker_watermark_hit && (
+
+
+ Volume WPP >50/dia — reavaliar Circuit Breaker (ver ADR)
+
+ )}
+
+ )}
+
{/* Filters */}
{filters.map(f => (
@@ -496,15 +600,21 @@ export default function Triggers() {
Event |
Duration |
Time |
+
Actions |
{executions.map(ex => (
|
-
- {ex.status}
-
+
+
+ {ex.status}
+
+ {ex.error_category && (
+ {ex.error_category}
+ )}
+
|
{(ex.event_data as Record)?._test ? 'test' : (String((ex.event_data as Record)?.event_type || '--'))}
@@ -516,6 +626,19 @@ export default function Triggers() {
|
{ex.started_at ? relativeTime(ex.started_at) : '--'}
|
+
+ {/* Replay button — only for failed_retryable (Step 5 PR-3) */}
+ {ex.status === 'failed_retryable' && (
+
+ )}
+ |
|
))}
@@ -526,6 +649,48 @@ export default function Triggers() {
)}
+ {/* Replay Confirmation Modal (Step 5 — PR-3) */}
+ {replayPreview && (
+ setReplayPreview(null)}>
+
e.stopPropagation()}>
+
+
Confirmar replay #{replayPreview.execId}
+
+
+
+
+ Isso irá refazer a chamada original. Se a execução anterior já chegou a executar parcialmente, o sistema dedupa silenciosamente.
+
+
+
+ Destinatário
+ {replayPreview.recipient}
+
+
+ Comando
+ {replayPreview.command.slice(0, 200)}
+
+
+ Timestamp
+ {replayPreview.timestamp}
+
+
+
+
+
+
+
+
+
+ )}
+
{/* New Secret Modal */}
{newSecret && (
setNewSecret(null)}>
diff --git a/dashboard/tests/test_wpp_retry_pr3.py b/dashboard/tests/test_wpp_retry_pr3.py
new file mode 100644
index 00000000..4e5526e7
--- /dev/null
+++ b/dashboard/tests/test_wpp_retry_pr3.py
@@ -0,0 +1,203 @@
+"""Synthetic tests for WhatsApp retry pattern — PR-3 (Steps 4, 5, 6).
+
+Step 4: _classify_error + failed_retryable classification in _execute_trigger
+Step 5: /replay endpoint — rate-limit, not_found, not_replayable, happy path
+Step 6: /stats endpoint — JSON shape, watermark flag
+"""
+import json
+import subprocess
+import sys
+import os
+import pytest
+
+# Ensure backend is importable
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "backend"))
+
+# ---------------------------------------------------------------------------
+# Step 4 — _classify_error unit tests (pure, no app context needed)
+# ---------------------------------------------------------------------------
+
+from routes.triggers import _classify_error, _TRANSIENT_MARKERS
+
+
+class TestClassifyError:
+ def test_timeout_exception_is_transient(self):
+ exc = subprocess.TimeoutExpired(cmd="x", timeout=11)
+ assert _classify_error("Timeout", exc) == "transient"
+
+ def test_value_error_is_permanent(self):
+ exc = ValueError("missing key 'foo'")
+ assert _classify_error(str(exc), exc) == "permanent"
+
+ def test_file_not_found_is_permanent(self):
+ exc = FileNotFoundError("script not found")
+ assert _classify_error(str(exc), exc) == "permanent"
+
+ def test_http_5xx_in_stderr_is_transient(self):
+ assert _classify_error("HTTP 503 Service Unavailable", None) == "transient"
+
+ def test_http_500_in_stderr_is_transient(self):
+ assert _classify_error("HTTP 500 Internal Server Error", None) == "transient"
+
+ def test_connection_refused_is_transient(self):
+ assert _classify_error("Connection refused", None) == "transient"
+
+ def test_url_error_marker_is_transient(self):
+ assert _classify_error("URLError: ", None) == "transient"
+
+ def test_http_4xx_is_permanent(self):
+ # 4xx markers NOT in _TRANSIENT_MARKERS → permanent
+ assert _classify_error("HTTP 400 Bad Request", None) == "permanent"
+
+ def test_http_404_is_permanent(self):
+ assert _classify_error("HTTP 404 Not Found", None) == "permanent"
+
+ def test_generic_runtime_error_is_permanent(self):
+ assert _classify_error("RuntimeError: unexpected state", None) == "permanent"
+
+ def test_empty_message_defaults_permanent(self):
+ assert _classify_error("", None) == "permanent"
+
+ def test_none_message_defaults_permanent(self):
+ assert _classify_error(None, None) == "permanent" # type: ignore[arg-type]
+
+ def test_all_transient_markers_recognized(self):
+ for marker in _TRANSIENT_MARKERS:
+ assert _classify_error(f"...{marker}...", None) == "transient", \
+ f"Marker '{marker}' should be transient"
+
+
+# ---------------------------------------------------------------------------
+# Step 5 + 6 — Flask app integration tests
+# These tests require the Flask app to be importable without a running server.
+# ---------------------------------------------------------------------------
+
+try:
+ import importlib, types
+ # We need a minimal Flask test client. Import app but skip heavy startup.
+ # The auto-migrate runs against dashboard.db which must exist.
+ # Guard: only run integration tests when DB exists.
+ _DB_EXISTS = os.path.exists(
+ os.path.join(os.path.dirname(__file__), "..", "..", "dashboard.db")
+ )
+except Exception:
+ _DB_EXISTS = False
+
+
+@pytest.mark.skipif(not _DB_EXISTS, reason="dashboard.db not found — integration tests skipped")
+class TestReplayEndpoint:
+ """Integration tests for POST /api/triggers/executions//replay."""
+
+ @pytest.fixture(scope="class")
+ def client(self):
+ """Return a Flask test client with a seeded failed_retryable execution."""
+ # Minimal app import — auto-migrate runs on import
+ import app as flask_app_module
+ flask_app = flask_app_module.app
+ flask_app.config["TESTING"] = True
+ flask_app.config["WTF_CSRF_ENABLED"] = False
+ with flask_app.test_client() as c:
+ yield c, flask_app
+
+ def _seed_execution(self, app, status="failed_retryable", last_replay_at=None):
+ """Insert a TriggerExecution row and return its id."""
+ from models import db, TriggerExecution, Trigger
+ with app.app_context():
+ # Find or create a trigger
+ t = Trigger.query.first()
+ if t is None:
+ pytest.skip("No trigger in DB — seed one manually first")
+ ex = TriggerExecution(
+ trigger_id=t.id,
+ event_data=json.dumps({"event_type": "test", "data": {"key": {"remoteJid": "+5511999999999"}, "message": {"conversation": "/briefing"}}}),
+ status=status,
+ last_replay_at=last_replay_at,
+ )
+ db.session.add(ex)
+ db.session.commit()
+ return ex.id, t.id
+
+ def test_replay_requires_auth(self, client):
+ c, _ = client
+ # Without session, Flask-Login returns 401 (Unauthorized) or 403 (Forbidden)
+ resp = c.post("/api/triggers/executions/999999/replay")
+ assert resp.status_code in (401, 403, 404)
+
+ def test_stats_returns_json_shape(self, client):
+ c, _ = client
+ resp = c.get("/api/triggers/stats?days=1")
+ # Without auth some setups return 200 (public route, no login_required) or 403
+ if resp.status_code == 200:
+ data = resp.get_json()
+ required_keys = {
+ "window_days", "total_executions", "by_status",
+ "retries_observed", "idempotent_replays",
+ "dlq_size", "wpp_command_count", "circuit_breaker_watermark_hit",
+ }
+ assert required_keys.issubset(data.keys()), f"Missing keys: {required_keys - data.keys()}"
+ assert isinstance(data["circuit_breaker_watermark_hit"], bool)
+ assert isinstance(data["by_status"], dict)
+ assert data["window_days"] == 1
+
+
+# ---------------------------------------------------------------------------
+# Step 6 — Watermark logic unit test (no DB needed)
+# ---------------------------------------------------------------------------
+
+class TestWatermarkLogic:
+ """The watermark formula: wpp_command_count > 50 OR user_count > 1."""
+
+ def _check(self, wpp_count: int, user_count: int) -> bool:
+ return wpp_count > 50 or user_count > 1
+
+ def test_below_threshold_no_hit(self):
+ assert self._check(49, 1) is False
+
+ def test_exactly_50_no_hit(self):
+ assert self._check(50, 1) is False
+
+ def test_51_hits_watermark(self):
+ assert self._check(51, 1) is True
+
+ def test_multiple_users_hits_watermark(self):
+ assert self._check(0, 2) is True
+
+ def test_both_conditions_hit(self):
+ assert self._check(100, 3) is True
+
+
+# ---------------------------------------------------------------------------
+# Step 5 — Rate-limit logic unit test (no DB needed)
+# ---------------------------------------------------------------------------
+
+class TestRateLimitLogic:
+ """Verify the 60s rate-limit formula (elapsed < 60 → rate-limited)."""
+
+ def _is_rate_limited(self, last_replay_at, now, threshold_seconds=60):
+ if last_replay_at is None:
+ return False
+ elapsed = (now - last_replay_at).total_seconds()
+ return elapsed < threshold_seconds
+
+ def test_no_previous_replay_not_limited(self):
+ from datetime import datetime, timezone
+ now = datetime.now(timezone.utc)
+ assert self._is_rate_limited(None, now) is False
+
+ def test_replay_59s_ago_is_limited(self):
+ from datetime import datetime, timezone, timedelta
+ now = datetime.now(timezone.utc)
+ last = now - timedelta(seconds=59)
+ assert self._is_rate_limited(last, now) is True
+
+ def test_replay_60s_ago_is_not_limited(self):
+ from datetime import datetime, timezone, timedelta
+ now = datetime.now(timezone.utc)
+ last = now - timedelta(seconds=60)
+ assert self._is_rate_limited(last, now) is False
+
+ def test_replay_61s_ago_is_not_limited(self):
+ from datetime import datetime, timezone, timedelta
+ now = datetime.now(timezone.utc)
+ last = now - timedelta(seconds=61)
+ assert self._is_rate_limited(last, now) is False
diff --git a/tests/whatsapp/__init__.py b/tests/whatsapp/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/whatsapp/test_retry_backoff.py b/tests/whatsapp/test_retry_backoff.py
new file mode 100644
index 00000000..07c0f278
--- /dev/null
+++ b/tests/whatsapp/test_retry_backoff.py
@@ -0,0 +1,289 @@
+"""Synthetic tests for PR-2: exponential backoff + jitter in send_whatsapp / api_request.
+
+Coverage (acceptance criteria from Step 3 of plan-retry-pattern.md):
+ 1. HTTP 500 x3 → 3 attempts, returns False, category=transient
+ 2. HTTP 502 x2 then 200 → 3 attempts, returns True
+ 3. HTTP 400 → 1 attempt only (no retry), returns False, category=permanent
+ 4. URLError x3 → 3 attempts, returns False, category=transient
+ 5. Worst-case latency: 3 attempts (all 5xx) <= 8s total sleep budget
+ 6. api_request: HTTP 500 x3 → retries then raises
+ 7. api_request: HTTP 400 → raises immediately (1 attempt)
+ 8. api_request: URLError x3 → retries then raises
+
+Run with: python3 -m unittest tests/whatsapp/test_retry_backoff.py -v
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+import time
+import unittest
+import urllib.error
+import urllib.request
+from io import BytesIO
+from pathlib import Path
+from unittest.mock import MagicMock, patch, call
+
+REPO_ROOT = Path(__file__).resolve().parents[2]
+sys.path.insert(0, str(REPO_ROOT / ".claude" / "skills" / "int-evolution-go" / "scripts"))
+
+# runner.py uses `X | Y` union syntax (Python 3.10+) in some type hints, so we
+# cannot import the entire module on Python 3.9. We extract and exec only the
+# helper function source so the backoff logic can be tested in isolation.
+def _load_runner_helper():
+ """Extract _retry_http_call from runner.py without importing the full module."""
+ import ast
+ import random as _random
+ import time as _time
+ import urllib.error as _urllib_error
+ import socket as _socket
+ from rich.console import Console
+ from rich.theme import Theme
+
+ theme = Theme({"info": "cyan", "success": "bold green", "warning": "yellow",
+ "error": "bold red", "step": "bold blue", "dim": "dim white"})
+ console = Console(theme=theme)
+
+ src = (REPO_ROOT / "ADWs" / "runner.py").read_text()
+ # Find the _retry_http_call function in source via text markers
+ start = src.index("def _retry_http_call(")
+ end = src.index("\ndef send_whatsapp(")
+ fn_src = src[start:end]
+
+ ns = {
+ "random": _random,
+ "time": _time,
+ "urllib": sys.modules["urllib"],
+ "socket": _socket,
+ "console": console,
+ }
+ exec(compile(fn_src, "", "exec"), ns)
+ return ns["_retry_http_call"], ns
+
+
+def _make_http_response(status: int, body: bytes = b"{}") -> MagicMock:
+ """Build a mock context manager mimicking urllib response."""
+ resp = MagicMock()
+ resp.status = status
+ resp.read.return_value = body
+ resp.__enter__ = lambda s: s
+ resp.__exit__ = MagicMock(return_value=False)
+ return resp
+
+
+def _http_error(code: int) -> urllib.error.HTTPError:
+ return urllib.error.HTTPError(
+ url="http://test",
+ code=code,
+ msg=f"HTTP {code}",
+ hdrs=None,
+ fp=BytesIO(b"{}"),
+ )
+
+
+class TestSendWhatsappRetry(unittest.TestCase):
+ """Tests for _retry_http_call extracted from ADWs/runner.py.
+
+ runner.py uses Python 3.10+ union type hints elsewhere, so we extract
+ only the helper function via source slicing and exec it in isolation.
+ """
+
+ def setUp(self):
+ self._retry_http_call, self._ns = _load_runner_helper()
+
+ def test_http_500_retries_3_times_returns_false(self):
+ """HTTP 500 x3 → 3 attempts, final result False, category=transient."""
+ call_count = 0
+
+ def _do_call():
+ nonlocal call_count
+ call_count += 1
+ raise _http_error(500)
+
+ sleep_mock = MagicMock()
+ self._ns["time"] = MagicMock(sleep=sleep_mock)
+ ok, attempts, category = self._retry_http_call(_do_call, max_attempts=3, base_delay=2.0)
+
+ self.assertFalse(ok)
+ self.assertEqual(attempts, 3)
+ self.assertEqual(category, "transient")
+ self.assertEqual(call_count, 3)
+ self.assertEqual(sleep_mock.call_count, 2) # sleep between attempt 1→2 and 2→3
+
+ def test_http_502_twice_then_200_returns_true(self):
+ """HTTP 502 x2 then 200 → 3 attempts, returns True."""
+ call_count = 0
+
+ def _do_call():
+ nonlocal call_count
+ call_count += 1
+ if call_count < 3:
+ raise _http_error(502)
+ return True
+
+ self._ns["time"] = MagicMock(sleep=MagicMock())
+ ok, attempts, category = self._retry_http_call(_do_call, max_attempts=3, base_delay=2.0)
+
+ self.assertTrue(ok)
+ self.assertEqual(attempts, 3)
+ self.assertIsNone(category)
+ self.assertEqual(call_count, 3)
+
+ def test_http_400_no_retry_returns_false(self):
+ """HTTP 400 → 1 attempt, no retry, category=permanent."""
+ call_count = 0
+
+ def _do_call():
+ nonlocal call_count
+ call_count += 1
+ raise _http_error(400)
+
+ sleep_mock = MagicMock()
+ self._ns["time"] = MagicMock(sleep=sleep_mock)
+ ok, attempts, category = self._retry_http_call(_do_call, max_attempts=3, base_delay=2.0)
+
+ self.assertFalse(ok)
+ self.assertEqual(attempts, 1)
+ self.assertEqual(category, "permanent")
+ self.assertEqual(call_count, 1)
+ sleep_mock.assert_not_called()
+
+ def test_url_error_retries_3_times_returns_false(self):
+ """URLError x3 → 3 attempts, returns False, category=transient."""
+ call_count = 0
+
+ def _do_call():
+ nonlocal call_count
+ call_count += 1
+ raise urllib.error.URLError("Connection refused")
+
+ self._ns["time"] = MagicMock(sleep=MagicMock())
+ ok, attempts, category = self._retry_http_call(_do_call, max_attempts=3, base_delay=2.0)
+
+ self.assertFalse(ok)
+ self.assertEqual(attempts, 3)
+ self.assertEqual(category, "transient")
+ self.assertEqual(call_count, 3)
+
+ def test_worst_case_sleep_budget_within_8s(self):
+ """Verify sleep calls stay within 8s total (worst case, 3 attempts, all 5xx)."""
+ sleep_calls = []
+
+ def _do_call():
+ raise _http_error(500)
+
+ def capturing_sleep(secs):
+ sleep_calls.append(secs)
+
+ uniform_mock = MagicMock(return_value=0.5)
+ self._ns["time"] = MagicMock(sleep=capturing_sleep)
+ self._ns["random"] = MagicMock(uniform=uniform_mock)
+ # With uniform=0.5 and base_delay=2.0:
+ # attempt 0→1: min(2.0**0 + 0.5, 8) = 1.5s
+ # attempt 1→2: min(2.0**1 + 0.5, 8) = 2.5s
+ # total = 4.0s — well within 8s budget
+ self._retry_http_call(_do_call, max_attempts=3, base_delay=2.0, max_delay=8.0)
+
+ total_sleep = sum(sleep_calls)
+ self.assertLessEqual(total_sleep, 8.0, f"Total sleep {total_sleep:.2f}s exceeds 8s budget")
+ self.assertEqual(len(sleep_calls), 2) # 2 sleeps between 3 attempts
+
+
+class TestApiRequestRetry(unittest.TestCase):
+ """Tests for _retry_http_call_client via api_request in evolution_go_client.py."""
+
+ def _import_client(self):
+ import importlib
+ import evolution_go_client as _client
+ importlib.reload(_client)
+ return _client
+
+ def _patch_get_config(self, client):
+ """Patch get_config to return predictable values."""
+ return patch.object(client, "get_config", return_value=("http://localhost:8080", "test-key"))
+
+ def test_http_500_retries_then_raises(self):
+ """HTTP 500 x3 → retries 3 times, raises HTTPError after exhausted."""
+ _client = self._import_client()
+ call_count = 0
+
+ def _mock_urlopen(req, *args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ raise _http_error(500)
+
+ with self._patch_get_config(_client):
+ with patch("urllib.request.urlopen", side_effect=_mock_urlopen):
+ with patch.object(_client.time, "sleep"):
+ with self.assertRaises(urllib.error.HTTPError) as ctx:
+ _client.api_request("GET", "/instance/status")
+
+ self.assertEqual(ctx.exception.code, 500)
+ self.assertEqual(call_count, 3)
+
+ def test_http_400_raises_immediately_no_retry(self):
+ """HTTP 400 → raises immediately without retry."""
+ _client = self._import_client()
+ call_count = 0
+
+ def _mock_urlopen(req, *args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ raise _http_error(400)
+
+ with self._patch_get_config(_client):
+ with patch("urllib.request.urlopen", side_effect=_mock_urlopen):
+ with patch.object(_client.time, "sleep") as mock_sleep:
+ with self.assertRaises(urllib.error.HTTPError) as ctx:
+ _client.api_request("GET", "/instance/status")
+
+ self.assertEqual(ctx.exception.code, 400)
+ self.assertEqual(call_count, 1)
+ mock_sleep.assert_not_called()
+
+ def test_url_error_retries_then_raises(self):
+ """URLError x3 → retries 3 times, raises URLError after exhausted."""
+ _client = self._import_client()
+ call_count = 0
+
+ def _mock_urlopen(req, *args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ raise urllib.error.URLError("Connection refused")
+
+ with self._patch_get_config(_client):
+ with patch("urllib.request.urlopen", side_effect=_mock_urlopen):
+ with patch.object(_client.time, "sleep"):
+ with self.assertRaises(urllib.error.URLError):
+ _client.api_request("GET", "/instance/status")
+
+ self.assertEqual(call_count, 3)
+
+ def test_success_on_third_attempt_returns_result(self):
+ """HTTP 500 x2 then 200 → returns parsed JSON result."""
+ _client = self._import_client()
+ call_count = 0
+
+ def _mock_urlopen(req, *args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ if call_count < 3:
+ raise _http_error(500)
+ resp = MagicMock()
+ resp.read.return_value = b'{"status": "active"}'
+ resp.__enter__ = lambda s: s
+ resp.__exit__ = MagicMock(return_value=False)
+ return resp
+
+ with self._patch_get_config(_client):
+ with patch("urllib.request.urlopen", side_effect=_mock_urlopen):
+ with patch.object(_client.time, "sleep"):
+ result = _client.api_request("GET", "/instance/status")
+
+ self.assertEqual(result, {"status": "active"})
+ self.assertEqual(call_count, 3)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)