diff --git a/.env.example b/.env.example index df7cc38..6179eb3 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,21 @@ GEMINI_API_KEY= # Gemini 2.0 Flash (recommended, cheapest) # LLM_URL=http://127.0.0.1:8080/v1 # Local LLM (llama.cpp, Ollama) # LLM_MODEL= # Override model name +# Backend selection for auto-apply (APPLY_BACKEND) +# Options: claude (default), opencode +# To use OpenCode: set APPLY_BACKEND=opencode and register MCPs +# APPLY_BACKEND=opencode + +# Backend-specific defaults for apply command (optional) +# APPLY_CLAUDE_MODEL= # Claude backend default model (default: haiku) +# APPLY_OPENCODE_MODEL= # OpenCode backend default model (fallback: LLM_MODEL or gpt-4o-mini) +# APPLY_OPENCODE_AGENT= # OpenCode --agent value (optional) + +# OpenCode MCP baseline: +# Ensure these MCP servers exist in `opencode mcp list` before applying: +# - playwright +# - gmail + # Auto-Apply (optional) CAPSOLVER_API_KEY= # For CAPTCHA solving during auto-apply diff --git a/README.md b/README.md index e7fe08e..e4a7644 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ applypilot apply --dry-run # fill forms without submitting ## Two Paths ### Full Pipeline (recommended) -**Requires:** Python 3.11+, Node.js (for npx), Gemini API key (free), Claude Code CLI, Chrome +**Requires:** Python 3.11+, Node.js (for npx), Gemini API key (free), Claude Code CLI or OpenCode CLI, Chrome Runs all 6 stages, from job discovery to autonomous application submission. This is the full power of ApplyPilot. @@ -63,7 +63,7 @@ Runs stages 1-5: discovers jobs, scores them, tailors your resume, generates cov | **3. Score** | AI rates every job 1-10 based on your resume and preferences. Only high-fit jobs proceed | | **4. Tailor** | AI rewrites your resume per job: reorganizes, emphasizes relevant experience, adds keywords. Never fabricates | | **5. Cover Letter** | AI generates a targeted cover letter per job | -| **6. 
Auto-Apply** | Claude Code navigates application forms, fills fields, uploads documents, answers questions, and submits | +| **6. Auto-Apply** | Orchestrates browser-driven submission using an external backend (Claude or OpenCode). The backend launches a browser, detects the form type, fills personal information and work history, uploads the tailored resume and cover letter, answers screening questions with AI, and submits. | Each stage is independent. Run them all or pick what you need. @@ -90,7 +90,7 @@ Each stage is independent. Run them all or pick what you need. | Node.js 18+ | Auto-apply | Needed for `npx` to run Playwright MCP server | | Gemini API key | Scoring, tailoring, cover letters | Free tier (15 RPM / 1M tokens/day) is enough | | Chrome/Chromium | Auto-apply | Auto-detected on most systems | -| Claude Code CLI | Auto-apply | Install from [claude.ai/code](https://claude.ai/code) | +| Claude Code CLI or OpenCode CLI | Auto-apply | Claude: install from https://claude.ai/code; OpenCode: install from https://opencode.ai and register MCPs | **Gemini API key is free.** Get one at [aistudio.google.com](https://aistudio.google.com). OpenAI and local models (Ollama/llama.cpp) are also supported. @@ -115,7 +115,175 @@ Your personal data in one structured file: contact info, work authorization, com Job search queries, target titles, locations, boards. Run multiple searches with different parameters. ### `.env` -API keys and runtime config: `GEMINI_API_KEY`, `LLM_MODEL`, `CAPSOLVER_API_KEY` (optional). +API keys and runtime config: `GEMINI_API_KEY`, `LLM_MODEL`, `CAPSOLVER_API_KEY` (optional). See Backend and Gateway configuration for details on multi-backend selection and gateway compatibility. + +--- + +## Backend and Gateway configuration (Gemini first, OpenCode backend) + +ApplyPilot supports multiple LLM backends. The baseline-first approach for LLMs is Gemini. 
For auto-apply orchestration, the runtime default backend is Claude (when APPLY_BACKEND is unset it resolves to "claude"). OpenCode is supported as an alternative and is the recommended production path; set APPLY_BACKEND=opencode to use it. Configure your environment carefully and never commit real keys.
+
+1) Baseline LLM (Gemini)
+- Set GEMINI_API_KEY to use Google Gemini for scoring, tailoring, and cover letters. This is the recommended default and is used automatically when present.
+
+2) Gateway compatibility (9router / OpenAI-compatible gateways)
+- If you use a proxy or gateway that speaks the OpenAI-compatible API (for example, 9router, self-hosted gateways, or Ollama behind a REST wrapper), set these env vars in your `.env` or runtime environment:
+
+  - LLM_URL: Base URL of your gateway, for example `https://my-9router.example.com/v1`
+  - LLM_API_KEY: API key for that gateway (keep secret)
+  - LLM_MODEL: Model name exposed by the gateway, for example `gpt-4o-mini`
+
+- Example (do not paste real keys):
+
+  export LLM_URL="https://my-9router.example.com/v1"
+  export LLM_API_KEY="sk-xxxxxxxx"
+  export LLM_MODEL="gpt-4o-mini"
+
+3) Backend selection for auto-apply and orchestration
+- Use APPLY_BACKEND to select which orchestration backend auto-apply runs with. Supported values:
+  - opencode: the OpenCode backend and its MCP integrations (recommended)
+  - claude: the Claude Code CLI (the code default when APPLY_BACKEND is not set)
+
+- Backend defaults are configurable:
+  - `APPLY_CLAUDE_MODEL` (default: `haiku`)
+  - `APPLY_OPENCODE_MODEL` (fallback: `LLM_MODEL`, then `gpt-4o-mini`)
+  - `APPLY_OPENCODE_AGENT` (passed as `--agent` to `opencode run`)
+
+  Example (use OpenCode):
+
+  export APPLY_BACKEND=opencode
+  export APPLY_OPENCODE_MODEL="gh/claude-sonnet-4.5"
+  export APPLY_OPENCODE_AGENT="coder"
+
+4) OpenCode MCP prerequisite
+- When using the opencode backend you must register the required MCP servers before first run. 
Ensure `opencode mcp list` shows both required server names:
+  - `playwright`
+  - `gmail`
+- These servers are declared in OpenCode's own global config (for ApplyPilot, `~/.applypilot/.opencode/opencode.jsonc`; see "OpenCode Configuration Details" below). OpenCode manages MCP servers globally; you cannot pass an MCP config file per invocation. ApplyPilot validates this baseline before running the OpenCode backend.
+
+5) Claude fallback / code default
+- The default backend when APPLY_BACKEND is not set is `claude`. If you rely on that default or explicitly set APPLY_BACKEND=claude, ensure Claude Code CLI is installed and configured. Claude remains supported as a fallback orchestration backend.
+
+6) Security and secret handling
+- Never add API keys to git. Use a local file outside the repo (for example `~/.applypilot/.env`) or a secret manager.
+- Add `.env` or `~/.applypilot/.env` to your `.gitignore`.
+- Rotate keys regularly and treat gateway keys like production secrets.
+- When sharing examples, replace any keys with `sk-xxxxxxxx` or `GEMINI_API_KEY=xxxxx` placeholders.
+
+7) 9router example variables
+- 9router and similar gateways expect the following env variables for compatibility with ApplyPilot's AI stages: `LLM_URL`, `LLM_API_KEY`, `LLM_MODEL`. Make sure the gateway exposes an OpenAI-compatible v1 chat/completions endpoint.
+
+8) Verification
+- After setting env vars and, for opencode, registering MCPs, run `applypilot doctor`. It reports configured providers and flags missing MCP registration or missing CLI binaries. If doctor reports issues, follow its guidance. 
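The selection and fallback rules above can be sketched in a few lines. This is an illustrative sketch only, assuming the documented defaults (`claude` when APPLY_BACKEND is unset, OpenCode model falling back to `LLM_MODEL` then `gpt-4o-mini`); `resolve_backend` and `resolve_model` are hypothetical helper names, not ApplyPilot's actual API:

```python
VALID_BACKENDS = {"claude", "opencode"}


def resolve_backend(env: dict[str, str]) -> str:
    """Resolve the auto-apply backend from APPLY_BACKEND, defaulting to claude."""
    backend = env.get("APPLY_BACKEND", "claude").strip().lower()
    if backend not in VALID_BACKENDS:
        raise ValueError(
            f"Invalid backend {backend!r}; supported: {', '.join(sorted(VALID_BACKENDS))}"
        )
    return backend


def resolve_model(backend: str, env: dict[str, str]) -> str:
    """Resolve the model for the chosen backend, following the documented fallbacks."""
    if backend == "claude":
        # APPLY_CLAUDE_MODEL, then the documented default
        return env.get("APPLY_CLAUDE_MODEL") or "haiku"
    # opencode: APPLY_OPENCODE_MODEL, then LLM_MODEL, then gpt-4o-mini
    return env.get("APPLY_OPENCODE_MODEL") or env.get("LLM_MODEL") or "gpt-4o-mini"
```

Passing a plain dict rather than reading `os.environ` directly keeps the resolution order easy to test; the real launcher may wire this differently.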
+ +### OpenCode Configuration Details + +ApplyPilot uses OpenCode in **project mode** with isolated configuration to prevent conflicts with your personal OpenCode setup: + +**Configuration File:** `~/.applypilot/.opencode/opencode.jsonc` + +This file is where you define: +- The custom `applypilot-apply` agent +- MCP server configurations +- Permission scopes +- Model defaults + +**XDG Directory Isolation:** +ApplyPilot sets `XDG_CONFIG_HOME=~/.applypilot` when running OpenCode, which means: +- OpenCode loads **only** `~/.applypilot/.opencode/opencode.jsonc` +- Your global `~/.config/opencode/` is ignored +- Auth credentials (`~/.opencode/auth.json`) still work (different XDG var) +- State and sessions use default locations + +This isolation ensures ApplyPilot's agent configuration doesn't interfere with your personal OpenCode workflows. + +**Example `~/.applypilot/.opencode/opencode.jsonc`:** +```jsonc +{ + "$schema": "https://opencode.ai/config.json", + "permission": { + "*": "allow", + "question": "deny" + }, + "tools": { + "question": false + }, + "agent": { + "applypilot-apply": { + "description": "Autonomous job application agent", + "mode": "primary", + "model": "github-copilot/gpt-5-mini", + "prompt": "{file:../prompts/apply-agent.md}", + "permission": { + // Safety: deny file editing and bash + "task": "deny", + "edit": "deny", + "write": "deny", + "bash": "deny", + // Read-only tools allowed + "read": "allow", + "grep": "allow", + "glob": "allow", + "lsp": "allow", + // All Playwright browser tools enabled + "playwright_browser_navigate": "allow", + "playwright_browser_click": "allow", + "playwright_browser_fill_form": "allow", + "playwright_browser_snapshot": "allow", + "playwright_browser_evaluate": "allow", + "playwright_browser_file_upload": "allow", + "playwright_browser_tabs": "allow", + "playwright_browser_wait_for": "allow", + "playwright_browser_screenshot": "allow", + // All Gmail tools enabled + "gmail_search_emails": "allow", + 
"gmail_read_email": "allow", + "gmail_send_email": "allow", + "gmail_create_draft": "allow" + } + } + }, + "mcp": { + "playwright": { + "type": "local", + "enabled": true, + "command": [ + "npx", "@playwright/mcp@latest", + "--cdp-endpoint=http://localhost:9222" + ] + }, + "gmail": { + "type": "local", + "enabled": true, + "command": [ + "npx", "-y", "@gongrzhe/server-gmail-autoauth-mcp" + ] + } + } +} +``` + +**MCP Server Registration:** +```bash +# Register Playwright MCP for browser automation +opencode mcp add playwright --provider=openai --url="$LLM_URL" --api-key="$LLM_API_KEY" + +# Verify registration +opencode mcp list +``` + +**Key Points:** +- The agent name `applypilot-apply` is referenced in the backend code +- The prompt file path is relative to the `~/.applypilot/` directory +- Browser tools are explicitly allowed; file editing is denied for safety +- Model can be overridden via `APPLY_OPENCODE_MODEL` environment variable + +--- ### Package configs (shipped with ApplyPilot) - `config/employers.yaml` - Workday employer registry (48 preconfigured) @@ -142,9 +310,14 @@ Generates a custom resume per job: reorders experience, emphasizes relevant skil Writes a targeted cover letter per job referencing the specific company, role, and how your experience maps to their requirements. ### Auto-Apply -Claude Code launches a Chrome instance, navigates to each application page, detects the form type, fills personal information and work history, uploads the tailored resume and cover letter, answers screening questions with AI, and submits. A live dashboard shows progress in real-time. 
+Auto-apply is implemented via a pluggable backend with two supported options:
+
+- **Claude**: uses the Claude Code CLI (default when APPLY_BACKEND is unset)
+- **OpenCode**: uses the OpenCode CLI with pre-configured MCP servers
+
+Both backends perform the same high-level tasks: launch a browser, detect form types, fill personal details, upload tailored documents, answer screening questions, and submit applications. A live dashboard shows progress in real-time.
 
-The Playwright MCP server is configured automatically at runtime per worker. No manual MCP setup needed.
+Choose your backend by setting APPLY_BACKEND=claude or APPLY_BACKEND=opencode, and make sure the matching CLI is installed. The Claude backend still configures the Playwright MCP server automatically at runtime per worker; the OpenCode backend requires the `playwright` and `gmail` MCP servers to be registered up front (see "OpenCode Configuration Details").
 
 ```bash
 # Utility modes (no Chrome/Claude needed)
diff --git a/src/applypilot/apply/backends.py b/src/applypilot/apply/backends.py
new file mode 100644
index 0000000..f868c8e
--- /dev/null
+++ b/src/applypilot/apply/backends.py
@@ -0,0 +1,1145 @@
+"""Agent backend abstraction for auto-apply launcher.
+
+This module provides a strategy/adapter pattern for switching between
+different agent backends (Claude CLI, OpenCode, etc.) while preserving
+the existing output parsing and status taxonomy.
+
+@file backends.py
+@description Backend abstraction layer for agent execution.
+    Integrates with launcher.py to enable multi-backend support. 
+""" + +from __future__ import annotations + +import json +import logging +import os +import re +import subprocess +from abc import ABC, abstractmethod +from pathlib import Path +from typing import TYPE_CHECKING, Sequence + +if TYPE_CHECKING: + from typing import Any + +logger = logging.getLogger(__name__) + +# Supported backend identifiers +VALID_BACKENDS: frozenset[str] = frozenset({"claude", "opencode"}) +DEFAULT_BACKEND: str = "claude" +DEFAULT_CLAUDE_MODEL: str = "haiku" +DEFAULT_OPENCODE_MODEL: str = "gpt-4o-mini" +ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-9;]*[A-Za-z]") + + +class BackendError(Exception): + """Raised when backend operations fail or invalid backend is requested.""" + + pass + + +class InvalidBackendError(BackendError): + """Raised when an unsupported backend identifier is provided.""" + + def __init__(self, backend: str, available: frozenset[str]) -> None: + self.backend = backend + self.available = available + super().__init__( + f"Invalid backend '{backend}'. " + f"Supported backends: {', '.join(sorted(available))}. " + f"Set via APPLY_BACKEND environment variable or backend config option." + ) + + +class AgentBackend(ABC): + """Abstract base class for agent execution backends. + + Implementations must provide a run_job method that executes the agent + with the given prompt and configuration, returning status and duration. + """ + + @abstractmethod + def run_job( + self, + job: dict[str, Any], + port: int, + worker_id: int, + model: str, + agent: str | None, + dry_run: bool, + prompt: str, + mcp_config_path: Path, + worker_dir: Path, + required_mcp_servers: Sequence[str] | None = None, + update_callback: Any | None = None, + ) -> tuple[str, int]: + """Execute the agent for a single job application. + + Args: + job: Job dictionary with url, title, site, etc. + port: CDP port for browser connection. + worker_id: Numeric worker identifier. + model: Model name for the backend. + dry_run: If True, don't actually submit applications. 
+ prompt: The full agent prompt text. + mcp_config_path: Path to MCP configuration file. + worker_dir: Working directory for the agent. + update_callback: Optional callback for status updates. + + Returns: + Tuple of (status_string, duration_ms). Status is one of: + 'applied', 'expired', 'captcha', 'login_issue', + 'failed:reason', or 'skipped'. + """ + ... + + @property + @abstractmethod + def name(self) -> str: + """Return the backend identifier name.""" + ... + + @abstractmethod + def get_active_proc(self, worker_id: int) -> subprocess.Popen | None: + """Get the active process for a worker (for signal handling). + + Args: + worker_id: Numeric worker identifier. + + Returns: + The active subprocess.Popen instance, or None if no process is active. + """ + ... + + +class ClaudeBackend(AgentBackend): + """Claude Code CLI backend implementation. + + Spawns Claude Code CLI subprocess with Playwright MCP integration. + Parses stream-json output to extract result status and token usage. + """ + + def __init__(self) -> None: + self._active_procs: dict[int, subprocess.Popen] = {} + + @property + def name(self) -> str: + return "claude" + + def _build_command( + self, + model: str, + mcp_config_path: Path, + ) -> list[str]: + """Build the Claude CLI command arguments.""" + return [ + "claude", + "--model", + model, + "-p", + "--mcp-config", + str(mcp_config_path), + "--permission-mode", + "bypassPermissions", + "--no-session-persistence", + "--disallowedTools", + ( + "mcp__gmail__draft_email,mcp__gmail__modify_email," + "mcp__gmail__delete_email,mcp__gmail__download_attachment," + "mcp__gmail__batch_modify_emails,mcp__gmail__batch_delete_emails," + "mcp__gmail__create_label,mcp__gmail__update_label," + "mcp__gmail__delete_label,mcp__gmail__get_or_create_label," + "mcp__gmail__list_email_labels,mcp__gmail__create_filter," + "mcp__gmail__list_filters,mcp__gmail__get_filter," + "mcp__gmail__delete_filter" + ), + "--output-format", + "stream-json", + "--verbose", + "-", + ] + 
+ def _prepare_environment(self) -> dict[str, str]: + """Prepare clean environment for Claude process.""" + env = os.environ.copy() + env.pop("CLAUDECODE", None) + env.pop("CLAUDE_CODE_ENTRYPOINT", None) + return env + + def run_job( + self, + job: dict[str, Any], + port: int, + worker_id: int, + model: str, + agent: str | None, + dry_run: bool, + prompt: str, + mcp_config_path: Path, + worker_dir: Path, + required_mcp_servers: Sequence[str] | None = None, + update_callback: Any | None = None, + ) -> tuple[str, int]: + """Execute Claude Code for a single job application. + + This implementation preserves the existing launcher.py behavior + for Claude CLI execution, including output parsing and status + taxonomy (APPLIED, EXPIRED, CAPTCHA, LOGIN_ISSUE, FAILED). + """ + import re + import time + from datetime import datetime, timedelta + + from applypilot import config + from applypilot.apply.dashboard import add_event, get_state, update_state + + cmd = self._build_command(model, mcp_config_path) + env = self._prepare_environment() + + update_state( + worker_id, + status="applying", + job_title=job["title"], + company=job.get("site", ""), + score=job.get("fit_score", 0), + start_time=time.time(), + actions=0, + last_action="starting", + ) + add_event(f"[W{worker_id}] Starting: {job['title'][:40]} @ {job.get('site', '')}") + + worker_log = config.LOG_DIR / f"worker-{worker_id}.log" + + # Set up JSONL audit file + audit_path = config.LOG_DIR / f"worker-{worker_id}.events.jsonl" + audit_file = open(audit_path, "a", encoding="utf-8") + + # Clean up old audit files (30-day retention) + cutoff = datetime.utcnow() - timedelta(days=30) + for old_file in config.LOG_DIR.glob("worker-*.events.jsonl"): + try: + if datetime.utcfromtimestamp(old_file.stat().st_mtime) < cutoff: + old_file.unlink() + except Exception: + pass + + ts_header = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_header = ( + f"\n{'=' * 60}\n" + f"[{ts_header}] {job['title']} @ {job.get('site', '')}\n" + 
f"URL: {job.get('application_url') or job['url']}\n" + f"Score: {job.get('fit_score', 'N/A')}/10\n" + f"{'=' * 60}\n" + ) + + start = time.time() + stats: dict = {} + proc = None + + try: + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + errors="replace", + env=env, + cwd=str(worker_dir), + ) + self._active_procs[worker_id] = proc + if proc.stdin is None or proc.stdout is None: + raise BackendError("Claude backend process streams unavailable") + stdin = proc.stdin + stdout = proc.stdout + + stdin.write(prompt) + stdin.close() + + text_parts: list[str] = [] + with open(worker_log, "a", encoding="utf-8") as lf: + lf.write(log_header) + + # Instrumentation for timing analysis + last_msg_time = None + tool_start_time = None + current_tool = None + turn_count = 0 + perf_log_path = config.LOG_DIR / f"worker-{worker_id}.perf.jsonl" + perf_file = open(perf_log_path, "a", encoding="utf-8") + + for line in stdout: + line = line.strip() + if not line: + continue + + msg_received_time = time.time() + + # Calculate gap from last message + if last_msg_time: + gap_ms = (msg_received_time - last_msg_time) * 1000 + else: + gap_ms = 0 + + try: + msg = json.loads(line) + msg_type = msg.get("type") + + # Log message timing + if gap_ms > 100: # Only log significant gaps (>100ms) + perf_entry = { + "timestamp": datetime.utcnow().isoformat(), + "event": "message_received", + "msg_type": msg_type, + "gap_ms": round(gap_ms, 2), + "turn": turn_count, + } + perf_file.write(json.dumps(perf_entry) + "\n") + perf_file.flush() + + # Track step start + if msg_type == "step_start": + turn_count += 1 + step_start_time = time.time() + if turn_count == 1: + add_event(f"[W{worker_id}] ✓ Agent active - starting application...") + perf_entry = { + "timestamp": datetime.utcnow().isoformat(), + "event": "llm_turn_start", + "turn": turn_count, + } + perf_file.write(json.dumps(perf_entry) + "\n") + 
perf_file.flush() + + # Log if this was a long wait + if gap_ms > 5000: # >5 seconds + lf.write(f"\n[PERF] Long gap before turn {turn_count}: {gap_ms/1000:.1f}s (likely API latency + LLM thinking)\n") + lf.flush() + + # Track tool execution + if msg_type == "tool_use": + tool_start_time = time.time() + # Extract tool name (simplified) + part = msg.get("part", {}) + current_tool = part.get("tool", part.get("name", "unknown")) + if msg_type == "assistant": + for block in msg.get("message", {}).get("content", []): + bt = block.get("type") + if bt == "text": + text_parts.append(block["text"]) + lf.write(block["text"] + "\n") + elif bt == "tool_use": + name = ( + block.get("name", "") + .replace("mcp__playwright__", "") + .replace("mcp__gmail__", "gmail:") + ) + inp = block.get("input", {}) + if "url" in inp: + desc = f"{name} {inp['url'][:60]}" + elif "ref" in inp: + desc = f"{name} {inp.get('element', inp.get('text', ''))}"[:50] + elif "fields" in inp: + desc = f"{name} ({len(inp['fields'])} fields)" + elif "paths" in inp: + desc = f"{name} upload" + else: + desc = name + + # Write enhanced log with timestamp + ts = datetime.utcnow().isoformat() + lf.write(f"{ts} >> {desc}\n") + + # Add dashboard event + add_event(f"[W{worker_id}] {desc[:80]}") + + # Update state + ws = get_state(worker_id) + cur_actions = ws.actions if ws else 0 + update_state(worker_id, actions=cur_actions + 1, last_action=desc[:35]) + + # Write JSONL audit entry + audit_entry = { + "timestamp": ts, + "worker_id": worker_id, + "type": "tool_use", + "tool": name, + "input": inp, # Full input payload + "tab_id": inp.get("tab_id"), + "url": inp.get("url"), + } + audit_file.write(json.dumps(audit_entry, ensure_ascii=False) + "\n") + audit_file.flush() + elif msg_type == "result": + stats = { + "input_tokens": msg.get("usage", {}).get("input_tokens", 0), + "output_tokens": msg.get("usage", {}).get("output_tokens", 0), + "cache_read": msg.get("usage", {}).get("cache_read_input_tokens", 0), + 
"cache_create": msg.get("usage", {}).get("cache_creation_input_tokens", 0), + "cost_usd": msg.get("total_cost_usd", 0), + "turns": msg.get("num_turns", 0), + } + text_parts.append(msg.get("result", "")) + except json.JSONDecodeError: + text_parts.append(line) + lf.write(line + "\n") + + proc.wait(timeout=300) + returncode = proc.returncode + proc = None + + if returncode and returncode < 0: + return "skipped", int((time.time() - start) * 1000) + + output = "\n".join(text_parts) + elapsed = int(time.time() - start) + duration_ms = int((time.time() - start) * 1000) + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + job_log = config.LOG_DIR / f"claude_{ts}_w{worker_id}_{job.get('site', 'unknown')[:20]}.txt" + job_log.write_text(output, encoding="utf-8") + + if stats: + cost = stats.get("cost_usd", 0) + ws = get_state(worker_id) + prev_cost = ws.total_cost if ws else 0.0 + update_state(worker_id, total_cost=prev_cost + cost) + + def _clean_reason(s: str) -> str: + return re.sub(r'[*`"]+$', "", s).strip() + + for result_status in ["APPLIED", "EXPIRED", "CAPTCHA", "LOGIN_ISSUE"]: + if f"RESULT:{result_status}" in output: + add_event(f"[W{worker_id}] {result_status} ({elapsed}s): {job['title'][:30]}") + update_state(worker_id, status=result_status.lower(), last_action=f"{result_status} ({elapsed}s)") + return result_status.lower(), duration_ms + + if "RESULT:FAILED" in output: + for out_line in output.split("\n"): + if "RESULT:FAILED" in out_line: + reason = ( + out_line.split("RESULT:FAILED:")[-1].strip() + if ":" in out_line[out_line.index("FAILED") + 6 :] + else "unknown" + ) + reason = _clean_reason(reason) + PROMOTE_TO_STATUS = {"captcha", "expired", "login_issue"} + if reason in PROMOTE_TO_STATUS: + add_event(f"[W{worker_id}] {reason.upper()} ({elapsed}s): {job['title'][:30]}") + update_state(worker_id, status=reason, last_action=f"{reason.upper()} ({elapsed}s)") + return reason, duration_ms + add_event(f"[W{worker_id}] FAILED ({elapsed}s): {reason[:30]}") + 
update_state(worker_id, status="failed", last_action=f"FAILED: {reason[:25]}") + return f"failed:{reason}", duration_ms + return "failed:unknown", duration_ms + + add_event(f"[W{worker_id}] NO RESULT ({elapsed}s)") + update_state(worker_id, status="failed", last_action=f"no result ({elapsed}s)") + return "failed:no_result_line", duration_ms + + except subprocess.TimeoutExpired: + duration_ms = int((time.time() - start) * 1000) + elapsed = int(time.time() - start) + add_event(f"[W{worker_id}] TIMEOUT ({elapsed}s)") + update_state(worker_id, status="failed", last_action=f"TIMEOUT ({elapsed}s)") + return "failed:timeout", duration_ms + except Exception as e: + duration_ms = int((time.time() - start) * 1000) + add_event(f"[W{worker_id}] ERROR: {str(e)[:40]}") + update_state(worker_id, status="failed", last_action=f"ERROR: {str(e)[:25]}") + return f"failed:{str(e)[:100]}", duration_ms + finally: + audit_file.close() + self._active_procs.pop(worker_id, None) + if proc is not None and proc.poll() is None: + self._kill_process_tree(proc.pid) + + def _kill_process_tree(self, pid: int) -> None: + """Kill a process and all its children.""" + import signal + + try: + import psutil + + parent = psutil.Process(pid) + for child in parent.children(recursive=True): + child.terminate() + parent.terminate() + _, alive = psutil.wait_procs([parent], timeout=3) + for p in alive: + p.kill() + except ImportError: + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + pass + except ProcessLookupError: + pass + + def get_active_proc(self, worker_id: int) -> subprocess.Popen | None: + """Get the active process for a worker (for signal handling).""" + return self._active_procs.get(worker_id) + + def kill_all(self) -> None: + """Kill all active Claude processes.""" + for worker_id, proc in list(self._active_procs.items()): + if proc.poll() is None: + self._kill_process_tree(proc.pid) + self._active_procs.clear() + + +class OpenCodeBackend(AgentBackend): + """OpenCode CLI backend 
implementation.
+
+    Spawns OpenCode CLI subprocess in non-interactive 'run' mode with
+    Playwright MCP integration. Parses JSON event stream to extract
+    result status and token usage.
+
+    OpenCode MCP servers must be pre-configured (via `opencode mcp add`)
+    since OpenCode does not accept per-invocation MCP config files.
+    """
+
+    def __init__(self, use_global_config: bool | None = None) -> None:
+        self._active_procs: dict[int, subprocess.Popen] = {}
+        if use_global_config is None:
+            use_global_config = os.environ.get("APPLY_OPENCODE_USE_GLOBAL_CONFIG", "").lower() in ("1", "true", "yes")
+        self._use_global_config = use_global_config
+
+    @property
+    def name(self) -> str:
+        return "opencode"
+
+    def _find_binary(self) -> str:
+        """Locate the opencode binary, raising a clear error if missing."""
+        import shutil
+        # Prefer a binary found on PATH, then fall back to the default
+        # installation location.
+        which_path = shutil.which("opencode")
+        if which_path:
+            return which_path
+        default_path = Path.home() / ".opencode" / "bin" / "opencode"
+        if default_path.exists():
+            return str(default_path)
+        raise BackendError(
+            "OpenCode CLI not found on PATH. "
+            "Install it from https://opencode.ai or run: "
+            "curl -fsSL https://opencode.ai/install | bash\n"
+            "Then configure MCP servers: opencode mcp add playwright"
+        )
+
+    def _build_command(
+        self,
+        model: str,
+        worker_dir: Path,
+        agent: str | None,
+        prompt: str,
+    ) -> list[str]:
+        """Build the OpenCode CLI command arguments.
+
+        Note: OpenCode manages MCP servers via its own config system,
+        not per-invocation config files like Claude CLI. Playwright MCP
+        must be pre-configured via `opencode mcp add`.
+        """
+        binary = self._find_binary()
+        cmd = [
+            binary,
+            "run",
+            "--format",
+            "json",
+            "--dir",
+            str(worker_dir),
+        ]
+        if model:
+            cmd.extend(["--model", model])
+        if agent:
+            cmd.extend(["--agent", agent])
+        # Add --variant for faster reasoning (minimal = fastest, high = most thorough)
+        variant = os.environ.get("APPLY_OPENCODE_VARIANT", "minimal")
+        if variant:
+            cmd.extend(["--variant", variant])
+        # OpenCode expects the prompt as positional argument(s)
+        cmd.append(prompt)
+        return cmd
+
+    def _list_mcp_servers(self) -> set[str]:
+        from applypilot import config
+        binary = self._find_binary()
+        proc = subprocess.run(
+            [binary, "mcp", "list"],
+            capture_output=True,
+            text=True,
+            check=False,
+            env=self._prepare_environment(),
+            cwd=str(config.APP_DIR),
+        )
+        output = proc.stdout or ""
+        cleaned = ANSI_ESCAPE_RE.sub("", output)
+        servers: set[str] = set()
+        for line in cleaned.splitlines():
+            m = re.match(
+                r"^\s*[●*]?\s*[✓x]?\s*([A-Za-z0-9_-]+)\s+(connected|disconnected|error)\b",
+                line.strip(),
+            )
+            if m:
+                servers.add(m.group(1))
+        return servers
+
+    def _ensure_required_mcp_servers(self, required: Sequence[str] | None) -> None:
+        if not required:
+            return
+        configured = self._list_mcp_servers()
+        missing = [name for name in required if name not in configured]
+        if not missing:
+            return
+        raise BackendError(
+            "OpenCode MCP baseline mismatch. Missing server(s): "
+            + ", ".join(missing)
+            + ". Configure matching MCP servers before applying. "
+            "Expected baseline: " + ", ".join(required)
+            + ". Example: `opencode mcp add <name> -- <command>`"
+        )
+
+    def _validate_agent_config(self, agent: str | None) -> None:
+        """Validate that the agent exists in the resolved opencode config.
+
+        Uses `opencode debug agent <name>` so the CLI loads and merges all
+        config files itself — no manual JSONC parsing needed.
+
+        Args:
+            agent: Agent name to validate.
+
+        Raises:
+            BackendError: If the agent is not found or the CLI check fails.
+ """ + if not agent: + return + import json + + binary = self._find_binary() + env = self._prepare_environment() + from applypilot import config as ap_config + proc = subprocess.run( + [binary, "debug", "agent", agent], + capture_output=True, + text=True, + check=False, + env=env, + cwd=str(ap_config.APP_DIR), + ) + if proc.returncode != 0: + err = (proc.stdout or proc.stderr or "").strip() + raise BackendError( + f"Agent '{agent}' not found in opencode config. " + f"opencode said: {err}. " + "Make sure the agent is defined in ~/.applypilot/.opencode/opencode.jsonc " + "and named 'applypilot-apply'." + ) + # Parse the returned JSON to surface any prompt file issues early. + # Failures here are non-fatal: the agent exists, run_job will catch issues. + try: + json.loads(ANSI_ESCAPE_RE.sub("", proc.stdout)) + except json.JSONDecodeError: + pass + + + def _prepare_environment(self) -> dict[str, str]: + """Prepare environment for OpenCode process. + + By default, redirects XDG_CONFIG_HOME so opencode only loads the + applypilot project config (~/.applypilot/.opencode/opencode.jsonc) + and ignores the user's global ~/.config/opencode/ directory. + + Set APPLY_OPENCODE_USE_GLOBAL_CONFIG=true to opt into loading the + global opencode config alongside the project config. + """ + env = os.environ.copy() + # Remove OpenCode Desktop environment variables. + # When running inside the desktop app, these vars cause the CLI to + # attach to the existing desktop session instead of running standalone. + # XDG_STATE_HOME is also stripped: the desktop app points it at + # /Library/Application Support/ai.opencode.desktop/ which is wrong + # for standalone CLI usage; letting it fall back to the default + # (~/.local/state/opencode/) is correct. 
+ desktop_vars = [ + "OPENCODE_CLIENT", + "OPENCODE_SERVER_PASSWORD", + "OPENCODE_SERVER_USERNAME", + "OPENCODE", + "OPENCODE_EXPERIMENTAL_FILEWATCHER", + "OPENCODE_EXPERIMENTAL_ICON_DISCOVERY", + "__CFBundleIdentifier", + "XDG_STATE_HOME", + ] + for key in desktop_vars: + env.pop(key, None) + # Ensure PATH includes the opencode binary directory so the CLI is + # always found even when not on the user's normal PATH. + opencode_bin = str(Path.home() / ".opencode" / "bin") + current_path = env.get("PATH", "") + if opencode_bin not in current_path: + env["PATH"] = f"{opencode_bin}:{current_path}" + # Set TERM if absent (needed for proper terminal handling). + if "TERM" not in env or not env["TERM"]: + env["TERM"] = "xterm-256color" + if not self._use_global_config: + # Redirect XDG_CONFIG_HOME so opencode looks for its "global" config + # at ~/.applypilot/opencode/ (which does not exist) instead of + # ~/.config/opencode/. This prevents the user's personal opencode + # config — including plugins like OMO and personal instructions — + # from being loaded. + # + # Why not use "plugin": [] to suppress plugins? opencode's config + # merge function (mergeConfigConcatArrays) CONCATENATES arrays across + # config files — setting an empty array in project config does not + # clear entries from global config. XDG_CONFIG_HOME redirection is the + # only reliable way to prevent global config from loading. + # + # XDG_CONFIG_HOME only affects config file discovery. Auth credentials + # (~/.opencode/auth.json), the session DB (~/.local/share/opencode/), + # and state (~/.local/state/opencode/) use different XDG vars and are + # unaffected by this redirect. 
+ from applypilot import config as ap_config + env["XDG_CONFIG_HOME"] = str(ap_config.APP_DIR) + return env + + def run_job( + self, + job: dict[str, Any], + port: int, + worker_id: int, + model: str, + agent: str | None, + dry_run: bool, + prompt: str, + mcp_config_path: Path, + worker_dir: Path, + required_mcp_servers: Sequence[str] | None = None, + update_callback: Any | None = None, + ) -> tuple[str, int]: + """Execute OpenCode for a single job application. + + Mirrors the status taxonomy of ClaudeBackend: APPLIED, EXPIRED, + CAPTCHA, LOGIN_ISSUE, FAILED. Parses OpenCode's JSON event stream + (step_start, text, tool_use, step_finish) to extract results. + """ + import re + import time + from datetime import datetime, timedelta + + from applypilot import config + from applypilot.apply.dashboard import add_event, get_state, update_state + + self._ensure_required_mcp_servers(required_mcp_servers) + self._validate_agent_config(agent) + cmd = self._build_command(model, worker_dir, agent, prompt) + env = self._prepare_environment() + + update_state( + worker_id, + status="applying", + job_title=job["title"], + company=job.get("site", ""), + score=job.get("fit_score", 0), + start_time=time.time(), + actions=0, + last_action="starting (opencode)", + ) + add_event(f"[W{worker_id}] Starting (opencode): {job['title'][:40]} @ {job.get('site', '')}") + + # Let user know initialization takes time + add_event(f"[W{worker_id}] ⏳ OpenCode initializing (15-20s)...") + + worker_log = config.LOG_DIR / f"worker-{worker_id}.log" + + # Set up JSONL audit file + audit_path = config.LOG_DIR / f"worker-{worker_id}.events.jsonl" + audit_file = open(audit_path, "a", encoding="utf-8") + + # Clean up old audit files (30-day retention) + cutoff = datetime.utcnow() - timedelta(days=30) + for old_file in config.LOG_DIR.glob("worker-*.events.jsonl"): + try: + if datetime.utcfromtimestamp(old_file.stat().st_mtime) < cutoff: + old_file.unlink() + except Exception: + pass + + ts_header = 
datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        log_header = (
+            f"\n{'=' * 60}\n"
+            f"[{ts_header}] [opencode] {job['title']} @ {job.get('site', '')}\n"
+            f"URL: {job.get('application_url') or job['url']}\n"
+            f"Score: {job.get('fit_score', 'N/A')}/10\n"
+            f"{'=' * 60}\n"
+        )
+
+        start = time.time()
+        stats: dict = {}
+        proc = None
+
+        try:
+            proc = subprocess.Popen(
+                cmd,
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                text=True,
+                encoding="utf-8",
+                errors="replace",
+                env=env,
+                cwd=str(config.APP_DIR),
+            )
+            self._active_procs[worker_id] = proc
+            if proc.stdin is None or proc.stdout is None:
+                raise BackendError("OpenCode backend process streams unavailable")
+            stdin = proc.stdin
+            stdout = proc.stdout
+
+            # Prompt passed as positional arg to opencode, not via stdin
+            stdin.close()
+
+            text_parts: list[str] = []
+            with open(worker_log, "a", encoding="utf-8") as lf:
+                lf.write(log_header)
+
+                # Instrumentation for timing analysis
+                last_msg_time = None
+                tool_start_time = None
+                current_tool = None
+                turn_count = 0
+                perf_log_path = config.LOG_DIR / f"worker-{worker_id}.perf.jsonl"
+                perf_file = open(perf_log_path, "a", encoding="utf-8")
+
+                for line in stdout:
+                    line = line.strip()
+                    if not line:
+                        continue
+
+                    msg_received_time = time.time()
+
+                    # Calculate gap from the previous message, then record this
+                    # arrival time so the next iteration measures a real gap.
+                    if last_msg_time:
+                        gap_ms = (msg_received_time - last_msg_time) * 1000
+                    else:
+                        gap_ms = 0
+                    last_msg_time = msg_received_time
+
+                    try:
+                        msg = json.loads(line)
+                        msg_type = msg.get("type")
+
+                        # Log message timing
+                        if gap_ms > 100:  # Only log significant gaps (>100ms)
+                            perf_entry = {
+                                "timestamp": datetime.utcnow().isoformat(),
+                                "event": "message_received",
+                                "msg_type": msg_type,
+                                "gap_ms": round(gap_ms, 2),
+                                "turn": turn_count,
+                            }
+                            perf_file.write(json.dumps(perf_entry) + "\n")
+                            perf_file.flush()
+
+                        # Track step start
+                        if msg_type == "step_start":
+                            turn_count += 1
+                            step_start_time = time.time()
+                            if turn_count == 1:
+                                add_event(f"[W{worker_id}] ✓ Agent active - 
starting application...")
+                            perf_entry = {
+                                "timestamp": datetime.utcnow().isoformat(),
+                                "event": "llm_turn_start",
+                                "turn": turn_count,
+                            }
+                            perf_file.write(json.dumps(perf_entry) + "\n")
+                            perf_file.flush()
+
+                            # Log if this was a long wait
+                            if gap_ms > 5000:  # >5 seconds
+                                lf.write(f"\n[PERF] Long gap before turn {turn_count}: {gap_ms/1000:.1f}s (likely API latency + LLM thinking)\n")
+                                lf.flush()
+
+                        # Track tool execution
+                        if msg_type == "tool_use":
+                            tool_start_time = time.time()
+                            # Extract tool name (simplified)
+                            part = msg.get("part", {})
+                            current_tool = part.get("tool", part.get("name", "unknown"))
+
+                        if msg_type == "text":
+                            text_content = msg.get("part", {}).get("text", "")
+                            if text_content:
+                                text_parts.append(text_content)
+                                lf.write(text_content + "\n")
+
+                        elif msg_type == "tool_use":
+                            # OpenCode may emit the tool name and input at the top
+                            # level or nested in the "part" sub-object (under
+                            # "name", "tool", "input", or "state.input"); check
+                            # each location in order.
+                            part = msg.get("part", {})
+                            name = msg.get("name") or part.get("name") or part.get("tool", "")
+                            inp = (
+                                msg.get("input")
+                                or part.get("input")
+                                or part.get("state", {}).get("input", {})
+                            )
+                            # Clean up MCP prefixes
+                            name = name.replace("mcp__playwright__", "").replace("mcp__gmail__", "gmail:")
+
+                            # Debug: log structure to help diagnose format issues
+                            if not name:
+                                logger.debug(f"Tool use message has empty name. Keys: {list(msg.keys())}")
+                            if "url" in inp:
+                                desc = f"{name} {inp['url'][:60]}"
+                            elif "ref" in inp:
+                                desc = (f"{name} {inp.get('element', inp.get('text', ''))}")[:50]
+                            elif "fields" in inp:
+                                desc = f"{name} ({len(inp['fields'])} fields)"
+                            elif "paths" in inp:
+                                desc = f"{name} upload"
+                            else:
+                                desc = name
+
+                            # Write enhanced log with timestamp
+                            ts = datetime.utcnow().isoformat()
+                            lf.write(f"{ts} >> {desc}\n")
+
+                            # Add dashboard event
+                            add_event(f"[W{worker_id}] {desc[:80]}")
+
+                            # Update state
+                            ws = get_state(worker_id)
+                            cur_actions = ws.actions if ws else 0
+                            update_state(
+                                worker_id,
+                                actions=cur_actions + 1,
+                                last_action=desc[:35],
+                            )
+
+                            # Calculate tool timing if we have start time
+                            tool_exec_time = None
+                            if tool_start_time:
+                                tool_exec_time = time.time() - tool_start_time
+
+                            # Write JSONL audit entry
+                            audit_entry = {
+                                "timestamp": ts,
+                                "worker_id": worker_id,
+                                "type": "tool_use",
+                                "tool": name,
+                                "input": inp,  # Full input payload
+                                "tab_id": inp.get("tab_id"),
+                                "url": inp.get("url"),
+                                "tool_exec_time": round(tool_exec_time, 3) if tool_exec_time else None,
+                                "turn": turn_count,
+                            }
+                            audit_file.write(json.dumps(audit_entry, ensure_ascii=False) + "\n")
+                            audit_file.flush()
+
+                            # Log performance data
+                            if tool_exec_time:
+                                perf_entry = {
+                                    "timestamp": ts,
+                                    "event": "tool_executed",
+                                    "tool": name,
+                                    "exec_time_ms": round(tool_exec_time * 1000, 2),
+                                    "turn": turn_count,
+                                }
+                                perf_file.write(json.dumps(perf_entry) + "\n")
+                                perf_file.flush()
+
+                        elif msg_type == "step_finish":
+                            step_end_time = time.time()
+                            part = msg.get("part", {})
+                            tokens = part.get("tokens", {})
+                            cache = tokens.get("cache", {})
+
+                            # Log turn completion
+                            perf_entry = {
+                                "timestamp": datetime.utcnow().isoformat(),
+                                "event": "llm_turn_complete",
+                                "turn": turn_count,
+                                "input_tokens": tokens.get("input", 0),
+                                "output_tokens": tokens.get("output", 0),
+                            }
+                            perf_file.write(json.dumps(perf_entry) + "\n")
+                            perf_file.flush()
+                            stats = {
+                                "input_tokens": 
tokens.get("input", 0), + "output_tokens": tokens.get("output", 0), + "cache_read": cache.get("read", 0), + "cache_create": cache.get("write", 0), + "cost_usd": part.get("cost", 0), + "turns": 1, + } + + except json.JSONDecodeError: + text_parts.append(line) + lf.write(line + "\n") + + proc.wait(timeout=300) + returncode = proc.returncode + proc = None + + if returncode and returncode < 0: + return "skipped", int((time.time() - start) * 1000) + + output = "\n".join(text_parts) + elapsed = int(time.time() - start) + duration_ms = int((time.time() - start) * 1000) + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + job_log = config.LOG_DIR / f"opencode_{ts}_w{worker_id}_{job.get('site', 'unknown')[:20]}.txt" + job_log.write_text(output, encoding="utf-8") + + if stats: + cost = stats.get("cost_usd", 0) + ws = get_state(worker_id) + prev_cost = ws.total_cost if ws else 0.0 + update_state(worker_id, total_cost=prev_cost + cost) + + def _clean_reason(s: str) -> str: + return re.sub(r'[*`"]+$', "", s).strip() + + for result_status in ["APPLIED", "EXPIRED", "CAPTCHA", "LOGIN_ISSUE"]: + if f"RESULT:{result_status}" in output: + add_event(f"[W{worker_id}] {result_status} ({elapsed}s): {job['title'][:30]}") + update_state( + worker_id, + status=result_status.lower(), + last_action=f"{result_status} ({elapsed}s)", + ) + return result_status.lower(), duration_ms + + if "RESULT:FAILED" in output: + for out_line in output.split("\n"): + if "RESULT:FAILED" in out_line: + reason = ( + out_line.split("RESULT:FAILED:")[-1].strip() + if ":" in out_line[out_line.index("FAILED") + 6 :] + else "unknown" + ) + reason = _clean_reason(reason) + PROMOTE_TO_STATUS = {"captcha", "expired", "login_issue"} + if reason in PROMOTE_TO_STATUS: + add_event(f"[W{worker_id}] {reason.upper()} ({elapsed}s): {job['title'][:30]}") + update_state( + worker_id, + status=reason, + last_action=f"{reason.upper()} ({elapsed}s)", + ) + return reason, duration_ms + add_event(f"[W{worker_id}] FAILED ({elapsed}s): 
{reason[:30]}") + update_state( + worker_id, + status="failed", + last_action=f"FAILED: {reason[:25]}", + ) + return f"failed:{reason}", duration_ms + return "failed:unknown", duration_ms + + add_event(f"[W{worker_id}] NO RESULT ({elapsed}s)") + update_state(worker_id, status="failed", last_action=f"no result ({elapsed}s)") + return "failed:no_result_line", duration_ms + + except BackendError: + # Re-raise backend errors (e.g. missing binary) without wrapping + raise + except subprocess.TimeoutExpired: + duration_ms = int((time.time() - start) * 1000) + elapsed = int(time.time() - start) + add_event(f"[W{worker_id}] TIMEOUT ({elapsed}s)") + update_state(worker_id, status="failed", last_action=f"TIMEOUT ({elapsed}s)") + return "failed:timeout", duration_ms + except Exception as e: + duration_ms = int((time.time() - start) * 1000) + add_event(f"[W{worker_id}] ERROR: {str(e)[:40]}") + update_state( + worker_id, + status="failed", + last_action=f"ERROR: {str(e)[:25]}", + ) + return f"failed:{str(e)[:100]}", duration_ms + finally: + audit_file.close() + self._active_procs.pop(worker_id, None) + if proc is not None and proc.poll() is None: + self._kill_process_tree(proc.pid) + + def _kill_process_tree(self, pid: int) -> None: + """Kill a process and all its children.""" + import signal + + try: + import psutil + + parent = psutil.Process(pid) + for child in parent.children(recursive=True): + child.terminate() + parent.terminate() + _, alive = psutil.wait_procs([parent], timeout=3) + for p in alive: + p.kill() + except ImportError: + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + pass + except ProcessLookupError: + pass + + def get_active_proc(self, worker_id: int) -> subprocess.Popen | None: + """Get the active process for a worker (for signal handling).""" + return self._active_procs.get(worker_id) + + def kill_all(self) -> None: + """Kill all active OpenCode processes.""" + for worker_id, proc in list(self._active_procs.items()): + if proc.poll() is 
None:
+                self._kill_process_tree(proc.pid)
+        self._active_procs.clear()
+
+
+def get_backend(backend_name: str | None = None) -> AgentBackend:
+    """Factory function to get the appropriate backend instance.
+
+    Args:
+        backend_name: Backend identifier ("claude", "opencode", or None for default).
+            Reads from APPLY_BACKEND env var if not provided.
+
+    Returns:
+        An AgentBackend instance.
+
+    Raises:
+        InvalidBackendError: If the backend identifier is not supported.
+    """
+    backend_name = resolve_backend_name(backend_name)
+    if backend_name not in VALID_BACKENDS:
+        raise InvalidBackendError(backend_name, VALID_BACKENDS)
+    if backend_name == "claude":
+        return ClaudeBackend()
+    if backend_name == "opencode":
+        return OpenCodeBackend()
+    # This should never happen due to the check above
+    raise InvalidBackendError(backend_name, VALID_BACKENDS)
+
+
+def get_available_backends() -> frozenset[str]:
+    """Return the set of available backend identifiers."""
+    return VALID_BACKENDS
+
+
+def resolve_backend_name(backend_name: str | None = None) -> str:
+    if backend_name is None:
+        backend_name = os.environ.get("APPLY_BACKEND", DEFAULT_BACKEND)
+    return backend_name.lower().strip()
+
+
+def resolve_default_model(backend_name: str) -> str:
+    if backend_name == "opencode":
+        return os.environ.get("APPLY_OPENCODE_MODEL") or os.environ.get("LLM_MODEL") or DEFAULT_OPENCODE_MODEL
+    return os.environ.get("APPLY_CLAUDE_MODEL") or DEFAULT_CLAUDE_MODEL
+
+
+def resolve_default_agent(backend_name: str) -> str | None:
+    if backend_name == "opencode":
+        # Default to "applypilot-apply" agent defined in opencode.jsonc, unless overridden by env var
+        return os.environ.get("APPLY_OPENCODE_AGENT") or "applypilot-apply"
+    return None
diff --git a/src/applypilot/apply/launcher.py b/src/applypilot/apply/launcher.py
index 341a11a..7f9e9b9 100644
--- a/src/applypilot/apply/launcher.py
+++ b/src/applypilot/apply/launcher.py
@@ -27,31 +27,119 @@ from applypilot.database import get_connection
 from applypilot.apply import chrome, 
dashboard, prompt as prompt_mod from applypilot.apply.chrome import ( - launch_chrome, cleanup_worker, kill_all_chrome, - reset_worker_dir, cleanup_on_exit, _kill_process_tree, + launch_chrome, + cleanup_worker, + kill_all_chrome, + reset_worker_dir, + cleanup_on_exit, + _kill_process_tree, BASE_CDP_PORT, ) from applypilot.apply.dashboard import ( - init_worker, update_state, add_event, get_state, - render_full, get_totals, + init_worker, + update_state, + add_event, + get_state, + render_full, + get_totals, +) +import requests +from applypilot.apply.backends import ( + get_backend, + AgentBackend, + InvalidBackendError, + DEFAULT_BACKEND, + resolve_backend_name, + resolve_default_model, + resolve_default_agent, ) logger = logging.getLogger(__name__) + # Blocked sites loaded from config/sites.yaml def _load_blocked(): from applypilot.config import load_blocked_sites + return load_blocked_sites() + +def pre_navigate_to_job(job: dict, port: int, worker_id: int) -> bool: + """Pre-navigate to job URL using Chrome CDP HTTP endpoint. + + Uses Chrome's CDP HTTP endpoint to create a persistent tab that any + CDP client can attach to. Closes existing tabs first to avoid buildup. + + Args: + job: Job dictionary with url/application_url + port: CDP port for browser connection + worker_id: Worker identifier for logging + + Returns: + True if navigation succeeded, False otherwise + """ + try: + import urllib.parse + import requests + import time + + job_url = job.get("application_url") or job["url"] + add_event(f"[W{worker_id}] Pre-navigating to {job_url[:50]}...") + + base_url = f"http://localhost:{port}" + + # 1. 
Close existing tabs to avoid buildup + try: + list_resp = requests.get(f"{base_url}/json/list", timeout=5) + if list_resp.status_code == 200: + targets = list_resp.json() + for target in targets: + target_id = target.get("id") + if target_id and target.get("type") == "page": + try: + requests.get(f"{base_url}/json/close/{target_id}", timeout=5) + except Exception: + pass + except Exception as e: + add_event(f"[W{worker_id}] Warning: Could not close existing tabs: {str(e)[:30]}") + + # 2. Create new persistent tab via CDP HTTP endpoint + encoded_url = urllib.parse.quote(job_url, safe="") + response = requests.get(f"{base_url}/json/new?{encoded_url}", timeout=10) + + if response.status_code != 200: + add_event(f"[W{worker_id}] Pre-navigation failed: HTTP {response.status_code}") + return False + + target_info = response.json() + target_id = target_info.get("id") + + # 3. Wait for page to load + time.sleep(3) + + add_event(f"[W{worker_id}] Pre-navigation complete (target: {target_id})") + return True + + except ImportError as e: + add_event(f"[W{worker_id}] Pre-navigation skipped: missing dependency {e}") + logger.debug(f"Pre-navigation dependency missing for worker {worker_id}: {e}") + return False + except Exception as e: + add_event(f"[W{worker_id}] Pre-navigation failed: {str(e)[:30]}") + logger.warning(f"Pre-navigation failed for worker {worker_id}: {e}") + return False + + # How often to poll the DB when the queue is empty (seconds) POLL_INTERVAL = config.DEFAULTS["poll_interval"] # Thread-safe shutdown coordination _stop_event = threading.Event() -# Track active Claude Code processes for skip (Ctrl+C) handling -_claude_procs: dict[int, subprocess.Popen] = {} -_claude_lock = threading.Lock() +# Track active backends for skip (Ctrl+C) handling +# Each worker has one backend instance; signal handler queries for active processes +_worker_backends: dict[int, AgentBackend] = {} +_backends_lock = threading.Lock() # Register cleanup on exit 
atexit.register(cleanup_on_exit) @@ -63,6 +151,7 @@ def _load_blocked(): # MCP config # --------------------------------------------------------------------------- + def _make_mcp_config(cdp_port: int) -> dict: """Build MCP config dict for a specific CDP port.""" return { @@ -87,8 +176,8 @@ def _make_mcp_config(cdp_port: int) -> dict: # Database operations # --------------------------------------------------------------------------- -def acquire_job(target_url: str | None = None, min_score: int = 7, - worker_id: int = 0) -> dict | None: + +def acquire_job(target_url: str | None = None, min_score: int = 7, worker_id: int = 0) -> dict | None: """Atomically acquire the next job to apply to. Args: @@ -105,15 +194,18 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, if target_url: like = f"%{target_url.split('?')[0].rstrip('/')}%" - row = conn.execute(""" + row = conn.execute( + """ SELECT url, title, site, application_url, tailored_resume_path, fit_score, location, full_description, cover_letter_path FROM jobs WHERE (url = ? OR application_url = ? OR application_url LIKE ? OR url LIKE ?) AND tailored_resume_path IS NOT NULL - AND apply_status != 'in_progress' + AND (apply_status IS NULL OR apply_status != 'in_progress') LIMIT 1 - """, (target_url, target_url, like, like)).fetchone() + """, + (target_url, target_url, like, like), + ).fetchone() else: blocked_sites, blocked_patterns = _load_blocked() # Build parameterized filters to avoid SQL injection @@ -127,7 +219,8 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, if blocked_patterns: url_clauses = " ".join(f"AND url NOT LIKE ?" 
for _ in blocked_patterns) params.extend(blocked_patterns) - row = conn.execute(f""" + row = conn.execute( + f""" SELECT url, title, site, application_url, tailored_resume_path, fit_score, location, full_description, cover_letter_path FROM jobs @@ -139,7 +232,9 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, {url_clauses} ORDER BY fit_score DESC, url LIMIT 1 - """, [config.DEFAULTS["max_apply_attempts"]] + params).fetchone() + """, + [config.DEFAULTS["max_apply_attempts"]] + params, + ).fetchone() if not row: conn.rollback() @@ -147,6 +242,7 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, # Skip manual ATS sites (unsolvable CAPTCHAs) from applypilot.config import is_manual_ats + apply_url = row["application_url"] or row["url"] if is_manual_ats(apply_url): conn.execute( @@ -158,12 +254,15 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, return None now = datetime.now(timezone.utc).isoformat() - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'in_progress', agent_id = ?, last_attempted_at = ? WHERE url = ? - """, (f"worker-{worker_id}", now, row["url"])) + """, + (f"worker-{worker_id}", now, row["url"]), + ) conn.commit() return dict(row) @@ -172,27 +271,38 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, raise -def mark_result(url: str, status: str, error: str | None = None, - permanent: bool = False, duration_ms: int | None = None, - task_id: str | None = None) -> None: +def mark_result( + url: str, + status: str, + error: str | None = None, + permanent: bool = False, + duration_ms: int | None = None, + task_id: str | None = None, +) -> None: """Update a job's apply status in the database.""" conn = get_connection() now = datetime.now(timezone.utc).isoformat() if status == "applied": - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'applied', applied_at = ?, apply_error = NULL, agent_id = NULL, apply_duration_ms = ?, apply_task_id = ? 
WHERE url = ? - """, (now, duration_ms, task_id, url)) + """, + (now, duration_ms, task_id, url), + ) else: attempts = 99 if permanent else "COALESCE(apply_attempts, 0) + 1" - conn.execute(f""" + conn.execute( + f""" UPDATE jobs SET apply_status = ?, apply_error = ?, apply_attempts = {attempts}, agent_id = NULL, apply_duration_ms = ?, apply_task_id = ? WHERE url = ? - """, (status, error or "unknown", duration_ms, task_id, url)) + """, + (status, error or "unknown", duration_ms, task_id, url), + ) conn.commit() @@ -210,8 +320,8 @@ def release_lock(url: str) -> None: # Utility modes (--gen, --mark-applied, --mark-failed, --reset-failed) # --------------------------------------------------------------------------- -def gen_prompt(target_url: str, min_score: int = 7, - model: str = "sonnet", worker_id: int = 0) -> Path | None: + +def gen_prompt(target_url: str, min_score: int = 7, model: str = "sonnet", worker_id: int = 0) -> Path | None: """Generate a prompt file and print the Claude CLI command for manual debugging. Returns: @@ -258,17 +368,23 @@ def mark_job(url: str, status: str, reason: str | None = None) -> None: conn = get_connection() now = datetime.now(timezone.utc).isoformat() if status == "applied": - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'applied', applied_at = ?, apply_error = NULL, agent_id = NULL WHERE url = ? - """, (now, url)) + """, + (now, url), + ) else: - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'failed', apply_error = ?, apply_attempts = 99, agent_id = NULL WHERE url = ? - """, (reason or "manual", url)) + """, + (reason or "manual", url), + ) conn.commit() @@ -294,9 +410,26 @@ def reset_failed() -> int: # Per-job execution # --------------------------------------------------------------------------- -def run_job(job: dict, port: int, worker_id: int = 0, - model: str = "sonnet", dry_run: bool = False) -> tuple[str, int]: - """Spawn a Claude Code session for one job application. 
+ +def run_job( + job: dict, + port: int, + worker_id: int = 0, + model: str | None = None, + agent: str | None = None, + dry_run: bool = False, + backend: AgentBackend | None = None, +) -> tuple[str, int]: + """Spawn an agent backend session for one job application. + + Args: + job: Job dictionary with all required fields. + port: CDP port for browser connection. + worker_id: Numeric worker identifier. + model: Model name (backend-specific). Uses backend default if None. + agent: Agent name for OpenCode backend. Ignored by Claude backend. + dry_run: Don't click Submit. + backend: AgentBackend instance. If None, uses default Claude backend. Returns: Tuple of (status_string, duration_ms). Status is one of: @@ -321,198 +454,39 @@ def run_job(job: dict, port: int, worker_id: int = 0, mcp_config_path = config.APP_DIR / f".mcp-apply-{worker_id}.json" mcp_config_path.write_text(json.dumps(_make_mcp_config(port)), encoding="utf-8") - # Build claude command - cmd = [ - "claude", - "--model", model, - "-p", - "--mcp-config", str(mcp_config_path), - "--permission-mode", "bypassPermissions", - "--no-session-persistence", - "--disallowedTools", ( - "mcp__gmail__draft_email,mcp__gmail__modify_email," - "mcp__gmail__delete_email,mcp__gmail__download_attachment," - "mcp__gmail__batch_modify_emails,mcp__gmail__batch_delete_emails," - "mcp__gmail__create_label,mcp__gmail__update_label," - "mcp__gmail__delete_label,mcp__gmail__get_or_create_label," - "mcp__gmail__list_email_labels,mcp__gmail__create_filter," - "mcp__gmail__list_filters,mcp__gmail__get_filter," - "mcp__gmail__delete_filter" - ), - "--output-format", "stream-json", - "--verbose", "-", - ] - - env = os.environ.copy() - env.pop("CLAUDECODE", None) - env.pop("CLAUDE_CODE_ENTRYPOINT", None) + # Get or create backend + if backend is None: + backend = get_backend(DEFAULT_BACKEND) + resolved_model = model or resolve_default_model(backend.name) + resolved_agent = agent or resolve_default_agent(backend.name) + 
required_mcp_servers = list(_make_mcp_config(port)["mcpServers"].keys()) worker_dir = reset_worker_dir(worker_id) - update_state(worker_id, status="applying", job_title=job["title"], - company=job.get("site", ""), score=job.get("fit_score", 0), - start_time=time.time(), actions=0, last_action="starting") - add_event(f"[W{worker_id}] Starting: {job['title'][:40]} @ {job.get('site', '')}") - - worker_log = config.LOG_DIR / f"worker-{worker_id}.log" - ts_header = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - log_header = ( - f"\n{'=' * 60}\n" - f"[{ts_header}] {job['title']} @ {job.get('site', '')}\n" - f"URL: {job.get('application_url') or job['url']}\n" - f"Score: {job.get('fit_score', 'N/A')}/10\n" - f"{'=' * 60}\n" - ) - - start = time.time() - stats: dict = {} - proc = None + # Copy resume PDF to worker directory so agent can access it + import shutil + current_pdf = config.APPLY_WORKER_DIR / "current" / "Nicholas_Roth_Resume.pdf" try: - proc = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - encoding="utf-8", - errors="replace", - env=env, - cwd=str(worker_dir), - ) - with _claude_lock: - _claude_procs[worker_id] = proc - - proc.stdin.write(agent_prompt) - proc.stdin.close() - - text_parts: list[str] = [] - with open(worker_log, "a", encoding="utf-8") as lf: - lf.write(log_header) - - for line in proc.stdout: - line = line.strip() - if not line: - continue - try: - msg = json.loads(line) - msg_type = msg.get("type") - if msg_type == "assistant": - for block in msg.get("message", {}).get("content", []): - bt = block.get("type") - if bt == "text": - text_parts.append(block["text"]) - lf.write(block["text"] + "\n") - elif bt == "tool_use": - name = ( - block.get("name", "") - .replace("mcp__playwright__", "") - .replace("mcp__gmail__", "gmail:") - ) - inp = block.get("input", {}) - if "url" in inp: - desc = f"{name} {inp['url'][:60]}" - elif "ref" in inp: - desc = f"{name} {inp.get('element', 
inp.get('text', ''))}"[:50] - elif "fields" in inp: - desc = f"{name} ({len(inp['fields'])} fields)" - elif "paths" in inp: - desc = f"{name} upload" - else: - desc = name - - lf.write(f" >> {desc}\n") - ws = get_state(worker_id) - cur_actions = ws.actions if ws else 0 - update_state(worker_id, - actions=cur_actions + 1, - last_action=desc[:35]) - elif msg_type == "result": - stats = { - "input_tokens": msg.get("usage", {}).get("input_tokens", 0), - "output_tokens": msg.get("usage", {}).get("output_tokens", 0), - "cache_read": msg.get("usage", {}).get("cache_read_input_tokens", 0), - "cache_create": msg.get("usage", {}).get("cache_creation_input_tokens", 0), - "cost_usd": msg.get("total_cost_usd", 0), - "turns": msg.get("num_turns", 0), - } - text_parts.append(msg.get("result", "")) - except json.JSONDecodeError: - text_parts.append(line) - lf.write(line + "\n") - - proc.wait(timeout=300) - returncode = proc.returncode - proc = None - - if returncode and returncode < 0: - return "skipped", int((time.time() - start) * 1000) - - output = "\n".join(text_parts) - elapsed = int(time.time() - start) - duration_ms = int((time.time() - start) * 1000) - - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - job_log = config.LOG_DIR / f"claude_{ts}_w{worker_id}_{job.get('site', 'unknown')[:20]}.txt" - job_log.write_text(output, encoding="utf-8") - - if stats: - cost = stats.get("cost_usd", 0) - ws = get_state(worker_id) - prev_cost = ws.total_cost if ws else 0.0 - update_state(worker_id, total_cost=prev_cost + cost) - - def _clean_reason(s: str) -> str: - return re.sub(r'[*`"]+$', '', s).strip() - - for result_status in ["APPLIED", "EXPIRED", "CAPTCHA", "LOGIN_ISSUE"]: - if f"RESULT:{result_status}" in output: - add_event(f"[W{worker_id}] {result_status} ({elapsed}s): {job['title'][:30]}") - update_state(worker_id, status=result_status.lower(), - last_action=f"{result_status} ({elapsed}s)") - return result_status.lower(), duration_ms - - if "RESULT:FAILED" in output: - for 
out_line in output.split("\n"): - if "RESULT:FAILED" in out_line: - reason = ( - out_line.split("RESULT:FAILED:")[-1].strip() - if ":" in out_line[out_line.index("FAILED") + 6:] - else "unknown" - ) - reason = _clean_reason(reason) - PROMOTE_TO_STATUS = {"captcha", "expired", "login_issue"} - if reason in PROMOTE_TO_STATUS: - add_event(f"[W{worker_id}] {reason.upper()} ({elapsed}s): {job['title'][:30]}") - update_state(worker_id, status=reason, - last_action=f"{reason.upper()} ({elapsed}s)") - return reason, duration_ms - add_event(f"[W{worker_id}] FAILED ({elapsed}s): {reason[:30]}") - update_state(worker_id, status="failed", - last_action=f"FAILED: {reason[:25]}") - return f"failed:{reason}", duration_ms - return "failed:unknown", duration_ms - - add_event(f"[W{worker_id}] NO RESULT ({elapsed}s)") - update_state(worker_id, status="failed", last_action=f"no result ({elapsed}s)") - return "failed:no_result_line", duration_ms - - except subprocess.TimeoutExpired: - duration_ms = int((time.time() - start) * 1000) - elapsed = int(time.time() - start) - add_event(f"[W{worker_id}] TIMEOUT ({elapsed}s)") - update_state(worker_id, status="failed", last_action=f"TIMEOUT ({elapsed}s)") - return "failed:timeout", duration_ms - except Exception as e: - duration_ms = int((time.time() - start) * 1000) - add_event(f"[W{worker_id}] ERROR: {str(e)[:40]}") - update_state(worker_id, status="failed", last_action=f"ERROR: {str(e)[:25]}") - return f"failed:{str(e)[:100]}", duration_ms - finally: - with _claude_lock: - _claude_procs.pop(worker_id, None) - if proc is not None and proc.poll() is None: - _kill_process_tree(proc.pid) + if current_pdf.exists(): + worker_pdf = worker_dir / "Nicholas_Roth_Resume.pdf" + shutil.copy(str(current_pdf), str(worker_pdf)) + except Exception: + logger.debug("Could not copy resume to worker dir", exc_info=True) + + # Delegate to backend implementation + return backend.run_job( + job=job, + port=port, + worker_id=worker_id, + model=resolved_model, + 
agent=resolved_agent, + dry_run=dry_run, + prompt=agent_prompt, + mcp_config_path=mcp_config_path, + worker_dir=worker_dir, + required_mcp_servers=required_mcp_servers, + ) # --------------------------------------------------------------------------- @@ -520,12 +494,20 @@ def _clean_reason(s: str) -> str: # --------------------------------------------------------------------------- PERMANENT_FAILURES: set[str] = { - "expired", "captcha", "login_issue", - "not_eligible_location", "not_eligible_salary", - "already_applied", "account_required", - "not_a_job_application", "unsafe_permissions", - "unsafe_verification", "sso_required", - "site_blocked", "cloudflare_blocked", "blocked_by_cloudflare", + "expired", + "captcha", + "login_issue", + "not_eligible_location", + "not_eligible_salary", + "already_applied", + "account_required", + "not_a_job_application", + "unsafe_permissions", + "unsafe_verification", + "sso_required", + "site_blocked", + "cloudflare_blocked", + "blocked_by_cloudflare", } PERMANENT_PREFIXES: tuple[str, ...] = ("site_blocked", "cloudflare", "blocked_by") @@ -545,10 +527,18 @@ def _is_permanent_failure(result: str) -> bool: # Worker loop # --------------------------------------------------------------------------- -def worker_loop(worker_id: int = 0, limit: int = 1, - target_url: str | None = None, - min_score: int = 7, headless: bool = False, - model: str = "sonnet", dry_run: bool = False) -> tuple[int, int]: + +def worker_loop( + worker_id: int = 0, + limit: int = 1, + target_url: str | None = None, + min_score: int = 7, + headless: bool = False, + model: str | None = None, + agent: str | None = None, + dry_run: bool = False, + backend_name: str | None = None, +) -> tuple[int, int]: """Run jobs sequentially until limit is reached or queue is empty. Args: @@ -557,8 +547,10 @@ def worker_loop(worker_id: int = 0, limit: int = 1, target_url: Apply to a specific URL. min_score: Minimum fit_score threshold. headless: Run Chrome headless. 
- model: Claude model name. + model: Backend model name override. + agent: OpenCode agent override. dry_run: Don't click Submit. + backend_name: Backend identifier ('claude' or via APPLY_BACKEND env var). Returns: Tuple of (applied_count, failed_count). @@ -570,23 +562,25 @@ def worker_loop(worker_id: int = 0, limit: int = 1, empty_polls = 0 port = BASE_CDP_PORT + worker_id + # Initialize backend for this worker + backend = get_backend(backend_name) + with _backends_lock: + _worker_backends[worker_id] = backend + while not _stop_event.is_set(): if not continuous and jobs_done >= limit: break - update_state(worker_id, status="idle", job_title="", company="", - last_action="waiting for job", actions=0) + update_state(worker_id, status="idle", job_title="", company="", last_action="waiting for job", actions=0) - job = acquire_job(target_url=target_url, min_score=min_score, - worker_id=worker_id) + job = acquire_job(target_url=target_url, min_score=min_score, worker_id=worker_id) if not job: if not continuous: add_event(f"[W{worker_id}] Queue empty") update_state(worker_id, status="done", last_action="queue empty") break empty_polls += 1 - update_state(worker_id, status="idle", - last_action=f"polling ({empty_polls})") + update_state(worker_id, status="idle", last_action=f"polling ({empty_polls})") if empty_polls == 1: add_event(f"[W{worker_id}] Queue empty, polling every {POLL_INTERVAL}s...") # Use Event.wait for interruptible sleep @@ -601,8 +595,23 @@ def worker_loop(worker_id: int = 0, limit: int = 1, add_event(f"[W{worker_id}] Launching Chrome...") chrome_proc = launch_chrome(worker_id, port=port, headless=headless) - result, duration_ms = run_job(job, port=port, worker_id=worker_id, - model=model, dry_run=dry_run) + # Preload the job URL in the launched Chrome instance so the agent + # session starts with the page already loaded. This reduces agent + # startup latency and avoids duplicate navigation attempts by the + # agent itself. 
If pre-navigation fails we continue anyway; the + # agent will navigate itself as a fallback. + try: + pre_ok = pre_navigate_to_job(job, port=port, worker_id=worker_id) + if pre_ok: + add_event(f"[W{worker_id}] Pre-navigation succeeded") + else: + add_event(f"[W{worker_id}] Pre-navigation skipped/failed") + except Exception as e: + logger.debug("Pre-navigation error for worker %d: %s", worker_id, e) + + result, duration_ms = run_job( + job, port=port, worker_id=worker_id, model=model, agent=agent, dry_run=dry_run, backend=backend + ) if result == "skipped": release_lock(job["url"]) @@ -611,16 +620,14 @@ def worker_loop(worker_id: int = 0, limit: int = 1, elif result == "applied": mark_result(job["url"], "applied", duration_ms=duration_ms) applied += 1 - update_state(worker_id, jobs_applied=applied, - jobs_done=applied + failed) + update_state(worker_id, jobs_applied=applied, jobs_done=applied + failed) else: reason = result.split(":", 1)[-1] if ":" in result else result - mark_result(job["url"], "failed", reason, - permanent=_is_permanent_failure(result), - duration_ms=duration_ms) + mark_result( + job["url"], "failed", reason, permanent=_is_permanent_failure(result), duration_ms=duration_ms + ) failed += 1 - update_state(worker_id, jobs_failed=failed, - jobs_done=applied + failed) + update_state(worker_id, jobs_failed=failed, jobs_done=applied + failed) except KeyboardInterrupt: release_lock(job["url"]) @@ -650,10 +657,20 @@ def worker_loop(worker_id: int = 0, limit: int = 1, # Main entry point (called from cli.py) # --------------------------------------------------------------------------- -def main(limit: int = 1, target_url: str | None = None, - min_score: int = 7, headless: bool = False, model: str = "sonnet", - dry_run: bool = False, continuous: bool = False, - poll_interval: int = 60, workers: int = 1) -> None: + +def main( + limit: int = 1, + target_url: str | None = None, + min_score: int = 7, + headless: bool = False, + model: str | None = None, + agent: 
str | None = None, + dry_run: bool = False, + continuous: bool = False, + poll_interval: int = 60, + workers: int = 1, + backend_name: str | None = None, +) -> None: """Launch the apply pipeline. Args: @@ -661,16 +678,30 @@ def main(limit: int = 1, target_url: str | None = None, target_url: Apply to a specific URL. min_score: Minimum fit_score threshold. headless: Run Chrome in headless mode. - model: Claude model name. + model: Backend model override. + agent: OpenCode agent override. dry_run: Don't click Submit. continuous: Run forever, polling for new jobs. poll_interval: Seconds between DB polls when queue is empty. workers: Number of parallel workers (default 1). + backend_name: Backend identifier ('claude' or via APPLY_BACKEND env var). """ global POLL_INTERVAL POLL_INTERVAL = poll_interval _stop_event.clear() + resolved_backend_name = resolve_backend_name(backend_name) + resolved_model = model or resolve_default_model(resolved_backend_name) + resolved_agent = agent or resolve_default_agent(resolved_backend_name) + + # Validate backend early to fail fast + try: + get_backend(resolved_backend_name) + except InvalidBackendError as e: + console = Console() + console.print(f"[red bold]Error: {e}[/red bold]") + raise SystemExit(1) + config.ensure_dirs() console = Console() @@ -687,6 +718,11 @@ def main(limit: int = 1, target_url: str | None = None, worker_label = f"{workers} worker{'s' if workers > 1 else ''}" console.print(f"Launching apply pipeline ({mode_label}, {worker_label}, poll every {POLL_INTERVAL}s)...") + console.print( + f"[dim]Backend: {resolved_backend_name} | Model: {resolved_model}" + + (f" | Agent: {resolved_agent}" if resolved_agent else "") + + "[/dim]" + ) console.print("[dim]Ctrl+C = skip current job(s) | Ctrl+C x2 = stop[/dim]") # Double Ctrl+C handler @@ -697,18 +733,20 @@ def _sigint_handler(sig, frame): _ctrl_c_count += 1 if _ctrl_c_count == 1: console.print("\n[yellow]Skipping current job(s)... 
(Ctrl+C again to STOP)[/yellow]") - # Kill all active Claude processes to skip current jobs - with _claude_lock: - for wid, cproc in list(_claude_procs.items()): - if cproc.poll() is None: - _kill_process_tree(cproc.pid) + # Kill all active backend processes to skip current jobs + with _backends_lock: + for wid, be in list(_worker_backends.items()): + proc = be.get_active_proc(wid) + if proc is not None and proc.poll() is None: + _kill_process_tree(proc.pid) else: console.print("\n[red bold]STOPPING[/red bold]") _stop_event.set() - with _claude_lock: - for wid, cproc in list(_claude_procs.items()): - if cproc.poll() is None: - _kill_process_tree(cproc.pid) + with _backends_lock: + for wid, be in list(_worker_backends.items()): + proc = be.get_active_proc(wid) + if proc is not None and proc.poll() is None: + _kill_process_tree(proc.pid) kill_all_chrome() raise KeyboardInterrupt @@ -735,21 +773,21 @@ def _refresh(): target_url=target_url, min_score=min_score, headless=headless, - model=model, + model=resolved_model, + agent=resolved_agent, dry_run=dry_run, + backend_name=resolved_backend_name, ) else: # Multi-worker — distribute limit across workers if effective_limit: base = effective_limit // workers extra = effective_limit % workers - limits = [base + (1 if i < extra else 0) - for i in range(workers)] + limits = [base + (1 if i < extra else 0) for i in range(workers)] else: limits = [0] * workers # continuous mode - with ThreadPoolExecutor(max_workers=workers, - thread_name_prefix="apply-worker") as executor: + with ThreadPoolExecutor(max_workers=workers, thread_name_prefix="apply-worker") as executor: futures = { executor.submit( worker_loop, @@ -758,8 +796,10 @@ def _refresh(): target_url=target_url, min_score=min_score, headless=headless, - model=model, + model=resolved_model, + agent=resolved_agent, dry_run=dry_run, + backend_name=resolved_backend_name, ): i for i in range(workers) } @@ -781,10 +821,7 @@ def _refresh(): live.update(render_full()) totals = 
get_totals() - console.print( - f"\n[bold]Done: {total_applied} applied, {total_failed} failed " - f"(${totals['cost']:.3f})[/bold]" - ) + console.print(f"\n[bold]Done: {total_applied} applied, {total_failed} failed (${totals['cost']:.3f})[/bold]") console.print(f"Logs: {config.LOG_DIR}") except KeyboardInterrupt: diff --git a/src/applypilot/apply/prompt.py b/src/applypilot/apply/prompt.py index 37c3790..c43bc87 100644 --- a/src/applypilot/apply/prompt.py +++ b/src/applypilot/apply/prompt.py @@ -525,6 +525,18 @@ def build_prompt(job: dict, tailored_resume: str, Resume PDF (upload this): {pdf_path} Cover Letter PDF (upload if asked): {cl_upload_path or "N/A"} +== TOOL USAGE == +Key browser tools to use: +- browser_navigate: Navigate to a URL +- browser_click: Click an element by description +- browser_fill_form: Fill form fields by placeholder/name +- browser_file_upload: Upload file (must click upload button FIRST) +- browser_snapshot: Read page text content +- browser_evaluate: Run JavaScript to query DOM + +For file upload: 1) Click upload button 2) Then call browser_file_upload with path. +For form fields: Use browser_evaluate to discover all inputs on page first. + == RESUME TEXT (use when filling text fields) == {tailored_resume} @@ -557,32 +569,103 @@ def build_prompt(job: dict, tailored_resume: str, {screening_section} +== DEBUG LOGGING == +You MUST report your progress at each step so we can debug issues. 
After EACH major action (navigate, snapshot, click, fill), output a DEBUG line:
+DEBUG: Step X - Action: [what you did] - Result: [what happened] - Next: [what you plan to do]
+
+For example:
+DEBUG: Step 1 - Action: Navigated to URL - Result: Page loaded successfully - Next: Taking snapshot
+DEBUG: Step 2 - Action: Snapshot taken - Result: Found job posting with Apply button visible - Next: Clicking Apply
+DEBUG: Step 3 - Action: Clicked Apply button - Result: Form opened in new tab - Next: Switching to form tab
+
+This helps us identify where issues occur.
+
 == STEP-BY-STEP ==
 1. browser_navigate to the job URL.
-2. browser_snapshot to read the page. Then run CAPTCHA DETECT (see CAPTCHA section). If a CAPTCHA is found, solve it before continuing.
-3. LOCATION CHECK. Read the page for location info. If not eligible, output RESULT and stop.
-4. Find and click the Apply button. If email-only (page says "email resume to X"):
+2. browser_evaluate to get ALL interactive elements AND form fields:
+   - Get all clickable elements: () => document.querySelectorAll('button, a, [role="button"], input[type="submit"]')
+   - Get all form fields: () => document.querySelectorAll('input, textarea, select')
+   This gives you a list of ALL elements without scrolling. Look for Apply/Submit/Next/Continue buttons and form inputs.
+3. browser_snapshot ONLY to read text content, NOT to find elements by position. Use the element list from step 2 for clicking.
+4. Run CAPTCHA DETECT (see CAPTCHA section). If a CAPTCHA is found, solve it before continuing.
+5. LOCATION CHECK. Read the page for location info. If not eligible, output RESULT and stop.
+ +LINKEDIN EASY APPLY - CRITICAL FAST PATH: + When on ANY LinkedIn job page (url contains linkedin.com/jobs): + - IGNORE everything except finding the "Easy Apply" button + - DO NOT read job description, DO NOT scroll, DO NOT analyze + - IMMEDIATELY use browser_evaluate to find and click Easy Apply: + () => { + const btn = Array.from(document.querySelectorAll('button, a')).find(b => + b.textContent.toLowerCase().includes('easy apply') || + b.getAttribute('aria-label')?.toLowerCase().includes('easy apply') + ); + if (btn) { btn.click(); return 'clicked easy apply'; } + return 'not found'; + } + - If JavaScript fails, use browser_click on "Easy Apply" text + - MAXIMUM 5 seconds from page load to Easy Apply click + + LINKEDIN MODAL HANDLING: + When modal opens: + - DO NOT scroll page behind modal + - Find "Continue" button and click immediately + - If blocked by overlay (#interop-outlet): + () => { + const overlay = document.querySelector('#interop-outlet'); + if (overlay) overlay.style.pointerEvents = 'none'; + const btn = document.querySelector('button[aria-label*="Continue"], button.artdeco-button--primary'); + if (btn) { btn.click(); return 'clicked'; } + return 'not found'; + } + +6. Find and click the Apply button using browser_click with text matching. Common button texts: "Apply", "Apply Now", "Apply for this job", "I'm Interested", "Submit Application", "Start Application". 
+ - If multiple apply buttons exist, click the main one (usually largest/most prominent) + - If you can't find it in your element list, run: browser_evaluate () => {{ return Array.from(document.querySelectorAll('button, a')).filter(b => /apply|submit|interest/i.test(b.textContent)).map(b => ({{ text: b.textContent.trim(), outerHTML: b.outerHTML.substring(0,100) }})) }} + If email-only (page says "email resume to X"): - send_email with subject "Application for {job['title']} -- {display_name}", body = 2-3 sentence pitch + contact info, attach resume PDF: ["{pdf_path}"] - Output RESULT:APPLIED. Done. After clicking Apply: browser_snapshot. Run CAPTCHA DETECT -- many sites trigger CAPTCHAs right after the Apply click. If found, solve before continuing. -5. Login wall? - 5a. FIRST: check the URL. If you landed on {', '.join(blocked_sso)}, or any SSO/OAuth page -> STOP. Output RESULT:FAILED:sso_required. Do NOT try to sign in to Google/Microsoft/SSO. - 5b. Check for popups. Run browser_tabs action "list". If a new tab/window appeared (login popup), switch to it with browser_tabs action "select". Check the URL there too -- if it's SSO -> RESULT:FAILED:sso_required. - 5c. Regular login form (employer's own site)? Try sign in: {personal['email']} / {personal.get('password', '')} - 5d. After clicking Login/Sign-in: run CAPTCHA DETECT. Login pages frequently have invisible CAPTCHAs that silently block form submissions. If found, solve it then retry login. - 5e. Sign in failed? Try sign up with same email and password. - 5f. Need email verification? Use search_emails + read_email to get the code. - 5g. After login, run browser_tabs action "list" again. Switch back to the application tab if needed. - 5h. All failed? Output RESULT:FAILED:login_issue. Do not loop. -6. Upload resume. ALWAYS upload fresh -- delete any existing resume first, then browser_file_upload with the PDF path above. This is the tailored resume for THIS job. Non-negotiable. -7. 
Upload cover letter if there's a field for it. Text field -> paste the cover letter text. File upload -> use the cover letter PDF path. -8. Check ALL pre-filled fields. ATS systems parse your resume and auto-fill -- it's often WRONG. + 7. Login wall or Auth required? + 7a. LINKEDIN SPECIFIC: If you see the LinkedIn auth wall or sign-up page: + - FIRST: Check if you're already logged in. Look for a profile picture, name, or "Me" dropdown in the top navigation. If logged in elements are visible -> SKIP auth, proceed directly to find and click the "Easy Apply" button on the job page. + - If NOT logged in: This is CRITICAL - you MUST attempt Google authentication FIRST: + 1. Look IMMEDIATELY for "Sign in with Google", "Continue with Google", "Sign in", or Google logo buttons on the auth wall + 2. Also look for text like "Sign in with Google to apply" or similar Google auth options + 3. If you see ANY Google sign-in option, CLICK IT IMMEDIATELY - this is the PRIMARY path + 4. Only if NO Google option is visible on the auth wall, then click "Sign in" link (not "Join now") to see more options + - Google auth flow: After clicking Google sign-in, wait for redirect/popup. If already authenticated with Google in this browser, it may auto-approve. If account selector appears, pick the first account. Once back on LinkedIn, proceed to apply. + - IMPORTANT: The applicant's Google account is already authenticated in this browser. Google sign-in should work automatically or with minimal interaction. DO NOT give up without trying Google auth first. + 7b. OTHER SITES - Check for Google Sign-In: Look for buttons like "Sign in with Google", "Continue with Google", or Google logo buttons. If present: + - Click the Google sign-in button + - Wait for redirect/popup and complete Google auth (auto-approve if already authenticated, otherwise select account) + - Once back on the job site, continue with the application flow + 7c. 
If you land on SSO/OAuth pages other than Google (Microsoft, Okta, corporate SSO) -> STOP. Output RESULT:FAILED:sso_required. + 7d. Check for popups. Run browser_tabs action "list". If a new tab/window appeared (login popup), switch to it with browser_tabs action "select". Check the URL there too -- if it's non-Google SSO -> RESULT:FAILED:sso_required. + 7e. Regular login form (employer's own site)? Try sign in: {personal['email']} / {personal.get('password', '')} + 7f. After clicking Login/Sign-in: run CAPTCHA DETECT. Login pages frequently have invisible CAPTCHAs that silently block form submissions. If found, solve it then retry login. + 7g. Sign in failed? Try sign up with same email and password. + 7h. Need email verification? Use search_emails + read_email to get the code. + 7i. After login, run browser_tabs action "list" again. Switch back to the application tab if needed. + 7j. All failed? Output RESULT:FAILED:login_issue. Do not loop. +8. Upload resume (EXISTENCE-BASED - ACT FAST). + RULE: If you see the resume step, take action within 2 seconds. DO NOT sit and think. + + a) Look for upload button ("Upload", "Select File", "+" icon) -> CLICK IT IMMEDIATELY. + b) Call browser_file_upload with {{"paths": ["{pdf_path}"]}}. + c) Wait 2 seconds for upload to complete. + d) Click "Next"/"Continue" immediately. DO NOT verify, DO NOT check filename, DO NOT scroll. + + IF CLICK FAILS: Use this JavaScript immediately: + () => {{ const btn = Array.from(document.querySelectorAll('button')).find(b => /next|continue/i.test(b.textContent)); if(btn) {{ btn.click(); return 'ok'; }} return 'none'; }} +9. Upload cover letter if there's a field for it. Text field -> paste the cover letter text. File upload -> click upload button first, then browser_file_upload with the cover letter PDF path. +10. Check ALL pre-filled fields. ATS systems parse your resume and auto-fill -- it's often WRONG. 
- "Current Job Title" or "Most Recent Title" -> use the title from the TAILORED RESUME summary, NOT whatever the parser guessed. - Compare every other field to the APPLICANT PROFILE. Fix mismatches. Fill empty fields. -9. Answer screening questions using the rules above. -10. {submit_instruction} -11. After submit: browser_snapshot. Run CAPTCHA DETECT -- submit buttons often trigger invisible CAPTCHAs. If found, solve it (the form will auto-submit once the token clears, or you may need to click Submit again). Then check for new tabs (browser_tabs action: "list"). Switch to newest, close old. Snapshot to confirm submission. Look for "thank you" or "application received". -12. Output your result. +11. Answer screening questions using the rules above. +12. {submit_instruction} +13. After submit: browser_snapshot. Run CAPTCHA DETECT -- submit buttons often trigger invisible CAPTCHAs. If found, solve it (the form will auto-submit once the token clears, or you may need to click Submit again). Then check for new tabs (browser_tabs action: "list"). Switch to newest, close old. Snapshot to confirm submission. Look for "thank you" or "application received". +14. Output your result. == RESULT CODES (output EXACTLY one) == RESULT:APPLIED -- submitted successfully @@ -594,17 +677,24 @@ def build_prompt(job: dict, tailored_resume: str, RESULT:FAILED:reason -- any other failure (brief reason) == BROWSER EFFICIENCY == -- browser_snapshot ONCE per page to understand it. Then use browser_take_screenshot to check results (10x less memory). -- Only snapshot again when you need element refs to click/fill. -- Multi-page forms (Workday, Taleo, iCIMS): snapshot each new page, fill all fields, click Next/Continue. Repeat until final review page. -- Fill ALL fields in ONE browser_fill_form call. Not one at a time. +- **SCROLL ONLY WHEN NECESSARY**. Use browser_evaluate to query the DOM programmatically instead of scrolling. 
Example: () => Array.from(document.querySelectorAll('button')).map(b => b.textContent)
+- **ALWAYS use browser_evaluate first** to find elements by selector or text content. This is 100x faster than visual scanning.
+- browser_snapshot ONCE per page to read text content, NOT to find element positions.
+- Only snapshot again when you need to see the visual layout for verification.
+- Use browser_click with text references (element descriptions), not coordinates.
+- Multi-page forms (Workday, Taleo, iCIMS): Use browser_evaluate to list all inputs, then fill ALL fields in ONE browser_fill_form call. Not one at a time.
 - Keep your thinking SHORT. Don't repeat page structure back.
-- CAPTCHA AWARENESS: After any navigation, Apply/Submit/Login click, or when a page feels stuck -- run CAPTCHA DETECT (see CAPTCHA section). Invisible CAPTCHAs (Turnstile, reCAPTCHA v3) show NO visual widget but block form submissions silently. The detect script finds them even when invisible.
+- CAPTCHA AWARENESS: After any navigation, Apply/Submit/Login click, or when a page feels stuck -- run CAPTCHA DETECT (see CAPTCHA section).
 
 == FORM TRICKS ==
 - Popup/new window opened? browser_tabs action "list" to see all tabs. browser_tabs action "select" with the tab index to switch. ALWAYS check for new tabs after clicking login/apply/sign-in buttons.
 - "Upload your resume" pre-fill page (Workday, Lever, etc.): This is NOT the application form yet. Click "Select file" or the upload area, then browser_file_upload with the resume PDF path. Wait for parsing to finish. Then click Next/Continue to reach the actual form.
-- File upload not working? Try: (1) browser_click the upload button/area, (2) browser_file_upload with the path. If still failing, look for a hidden file input or a "Select file" link and click that first.
+- File upload workflow (MUST follow this order):
+  1. browser_click the upload button/dropzone FIRST - this triggers the file chooser modal
+  2.
THEN call browser_file_upload with {{"paths": ["/absolute/path/to/file.pdf"]}} + 3. NEVER call browser_file_upload without clicking the upload button first - it will fail + If upload button is hidden: use browser_evaluate to find and click the hidden input, or look for "Select file" link. + - Workday specific: Look for "Resume" section, click "Upload" or file icon, then browser_file_upload. - Dropdown won't fill? browser_click to open it, then browser_click the option. - Checkbox won't check via fill_form? Use browser_click on it instead. Snapshot to verify. - Phone field with country prefix: just type digits {phone_digits} diff --git a/src/applypilot/cli.py b/src/applypilot/cli.py index 6c8be91..ac552ea 100644 --- a/src/applypilot/cli.py +++ b/src/applypilot/cli.py @@ -8,6 +8,8 @@ import typer from rich.console import Console from rich.table import Table +from rich.panel import Panel +from rich import box from applypilot import __version__ @@ -33,6 +35,7 @@ # Helpers # --------------------------------------------------------------------------- + def _bootstrap() -> None: """Common setup: load env, create dirs, init DB.""" from applypilot.config import load_env, ensure_dirs @@ -53,10 +56,13 @@ def _version_callback(value: bool) -> None: # Commands # --------------------------------------------------------------------------- + @app.callback() def main( version: bool = typer.Option( - False, "--version", "-V", + False, + "--version", + "-V", help="Show version and exit.", callback=_version_callback, is_eager=True, @@ -77,11 +83,7 @@ def init() -> None: def run( stages: Optional[list[str]] = typer.Argument( None, - help=( - "Pipeline stages to run. " - f"Valid: {', '.join(VALID_STAGES)}, all. " - "Defaults to 'all' if omitted." - ), + help=(f"Pipeline stages to run. Valid: {', '.join(VALID_STAGES)}, all. 
Defaults to 'all' if omitted."), ), min_score: int = typer.Option(7, "--min-score", help="Minimum fit score for tailor/cover stages."), workers: int = typer.Option(1, "--workers", "-w", help="Parallel threads for discovery/enrichment stages."), @@ -108,25 +110,20 @@ def run( # Validate stage names for s in stage_list: if s != "all" and s not in VALID_STAGES: - console.print( - f"[red]Unknown stage:[/red] '{s}'. " - f"Valid stages: {', '.join(VALID_STAGES)}, all" - ) + console.print(f"[red]Unknown stage:[/red] '{s}'. Valid stages: {', '.join(VALID_STAGES)}, all") raise typer.Exit(code=1) # Gate AI stages behind Tier 2 llm_stages = {"score", "tailor", "cover"} if any(s in stage_list for s in llm_stages) or "all" in stage_list: from applypilot.config import check_tier + check_tier(2, "AI scoring/tailoring") # Validate the --validation flag value valid_modes = ("strict", "normal", "lenient") if validation not in valid_modes: - console.print( - f"[red]Invalid --validation value:[/red] '{validation}'. " - f"Choose from: {', '.join(valid_modes)}" - ) + console.print(f"[red]Invalid --validation value:[/red] '{validation}'. Choose from: {', '.join(valid_modes)}") raise typer.Exit(code=1) result = run_pipeline( @@ -147,14 +144,18 @@ def apply( limit: Optional[int] = typer.Option(None, "--limit", "-l", help="Max applications to submit."), workers: int = typer.Option(1, "--workers", "-w", help="Number of parallel browser workers."), min_score: int = typer.Option(7, "--min-score", help="Minimum fit score for job selection."), - model: str = typer.Option("haiku", "--model", "-m", help="Claude model name."), + model: Optional[str] = typer.Option(None, "--model", "-m", help="Backend model override."), + agent: Optional[str] = typer.Option(None, "--agent", help="OpenCode agent override."), + backend: Optional[str] = typer.Option(None, "--backend", "-b", help="Backend to use (claude or opencode). 
Uses APPLY_BACKEND env var or defaults to claude."), continuous: bool = typer.Option(False, "--continuous", "-c", help="Run forever, polling for new jobs."), dry_run: bool = typer.Option(False, "--dry-run", help="Preview actions without submitting."), headless: bool = typer.Option(False, "--headless", help="Run browsers in headless mode."), url: Optional[str] = typer.Option(None, "--url", help="Apply to a specific job URL."), gen: bool = typer.Option(False, "--gen", help="Generate prompt file for manual debugging instead of running."), mark_applied: Optional[str] = typer.Option(None, "--mark-applied", help="Manually mark a job URL as applied."), - mark_failed: Optional[str] = typer.Option(None, "--mark-failed", help="Manually mark a job URL as failed (provide URL)."), + mark_failed: Optional[str] = typer.Option( + None, "--mark-failed", help="Manually mark a job URL as failed (provide URL)." + ), fail_reason: Optional[str] = typer.Option(None, "--fail-reason", help="Reason for --mark-failed."), reset_failed: bool = typer.Option(False, "--reset-failed", help="Reset all failed jobs for retry."), ) -> None: @@ -168,18 +169,21 @@ def apply( if mark_applied: from applypilot.apply.launcher import mark_job + mark_job(mark_applied, "applied") console.print(f"[green]Marked as applied:[/green] {mark_applied}") return if mark_failed: from applypilot.apply.launcher import mark_job + mark_job(mark_failed, "failed", reason=fail_reason) console.print(f"[yellow]Marked as failed:[/yellow] {mark_failed} ({fail_reason or 'manual'})") return if reset_failed: from applypilot.apply.launcher import reset_failed as do_reset + count = do_reset() console.print(f"[green]Reset {count} failed job(s) for retry.[/green]") return @@ -191,10 +195,7 @@ def apply( # Check 2: Profile exists if not _profile_path.exists(): - console.print( - "[red]Profile not found.[/red]\n" - "Run [bold]applypilot init[/bold] to create your profile first." 
- ) + console.print("[red]Profile not found.[/red]\nRun [bold]applypilot init[/bold] to create your profile first.") raise typer.Exit(code=1) # Check 3: Tailored resumes exist (skip for --gen with --url) @@ -212,11 +213,13 @@ def apply( if gen: from applypilot.apply.launcher import gen_prompt, BASE_CDP_PORT + target = url or "" if not target: console.print("[red]--gen requires --url to specify which job.[/red]") raise typer.Exit(code=1) - prompt_file = gen_prompt(target, min_score=min_score, model=model) + manual_model: str = model if model is not None else "haiku" + prompt_file = gen_prompt(target, min_score=min_score, model=manual_model) if not prompt_file: console.print("[red]No matching job found for that URL.[/red]") raise typer.Exit(code=1) @@ -224,9 +227,7 @@ def apply( console.print(f"[green]Wrote prompt to:[/green] {prompt_file}") console.print(f"\n[bold]Run manually:[/bold]") console.print( - f" claude --model {model} -p " - f"--mcp-config {mcp_path} " - f"--permission-mode bypassPermissions < {prompt_file}" + f" claude --model {manual_model} -p --mcp-config {mcp_path} --permission-mode bypassPermissions < {prompt_file}" ) return @@ -237,7 +238,11 @@ def apply( console.print("\n[bold blue]Launching Auto-Apply[/bold blue]") console.print(f" Limit: {'unlimited' if continuous else effective_limit}") console.print(f" Workers: {workers}") - console.print(f" Model: {model}") + console.print(f" Model: {model or '[backend default]'}") + if backend: + console.print(f" Backend: {backend}") + if agent: + console.print(f" Agent: {agent}") console.print(f" Headless: {headless}") console.print(f" Dry run: {dry_run}") if url: @@ -250,9 +255,11 @@ def apply( min_score=min_score, headless=headless, model=model, + agent=agent, dry_run=dry_run, continuous=continuous, workers=workers, + backend_name=backend, ) @@ -332,13 +339,197 @@ def dashboard() -> None: open_dashboard() + +@app.command() +def analyze( + worker: int = typer.Option(0, "--worker", "-w", help="Worker ID to 
analyze (default: 0)"),
+    last: bool = typer.Option(False, "--last", "-l", help="Analyze the most recent run"),
+    summary: bool = typer.Option(False, "--summary", "-s", help="Show summary only"),
+) -> None:
+    """Analyze a recent apply run: timing, steps, and failures."""
+    from pathlib import Path
+    from datetime import datetime
+    import json
+    from applypilot import config
+
+    log_dir = Path(config.LOG_DIR)
+
+    # Find the worker log file
+    worker_log = log_dir / f"worker-{worker}.log"
+    audit_log = log_dir / f"worker-{worker}.events.jsonl"
+
+    if not worker_log.exists():
+        console.print(f"[red]No worker log found:[/red] {worker_log}")
+        raise typer.Exit(code=1)
+
+    # Parse the audit log (one JSON object per line; skip malformed lines)
+    actions = []
+    if audit_log.exists():
+        with open(audit_log) as f:
+            for line in f:
+                line = line.strip()
+                if line:
+                    try:
+                        actions.append(json.loads(line))
+                    except json.JSONDecodeError:
+                        pass
+
+    # Parse the worker log for context
+    with open(worker_log) as f:
+        log_content = f.read()
+
+    # Extract job info from log header
+    job_title = "Unknown"
+    job_url = "Unknown"
+    job_site = "Unknown"
+    for line in log_content.split('\n')[:10]:
+        if '@' in line and 'http' not in line:
+            job_title = line.split('@')[0].strip().split(']')[-1].strip()[:50]
+            job_site = line.split('@')[-1].strip()[:20]
+        if 'URL:' in line:
+            job_url = line.split('URL:')[-1].strip()[:60]
+
+    # Analyze timing
+    if actions:
+        start_time = datetime.fromisoformat(actions[0]['timestamp'])
+        end_time = datetime.fromisoformat(actions[-1]['timestamp'])
+        duration = (end_time - start_time).total_seconds()
+    else:
+        duration = 0
+
+    # Calculate delays between actions
+    delays = []
+    for i in range(1, len(actions)):
+        t1 = datetime.fromisoformat(actions[i-1]['timestamp'])
+        t2 = datetime.fromisoformat(actions[i]['timestamp'])
+        delays.append((t2 - t1).total_seconds())
+
+    avg_delay = sum(delays) / len(delays) if delays else 0
+    max_delay = max(delays) if delays else 0
+
+    # Check for failure
+    status = "unknown"
+
failure_reason = None + if "RESULT:APPLIED" in log_content: + status = "success" + elif "RESULT:CAPTCHA" in log_content or "captcha" in log_content.lower(): + status = "captcha" + failure_reason = "CAPTCHA detected - automatic solving not configured" + elif "RESULT:FAILED" in log_content: + status = "failed" + # Extract failure reason + for line in log_content.split('\n'): + if "RESULT:FAILED" in line: + failure_reason = line.split("RESULT:FAILED")[-1].strip(": ") + break + elif "RESULT:LOGIN_ISSUE" in log_content: + status = "login" + failure_reason = "Login/authentication issue" + elif "RESULT:EXPIRED" in log_content: + status = "expired" + failure_reason = "Job expired or no longer accepting applications" + + # Display results + console.print() + console.print(Panel( + f"[bold blue]{job_title}[/bold blue] @ [cyan]{job_site}[/cyan]\n" + f"[dim]{job_url}[/dim]", + title="Job Application Analysis", + border_style="blue" + )) + + # Status panel + status_colors = { + "success": "green", + "failed": "red", + "captcha": "yellow", + "login": "yellow", + "expired": "dim", + "unknown": "dim" + } + status_text = status.upper() + if status == "success": + status_text = "✓ SUCCESS" + + console.print(Panel( + f"[bold {status_colors.get(status, 'white')}]{status_text}[/bold {status_colors.get(status, 'white')}]\n" + f"Duration: [bold]{duration:.0f}s[/bold] ({len(actions)} actions)\n" + f"Avg delay: [bold]{avg_delay:.1f}s[/bold] | Max delay: [bold]{max_delay:.1f}s[/bold]", + title="Summary", + border_style=status_colors.get(status, "white") + )) + + if failure_reason: + console.print(Panel( + f"[yellow]{failure_reason}[/yellow]", + title="Failure Reason", + border_style="yellow" + )) + + if not summary and actions: + # Show action timeline + console.print() + table = Table(title="Action Timeline", box=box.SIMPLE) + table.add_column("#", style="dim", width=4) + table.add_column("Tool", style="cyan", width=35) + table.add_column("Delay", style="yellow", width=10) + 
+        table.add_column("Details", style="white")
+
+        prev_time = None
+        for i, action in enumerate(actions[:50]):  # Limit to 50 actions
+            ts = datetime.fromisoformat(action['timestamp'])
+            tool = action.get('tool', 'unknown')[:32]
+
+            if prev_time:
+                delay = (ts - prev_time).total_seconds()
+                delay_str = f"{delay:.1f}s"
+
+                # Color code delays
+                if delay > 10:
+                    delay_str = f"[red]{delay_str}[/red]"
+                elif delay > 5:
+                    delay_str = f"[yellow]{delay_str}[/yellow]"
+            else:
+                delay_str = "-"
+
+            # Extract relevant details
+            details = ""
+            inp = action.get('input', {})
+            if inp:
+                if 'url' in inp:
+                    details = f"url: {inp['url'][:40]}..."
+                elif 'element' in inp:
+                    details = f"element: {str(inp['element'])[:40]}"
+                elif 'fields' in inp:
+                    details = f"fields: {len(inp['fields'])}"
+                elif 'text' in inp:
+                    details = f"text: {str(inp['text'])[:40]}"
+
+            table.add_row(str(i + 1), tool, delay_str, details)
+            prev_time = ts
+
+        console.print(table)
+
+        if len(actions) > 50:
+            console.print(f"[dim]... and {len(actions) - 50} more actions[/dim]")
+
+    console.print()
+    console.print(f"[dim]Full logs: {worker_log}[/dim]")
+    console.print(f"[dim]Audit log: {audit_log}[/dim]")
+    console.print()
+
+
 @app.command()
 def doctor() -> None:
     """Check your setup and diagnose missing requirements."""
     import shutil
     from applypilot.config import (
-        load_env, PROFILE_PATH, RESUME_PATH, RESUME_PDF_PATH,
-        SEARCH_CONFIG_PATH, ENV_PATH, get_chrome_path,
+        load_env,
+        PROFILE_PATH,
+        RESUME_PATH,
+        RESUME_PDF_PATH,
+        SEARCH_CONFIG_PATH,
+        ENV_PATH,
+        get_chrome_path,
     )
 
     load_env()
@@ -373,13 +564,20 @@ def doctor() -> None:
     # jobspy (discovery dep installed separately)
     try:
         import jobspy  # noqa: F401
+
+        results.append(("python-jobspy", ok_mark, "Job board scraping available"))
     except ImportError:
-        results.append(("python-jobspy", warn_mark,
-            "pip install --no-deps python-jobspy && pip install pydantic tls-client requests markdownify regex"))
+        results.append(
+            (
+                "python-jobspy",
+                warn_mark,
+                "pip install --no-deps python-jobspy && pip install pydantic tls-client requests markdownify regex",
+            )
+        )
 
     # --- Tier 2 checks ---
     import os
+
     has_gemini = bool(os.environ.get("GEMINI_API_KEY"))
     has_openai = bool(os.environ.get("OPENAI_API_KEY"))
     has_local = bool(os.environ.get("LLM_URL"))
@@ -392,41 +590,47 @@ def doctor() -> None:
     elif has_local:
         results.append(("LLM API key", ok_mark, f"Local: {os.environ.get('LLM_URL')}"))
     else:
-        results.append(("LLM API key", fail_mark,
-            "Set GEMINI_API_KEY in ~/.applypilot/.env (run 'applypilot init')"))
+        results.append(("LLM API key", fail_mark, "Set GEMINI_API_KEY in ~/.applypilot/.env (run 'applypilot init')"))
 
     # --- Tier 3 checks ---
     # Claude Code CLI
+    # Backend CLIs: check which are installed
+    opencode_bin = shutil.which("opencode")
     claude_bin = shutil.which("claude")
-    if claude_bin:
+    if opencode_bin:
+        results.append(("OpenCode CLI", ok_mark, opencode_bin))
+        results.append(("Claude Code CLI", warn_mark, "Optional: Claude not required when using OpenCode"))
+    elif claude_bin:
+        results.append(("OpenCode CLI", fail_mark, "Install OpenCode CLI for preferred backend (optional)"))
         results.append(("Claude Code CLI", ok_mark, claude_bin))
     else:
-        results.append(("Claude Code CLI", fail_mark,
-            "Install from https://claude.ai/code (needed for auto-apply)"))
+        results.append(("OpenCode CLI", fail_mark, "Install OpenCode CLI and run 'opencode mcp add' to configure MCP"))
+        results.append(("Claude Code CLI", fail_mark, "Install from https://claude.ai/code"))
 
     # Chrome
     try:
         chrome_path = get_chrome_path()
         results.append(("Chrome/Chromium", ok_mark, chrome_path))
     except FileNotFoundError:
-        results.append(("Chrome/Chromium", fail_mark,
-            "Install Chrome or set CHROME_PATH env var (needed for auto-apply)"))
+        results.append(
+            ("Chrome/Chromium", fail_mark, "Install Chrome or set CHROME_PATH env var (needed for auto-apply)")
+        )
 
     # Node.js / npx (for Playwright MCP)
     npx_bin = shutil.which("npx")
     if npx_bin:
         results.append(("Node.js (npx)", ok_mark, npx_bin))
     else:
-        results.append(("Node.js (npx)", fail_mark,
-            "Install Node.js 18+ from nodejs.org (needed for auto-apply)"))
+        results.append(("Node.js (npx)", fail_mark, "Install Node.js 18+ from nodejs.org (needed for auto-apply)"))
 
     # CapSolver (optional)
     capsolver = os.environ.get("CAPSOLVER_API_KEY")
     if capsolver:
         results.append(("CapSolver API key", ok_mark, "CAPTCHA solving enabled"))
     else:
-        results.append(("CapSolver API key", "[dim]optional[/dim]",
-            "Set CAPSOLVER_API_KEY in .env for CAPTCHA solving"))
+        results.append(
+            ("CapSolver API key", "[dim]optional[/dim]", "Set CAPSOLVER_API_KEY in .env for CAPTCHA solving")
+        )
 
     # --- Render results ---
     console.print()
@@ -441,14 +645,19 @@ def doctor() -> None:
     # Tier summary
     from applypilot.config import get_tier, TIER_LABELS
+
     tier = get_tier()
     console.print(f"[bold]Current tier: Tier {tier} — {TIER_LABELS[tier]}[/bold]")
     if tier == 1:
         console.print("[dim] → Tier 2 unlocks: scoring, tailoring, cover letters (needs LLM API key)[/dim]")
-        console.print("[dim] → Tier 3 unlocks: auto-apply (needs Claude Code CLI + Chrome + Node.js)[/dim]")
+        console.print(
+            "[dim] → Tier 3 unlocks: auto-apply (needs OpenCode CLI or Claude Code CLI + Chrome + Node.js)[/dim]"
+        )
    elif tier == 2:
-        console.print("[dim] → Tier 3 unlocks: auto-apply (needs Claude Code CLI + Chrome + Node.js)[/dim]")
+        console.print(
+            "[dim] → Tier 3 unlocks: auto-apply (needs OpenCode CLI or Claude Code CLI + Chrome + Node.js)[/dim]"
+        )
 
     console.print()
diff --git a/src/applypilot/config.py b/src/applypilot/config.py
index 8c39780..1d8b7bb 100644
--- a/src/applypilot/config.py
+++ b/src/applypilot/config.py
@@ -44,7 +44,8 @@ def get_chrome_path() -> str:
     if system == "Windows":
         candidates = [
             Path(os.environ.get("PROGRAMFILES", r"C:\Program Files")) / "Google/Chrome/Application/chrome.exe",
-            Path(os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)")) / "Google/Chrome/Application/chrome.exe",
+            Path(os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)"))
+            / "Google/Chrome/Application/chrome.exe",
             Path(os.environ.get("LOCALAPPDATA", "")) / "Google/Chrome/Application/chrome.exe",
         ]
     elif system == "Darwin":
@@ -69,9 +70,7 @@ def get_chrome_path() -> str:
     if found:
         return found
 
-    raise FileNotFoundError(
-        "Chrome/Chromium not found. Install Chrome or set CHROME_PATH environment variable."
-    )
+    raise FileNotFoundError("Chrome/Chromium not found. Install Chrome or set CHROME_PATH environment variable.")
 
 
 def get_chrome_user_data() -> Path:
@@ -94,16 +93,16 @@ def ensure_dirs():
 def load_profile() -> dict:
     """Load user profile from ~/.applypilot/profile.json."""
     import json
+
     if not PROFILE_PATH.exists():
-        raise FileNotFoundError(
-            f"Profile not found at {PROFILE_PATH}. Run `applypilot init` first."
-        )
+        raise FileNotFoundError(f"Profile not found at {PROFILE_PATH}. Run `applypilot init` first.")
     return json.loads(PROFILE_PATH.read_text(encoding="utf-8"))
 
 
 def load_search_config() -> dict:
     """Load search configuration from ~/.applypilot/searches.yaml."""
     import yaml
+
     if not SEARCH_CONFIG_PATH.exists():
         # Fall back to package-shipped example
         example = CONFIG_DIR / "searches.example.yaml"
@@ -116,6 +115,7 @@ def load_search_config() -> dict:
 def load_sites_config() -> dict:
     """Load sites.yaml configuration (sites list, manual_ats, blocked, etc.)."""
     import yaml
+
     path = CONFIG_DIR / "sites.yaml"
     if not path.exists():
         return {}
@@ -174,6 +174,7 @@ def load_base_urls() -> dict[str, str | None]:
 def load_env():
     """Load environment variables from ~/.applypilot/.env if it exists."""
     from dotenv import load_dotenv
+
     if ENV_PATH.exists():
         load_dotenv(ENV_PATH)
     # Also try CWD .env as fallback
@@ -210,6 +211,8 @@ def get_tier() -> int:
     if not has_llm:
         return 1
 
+    # Support multiple backends for Tier 3: prefer OpenCode, fall back to Claude
+    has_opencode = shutil.which("opencode") is not None
     has_claude = shutil.which("claude") is not None
     try:
         get_chrome_path()
@@ -217,7 +220,7 @@ def get_tier() -> int:
     except FileNotFoundError:
         has_chrome = False
 
-    if has_claude and has_chrome:
+    if (has_opencode or has_claude) and has_chrome:
         return 3
     return 2
 
@@ -235,14 +238,25 @@ def check_tier(required: int, feature: str) -> None:
         return
 
     from rich.console import Console
+
     _console = Console(stderr=True)
 
     missing: list[str] = []
     if required >= 2 and not any(os.environ.get(k) for k in ("GEMINI_API_KEY", "OPENAI_API_KEY", "LLM_URL")):
         missing.append("LLM API key — run [bold]applypilot init[/bold] or set GEMINI_API_KEY")
     if required >= 3:
-        if not shutil.which("claude"):
+        # Check which backends are installed
+        opencode_bin = shutil.which("opencode")
+        claude_bin = shutil.which("claude")
+
+        # Neither backend present -> actionable guidance for both
+        if not opencode_bin and not claude_bin:
+            missing.append("OpenCode CLI — install and run 'opencode mcp add' to configure MCP (preferred)")
             missing.append("Claude Code CLI — install from [bold]https://claude.ai/code[/bold]")
+        # OpenCode missing but Claude present -> suggest OpenCode as recommended
+        elif not opencode_bin and claude_bin:
+            missing.append("OpenCode CLI (recommended) — install and run 'opencode mcp add' to configure MCP")
+        # If OpenCode is present (regardless of Claude), no missing messages are added here
     try:
         get_chrome_path()
     except FileNotFoundError:
diff --git a/src/applypilot/wizard/init.py b/src/applypilot/wizard/init.py
index 0f893c3..f17a924 100644
--- a/src/applypilot/wizard/init.py
+++ b/src/applypilot/wizard/init.py
@@ -35,6 +35,7 @@
 # Resume
 # ---------------------------------------------------------------------------
 
+
 def _setup_resume() -> None:
     """Prompt for resume file and copy into APP_DIR."""
     console.print(Panel("[bold]Step 1: Resume[/bold]\nPoint to your master resume file (.txt or .pdf)."))
@@ -78,9 +79,14 @@ def _setup_resume() -> None:
 # Profile
 # ---------------------------------------------------------------------------
 
+
 def _setup_profile() -> dict:
     """Walk through profile questions and return a nested profile dict."""
-    console.print(Panel("[bold]Step 2: Profile[/bold]\nTell ApplyPilot about yourself. This powers scoring, tailoring, and auto-fill."))
+    console.print(
+        Panel(
+            "[bold]Step 2: Profile[/bold]\nTell ApplyPilot about yourself. This powers scoring, tailoring, and auto-fill."
+        )
+    )
 
     profile: dict = {}
 
@@ -109,7 +115,9 @@ def _setup_profile() -> dict:
     profile["work_authorization"] = {
         "legally_authorized_to_work": Confirm.ask("Are you legally authorized to work in your target country?"),
         "require_sponsorship": Confirm.ask("Will you now or in the future need sponsorship?"),
-        "work_permit_type": Prompt.ask("Work permit type (e.g. Citizen, PR, Open Work Permit — leave blank if N/A)", default=""),
+        "work_permit_type": Prompt.ask(
+            "Work permit type (e.g. Citizen, PR, Open Work Permit — leave blank if N/A)", default=""
+        ),
     }
 
     # -- Compensation --
@@ -128,7 +136,9 @@ def _setup_profile() -> dict:
     # -- Experience --
     console.print("\n[bold cyan]Experience[/bold cyan]")
     current_title = Prompt.ask("Current/most recent job title", default="")
-    target_role = Prompt.ask("Target role (what you're applying for, e.g. 'Senior Backend Engineer')", default=current_title)
+    target_role = Prompt.ask(
+        "Target role (what you're applying for, e.g. 'Senior Backend Engineer')", default=current_title
+    )
     profile["experience"] = {
         "years_of_experience_total": Prompt.ask("Years of professional experience", default=""),
         "education_level": Prompt.ask("Highest education (e.g. Bachelor's, Master's, PhD, Self-taught)", default=""),
@@ -184,6 +194,7 @@ def _setup_profile() -> dict:
 # Search config
 # ---------------------------------------------------------------------------
 
+
 def _setup_searches() -> None:
     """Generate a searches.yaml from user input."""
     console.print(Panel("[bold]Step 3: Job Search Config[/bold]\nDefine what you're looking for."))
@@ -195,9 +206,7 @@ def _setup_searches() -> None:
     except ValueError:
         distance = 0
 
-    roles_raw = Prompt.ask(
-        "Target job titles (comma-separated, e.g. 'Backend Engineer, Full Stack Developer')"
-    )
+    roles_raw = Prompt.ask("Target job titles (comma-separated, e.g. 'Backend Engineer, Full Stack Developer')")
     roles = [r.strip() for r in roles_raw.split(",") if r.strip()]
 
     if not roles:
@@ -233,13 +242,16 @@ def _setup_searches() -> None:
 # AI Features
 # ---------------------------------------------------------------------------
 
+
 def _setup_ai_features() -> None:
     """Ask about AI scoring/tailoring — optional LLM configuration."""
-    console.print(Panel(
-        "[bold]Step 4: AI Features (optional)[/bold]\n"
-        "An LLM powers job scoring, resume tailoring, and cover letters.\n"
-        "Without this, you can still discover and enrich jobs."
-    ))
+    console.print(
+        Panel(
+            "[bold]Step 4: AI Features (optional)[/bold]\n"
+            "An LLM powers job scoring, resume tailoring, and cover letters.\n"
+            "Without this, you can still discover and enrich jobs."
+        )
+    )
 
     if not Confirm.ask("Enable AI scoring and resume tailoring?", default=True):
         console.print("[dim]Discovery-only mode. You can configure AI later with [bold]applypilot init[/bold].[/dim]")
@@ -279,27 +291,47 @@ def _setup_ai_features() -> None:
 # Auto-Apply
 # ---------------------------------------------------------------------------
 
+
 def _setup_auto_apply() -> None:
-    """Configure autonomous job application (requires Claude Code CLI)."""
-    console.print(Panel(
-        "[bold]Step 5: Auto-Apply (optional)[/bold]\n"
-        "ApplyPilot can autonomously fill and submit job applications\n"
-        "using Claude Code as the browser agent."
-    ))
+    """Configure autonomous job application (requires Claude Code or OpenCode CLI)."""
+    console.print(
+        Panel(
+            "[bold]Step 5: Auto-Apply (optional)[/bold]\n"
+            "ApplyPilot can autonomously fill and submit job applications\n"
+            "using Claude Code or OpenCode as the browser agent."
+        )
+    )
 
     if not Confirm.ask("Enable autonomous job applications?", default=True):
         console.print("[dim]You can apply manually using the tailored resumes ApplyPilot generates.[/dim]")
         return
 
-    # Check for Claude Code CLI
-    if shutil.which("claude"):
-        console.print("[green]Claude Code CLI detected.[/green]")
+    # Choose backend
+    console.print("\nSupported backends: [bold]opencode[/bold] (default), [bold]claude[/bold]")
+    backend = Prompt.ask("Which backend to use for auto-apply?", choices=["opencode", "claude"], default="opencode")
+
+    if backend == "opencode":
+        if shutil.which("opencode"):
+            console.print("[green]OpenCode CLI detected.[/green]")
+            console.print(
+                "[dim]Ensure an MCP is configured (e.g. run: opencode mcp add) before running auto-apply.[/dim]"
+            )
+        else:
+            console.print(
+                "[yellow]OpenCode CLI not found on PATH.[/yellow]\n"
+                "Install the OpenCode CLI and configure an MCP (run: [bold]opencode mcp add[/bold]).\n"
+                "Auto-apply won't work until the OpenCode CLI is installed and an MCP is configured."
+            )
     else:
-        console.print(
-            "[yellow]Claude Code CLI not found on PATH.[/yellow]\n"
-            "Install it from: [bold]https://claude.ai/code[/bold]\n"
-            "Auto-apply won't work until Claude Code is installed."
-        )
+        # claude selected
+        if shutil.which("claude"):
+            console.print("[green]Claude Code CLI detected.[/green]")
+        else:
+            console.print(
+                "[yellow]Claude Code CLI not found on PATH.[/yellow]\n"
+                "Install it from: [bold]https://claude.ai/code[/bold]\n"
+                "Auto-apply won't work until Claude Code is installed."
+            )
 
     # Optional: CapSolver for CAPTCHAs
     console.print("\n[dim]Some job sites use CAPTCHAs. CapSolver can handle them automatically.[/dim]")
@@ -324,6 +356,7 @@ def _setup_auto_apply() -> None:
 # Main entry
 # ---------------------------------------------------------------------------
 
+
 def run_wizard() -> None:
     """Run the full interactive setup wizard."""
     console.print()
@@ -385,9 +418,7 @@ def run_wizard() -> None:
     console.print(
         Panel.fit(
             "[bold green]Setup complete![/bold green]\n\n"
-            f"[bold]Your tier: Tier {tier} — {TIER_LABELS[tier]}[/bold]\n\n"
-            + "\n".join(tier_lines)
-            + unlock_hint,
+            f"[bold]Your tier: Tier {tier} — {TIER_LABELS[tier]}[/bold]\n\n" + "\n".join(tier_lines) + unlock_hint,
             border_style="green",
         )
     )
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..2d469fd
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,25 @@
+"""Shared fixtures for ApplyPilot test suite.
+
+@file conftest.py
+@description Provides common fixtures and monkeypatch helpers for offline,
+    deterministic testing. No live network or API calls.
+"""
+
+from __future__ import annotations
+
+
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def _isolate_env(monkeypatch, tmp_path):
+    """Ensure every test gets a clean environment.
+
+    - Removes APPLY_BACKEND so default-selection tests are deterministic.
+    - Points APPLYPILOT_DIR to a temp directory to avoid touching real data.
+    - Creates required sub-directories so config.ensure_dirs() is unnecessary.
+    """
+    monkeypatch.delenv("APPLY_BACKEND", raising=False)
+    monkeypatch.setenv("APPLYPILOT_DIR", str(tmp_path / "applypilot"))
+    (tmp_path / "applypilot" / "logs").mkdir(parents=True, exist_ok=True)
+    (tmp_path / "applypilot" / "apply-workers").mkdir(parents=True, exist_ok=True)
diff --git a/tests/test_backend_selection.py b/tests/test_backend_selection.py
new file mode 100644
index 0000000..7f9e713
--- /dev/null
+++ b/tests/test_backend_selection.py
@@ -0,0 +1,224 @@
+"""Tests for backend selection via get_backend() factory.
+
+@file test_backend_selection.py
+@description Validates backend routing: default fallback, explicit selection,
+    case-insensitive names, and actionable error on invalid names.
+    All tests are offline and deterministic (no subprocess or network).
+"""
+
+from __future__ import annotations
+
+
+import pytest
+
+from applypilot.apply.backends import (
+    DEFAULT_BACKEND,
+    VALID_BACKENDS,
+    AgentBackend,
+    ClaudeBackend,
+    InvalidBackendError,
+    OpenCodeBackend,
+    get_available_backends,
+    get_backend,
+    resolve_default_agent,
+    resolve_default_model,
+)
+
+
+# ---------------------------------------------------------------------------
+# 1. Default backend when APPLY_BACKEND env var is unset
+# ---------------------------------------------------------------------------
+
+
+class TestDefaultBackend:
+    """get_backend(None) with no env var should return the code default."""
+
+    def test_returns_claude_when_env_unset(self):
+        """Default backend is 'claude' per DEFAULT_BACKEND constant."""
+        backend = get_backend()
+        assert isinstance(backend, ClaudeBackend)
+        assert backend.name == "claude"
+
+    def test_default_constant_is_claude(self):
+        """DEFAULT_BACKEND constant matches expectations."""
+        assert DEFAULT_BACKEND == "claude"
+
+
+# ---------------------------------------------------------------------------
+# 2. Explicit backend selection (positional arg)
+# ---------------------------------------------------------------------------
+
+
+class TestExplicitSelection:
+    """get_backend('name') should return the matching backend class."""
+
+    def test_select_claude_explicitly(self):
+        backend = get_backend("claude")
+        assert isinstance(backend, ClaudeBackend)
+        assert backend.name == "claude"
+
+    def test_select_opencode_explicitly(self):
+        backend = get_backend("opencode")
+        assert isinstance(backend, OpenCodeBackend)
+        assert backend.name == "opencode"
+
+
+# ---------------------------------------------------------------------------
+# 3. Case-insensitive and whitespace-trimmed selection
+# ---------------------------------------------------------------------------
+
+
+class TestCaseInsensitive:
+    """Backend names are normalized to lowercase and stripped."""
+
+    def test_uppercase_claude(self):
+        backend = get_backend("CLAUDE")
+        assert isinstance(backend, ClaudeBackend)
+
+    def test_mixed_case_opencode(self):
+        backend = get_backend("OpenCode")
+        assert isinstance(backend, OpenCodeBackend)
+
+    def test_padded_whitespace(self):
+        backend = get_backend(" claude ")
+        assert isinstance(backend, ClaudeBackend)
+
+    def test_uppercase_with_whitespace(self):
+        backend = get_backend(" OPENCODE ")
+        assert isinstance(backend, OpenCodeBackend)
+
+
+# ---------------------------------------------------------------------------
+# 4. APPLY_BACKEND environment variable
+# ---------------------------------------------------------------------------
+
+
+class TestEnvVarSelection:
+    """get_backend(None) reads APPLY_BACKEND from env."""
+
+    def test_env_var_claude(self, monkeypatch):
+        monkeypatch.setenv("APPLY_BACKEND", "claude")
+        backend = get_backend()
+        assert isinstance(backend, ClaudeBackend)
+
+    def test_env_var_opencode(self, monkeypatch):
+        monkeypatch.setenv("APPLY_BACKEND", "opencode")
+        backend = get_backend()
+        assert isinstance(backend, OpenCodeBackend)
+
+    def test_env_var_case_insensitive(self, monkeypatch):
+        monkeypatch.setenv("APPLY_BACKEND", "OpenCode")
+        backend = get_backend()
+        assert isinstance(backend, OpenCodeBackend)
+
+    def test_explicit_arg_overrides_env(self, monkeypatch):
+        """Explicit argument takes priority over env var."""
+        monkeypatch.setenv("APPLY_BACKEND", "opencode")
+        backend = get_backend("claude")
+        assert isinstance(backend, ClaudeBackend)
+
+
+# ---------------------------------------------------------------------------
+# 5. Invalid backend raises InvalidBackendError with actionable message
+# ---------------------------------------------------------------------------
+
+
+class TestInvalidBackend:
+    """Unsupported names produce clear, helpful errors."""
+
+    def test_raises_invalid_backend_error(self):
+        with pytest.raises(InvalidBackendError) as exc_info:
+            get_backend("nonexistent")
+        err = exc_info.value
+        assert err.backend == "nonexistent"
+        assert err.available == VALID_BACKENDS
+
+    def test_error_message_includes_name(self):
+        with pytest.raises(InvalidBackendError, match="nonexistent"):
+            get_backend("nonexistent")
+
+    def test_error_message_lists_supported_backends(self):
+        with pytest.raises(InvalidBackendError, match="claude") as exc_info:
+            get_backend("gpt4")
+        msg = str(exc_info.value)
+        assert "opencode" in msg
+        assert "claude" in msg
+
+    def test_error_message_mentions_env_var(self):
+        with pytest.raises(InvalidBackendError, match="APPLY_BACKEND"):
+            get_backend("wrong")
+
+    def test_empty_string_raises(self):
+        with pytest.raises(InvalidBackendError):
+            get_backend("")
+
+    def test_env_var_invalid_raises(self, monkeypatch):
+        monkeypatch.setenv("APPLY_BACKEND", "llama")
+        with pytest.raises(InvalidBackendError, match="llama"):
+            get_backend()
+
+
+# ---------------------------------------------------------------------------
+# 6. Backend interface contract
+# ---------------------------------------------------------------------------
+
+
+class TestBackendInterface:
+    """All backends satisfy the AgentBackend abstract interface."""
+
+    @pytest.mark.parametrize("name", sorted(VALID_BACKENDS))
+    def test_all_valid_backends_instantiate(self, name):
+        backend = get_backend(name)
+        assert isinstance(backend, AgentBackend)
+        assert backend.name == name
+
+    def test_get_available_backends_matches_valid(self):
+        assert get_available_backends() == VALID_BACKENDS
+
+    def test_claude_has_required_methods(self):
+        b = ClaudeBackend()
+        assert callable(b.run_job)
+        assert callable(b.get_active_proc)
+        assert hasattr(b, "name")
+
+    def test_opencode_has_required_methods(self):
+        b = OpenCodeBackend()
+        assert callable(b.run_job)
+        assert callable(b.get_active_proc)
+        assert hasattr(b, "name")
+
+    def test_active_proc_none_by_default(self):
+        """No process active until run_job called."""
+        for name in VALID_BACKENDS:
+            b = get_backend(name)
+            assert b.get_active_proc(0) is None
+            assert b.get_active_proc(99) is None
+
+
+class TestBackendDefaults:
+    """Backend-aware model and agent default resolution."""
+
+    def test_default_model_for_claude(self, monkeypatch):
+        monkeypatch.delenv("APPLY_CLAUDE_MODEL", raising=False)
+        assert resolve_default_model("claude") == "haiku"
+
+    def test_default_model_for_claude_env_override(self, monkeypatch):
+        monkeypatch.setenv("APPLY_CLAUDE_MODEL", "sonnet")
+        assert resolve_default_model("claude") == "sonnet"
+
+    def test_default_model_for_opencode_uses_llm_model(self, monkeypatch):
+        monkeypatch.delenv("APPLY_OPENCODE_MODEL", raising=False)
+        monkeypatch.setenv("LLM_MODEL", "gh/claude-sonnet-4.5")
+        assert resolve_default_model("opencode") == "gh/claude-sonnet-4.5"
+
+    def test_default_model_for_opencode_env_override(self, monkeypatch):
+        monkeypatch.setenv("APPLY_OPENCODE_MODEL", "o4-mini")
+        monkeypatch.setenv("LLM_MODEL", "ignored")
+        assert resolve_default_model("opencode") == "o4-mini"
+
+    def test_default_agent_for_opencode_env(self, monkeypatch):
+        monkeypatch.setenv("APPLY_OPENCODE_AGENT", "coder")
+        assert resolve_default_agent("opencode") == "coder"
+
+    def test_default_agent_for_claude(self):
+        assert resolve_default_agent("claude") is None
diff --git a/tests/test_parser_edge_cases.py b/tests/test_parser_edge_cases.py
new file mode 100644
index 0000000..dada5aa
--- /dev/null
+++ b/tests/test_parser_edge_cases.py
@@ -0,0 +1,554 @@
+"""Tests for output parsing and runner edge behavior in backends.
+
+@file test_parser_edge_cases.py
+@description Validates that malformed subprocess output, non-JSON lines,
+    partial RESULT: lines, and process failures produce controlled
+    status strings (not crashes). Uses mock subprocess to stay offline.
+"""
+
+from __future__ import annotations
+
+import json
+import subprocess
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from applypilot.apply.backends import BackendError, ClaudeBackend, OpenCodeBackend
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_job(title: str = "Test Engineer", site: str = "testco") -> dict:
+    """Build a minimal job dict for backend.run_job()."""
+    return {
+        "url": "https://example.com/jobs/1",
+        "title": title,
+        "site": site,
+        "application_url": "https://example.com/apply/1",
+        "tailored_resume_path": None,
+        "fit_score": 8,
+        "location": "Remote",
+        "full_description": "Test job description",
+        "cover_letter_path": None,
+    }
+
+
+def _fake_popen(stdout_lines: list[str], returncode: int = 0):
+    """Create a mock Popen that yields the given stdout lines.
+
+    Returns a mock that behaves like subprocess.Popen:
+    - stdin.write/close are no-ops
+    - stdout iterates over provided lines
+    - wait() returns immediately
+    - returncode set as specified
+    - poll() returns returncode
+    """
+    mock_proc = MagicMock(spec=subprocess.Popen)
+    mock_proc.stdin = MagicMock()
+    mock_proc.stdout = iter(line + "\n" for line in stdout_lines)
+    mock_proc.returncode = returncode
+    mock_proc.wait.return_value = returncode
+    mock_proc.poll.return_value = returncode
+    mock_proc.pid = 12345
+    return mock_proc
+
+
+# Patch targets — dashboard and config are side-effect-heavy; mock them out
+_DASHBOARD_PATCHES = {
+    "applypilot.apply.dashboard.update_state": MagicMock(),
+    "applypilot.apply.dashboard.add_event": MagicMock(),
+    "applypilot.apply.dashboard.get_state": MagicMock(return_value=None),
+}
+
+
+def _run_claude_with_output(
+    stdout_lines: list[str],
+    returncode: int = 0,
+    tmp_path: Path | None = None,
+) -> tuple[str, int]:
+    """Run ClaudeBackend.run_job() with mocked subprocess output."""
+    backend = ClaudeBackend()
+    worker_dir = tmp_path or Path("/tmp/test-worker")
+    mcp_config = worker_dir / "mcp.json" if tmp_path else Path("/tmp/mcp.json")
+
+    mock_proc = _fake_popen(stdout_lines, returncode)
+
+    with (
+        patch("subprocess.Popen", return_value=mock_proc),
+        patch.dict("applypilot.apply.backends.__dict__", {}),
+        patch("applypilot.apply.dashboard.update_state"),
+        patch("applypilot.apply.dashboard.add_event"),
+        patch("applypilot.apply.dashboard.get_state", return_value=None),
+    ):
+        return backend.run_job(
+            job=_make_job(),
+            port=9222,
+            worker_id=0,
+            model="test-model",
+            agent=None,
+            dry_run=True,
+            prompt="test prompt",
+            mcp_config_path=mcp_config,
+            worker_dir=worker_dir,
+        )
+
+
+def _run_opencode_with_output(
+    stdout_lines: list[str],
+    returncode: int = 0,
+    tmp_path: Path | None = None,
+) -> tuple[str, int]:
+    """Run OpenCodeBackend.run_job() with mocked subprocess output."""
+    backend = OpenCodeBackend()
+    worker_dir = tmp_path or Path("/tmp/test-worker")
+    mcp_config = worker_dir / "mcp.json" if tmp_path else Path("/tmp/mcp.json")
+
+    mock_proc = _fake_popen(stdout_lines, returncode)
+
+    with (
+        patch("subprocess.Popen", return_value=mock_proc),
+        patch.object(backend, "_find_binary", return_value="/usr/bin/opencode"),
+        patch("applypilot.apply.dashboard.update_state"),
+        patch("applypilot.apply.dashboard.add_event"),
+        patch("applypilot.apply.dashboard.get_state", return_value=None),
+    ):
+        return backend.run_job(
+            job=_make_job(),
+            port=9222,
+            worker_id=0,
+            model="test-model",
+            agent=None,
+            dry_run=True,
+            prompt="test prompt",
+            mcp_config_path=mcp_config,
+            worker_dir=worker_dir,
+        )
+
+
+# ---------------------------------------------------------------------------
+# Claude Backend: Output parsing edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestClaudeMalformedOutput:
+    """ClaudeBackend should never crash on bad output; return controlled status."""
+
+    def test_empty_output_returns_no_result(self, tmp_path):
+        """No output at all => failed:no_result_line."""
+        status, duration = _run_claude_with_output([], tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+        assert duration >= 0
+
+    def test_only_garbage_lines(self, tmp_path):
+        """Non-JSON garbage output => failed:no_result_line (not crash)."""
+        lines = [
+            "this is not json",
+            "neither is this!!!",
+            "{{broken json{",
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+
+    def test_valid_json_but_no_result_marker(self, tmp_path):
+        """Valid JSON messages but no RESULT: in text => failed:no_result_line."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "Working on it..."}]},
+                }
+            ),
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "Done, I think."}]},
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+
+    def test_result_applied_parsed(self, tmp_path):
+        """RESULT:APPLIED in output => 'applied' status."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "RESULT:APPLIED"}]},
+                }
+            ),
+            json.dumps(
+                {
+                    "type": "result",
+                    "usage": {"input_tokens": 100, "output_tokens": 50},
+                    "total_cost_usd": 0.01,
+                    "num_turns": 3,
+                    "result": "RESULT:APPLIED",
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "applied"
+
+    def test_result_expired_parsed(self, tmp_path):
+        """RESULT:EXPIRED in output => 'expired' status."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "RESULT:EXPIRED"}]},
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "expired"
+
+    def test_result_captcha_parsed(self, tmp_path):
+        """RESULT:CAPTCHA in output => 'captcha' status."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "RESULT:CAPTCHA"}]},
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "captcha"
+
+    def test_result_login_issue_parsed(self, tmp_path):
+        """RESULT:LOGIN_ISSUE in output => 'login_issue' status."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "RESULT:LOGIN_ISSUE"}]},
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "login_issue"
+
+    def test_result_failed_with_reason(self, tmp_path):
+        """RESULT:FAILED:some_reason => 'failed:some_reason'."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {
+                        "content": [
+                            {"type": "text", "text": "RESULT:FAILED:form_not_found"},
+                        ]
+                    },
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:form_not_found"
+
+    def test_result_failed_reason_promoted_to_captcha(self, tmp_path):
+        """RESULT:FAILED:captcha promotes to 'captcha' status."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {
+                        "content": [
+                            {"type": "text", "text": "RESULT:FAILED:captcha"},
+                        ]
+                    },
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "captcha"
+
+    def test_mixed_json_and_garbage(self, tmp_path):
+        """Mix of valid JSON and garbage lines — garbage appended, no crash."""
+        lines = [
+            "garbage before json",
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {"content": [{"type": "text", "text": "RESULT:APPLIED"}]},
+                }
+            ),
+            "trailing garbage",
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "applied"
+
+    def test_negative_returncode_means_skipped(self, tmp_path):
+        """Negative return code (signal kill) => 'skipped'."""
+        lines = ["some output"]
+        status, _ = _run_claude_with_output(lines, returncode=-9, tmp_path=tmp_path)
+        assert status == "skipped"
+
+    def test_result_failed_no_colon_after_failed(self, tmp_path):
+        """RESULT:FAILED without trailing colon => 'failed:unknown'."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {
+                        "content": [
+                            {"type": "text", "text": "RESULT:FAILED"},
+                        ]
+                    },
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status.startswith("failed:")
+
+    def test_result_failed_reason_cleaned_of_markdown(self, tmp_path):
+        """Trailing markdown chars (*`\") stripped from failure reason."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "assistant",
+                    "message": {
+                        "content": [
+                            {"type": "text", "text": 'RESULT:FAILED:timeout_error**"`'},
+                        ]
+                    },
+                }
+            ),
+        ]
+        status, _ = _run_claude_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:timeout_error"
+
+
+# ---------------------------------------------------------------------------
+# OpenCode Backend: Output parsing edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestOpenCodeMalformedOutput:
+    """OpenCodeBackend should handle bad output identically to Claude."""
+
+    def test_empty_output_returns_no_result(self, tmp_path):
+        status, duration = _run_opencode_with_output([], tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+        assert duration >= 0
+
+    def test_only_garbage_lines(self, tmp_path):
+        lines = ["not json at all", "!!!broken!!!"]
+        status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+
+    def test_opencode_text_event_with_result(self, tmp_path):
+        """OpenCode 'text' event type with RESULT:APPLIED."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "text",
+                    "part": {"text": "RESULT:APPLIED"},
+                }
+            ),
+        ]
+        status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path)
+        assert status == "applied"
+
+    def test_opencode_step_finish_token_stats(self, tmp_path):
+        """step_finish event parsed without crash even with no RESULT."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "step_finish",
+                    "part": {
+                        "tokens": {"input": 500, "output": 200, "cache": {"read": 10, "write": 5}},
+                        "cost": 0.005,
+                    },
+                }
+            ),
+        ]
+        status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:no_result_line"
+
+    def test_opencode_failed_with_reason(self, tmp_path):
+        lines = [
+            json.dumps(
+                {
+                    "type": "text",
+                    "part": {"text": "RESULT:FAILED:page_load_error"},
+                }
+            ),
+        ]
+        status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path)
+        assert status == "failed:page_load_error"
+
+    def test_opencode_negative_returncode(self, tmp_path):
+        status, _ = _run_opencode_with_output(["output"], returncode=-15, tmp_path=tmp_path)
+        assert status == "skipped"
+
+    def test_opencode_tool_use_event_no_crash(self, tmp_path):
+        """tool_use events processed without crash, even with minimal data."""
+        lines = [
+            json.dumps(
+                {
+                    "type": "tool_use",
+                    "part": {"name":
"mcp__playwright__navigate", "input": {"url": "https://example.com"}}, + } + ), + json.dumps( + { + "type": "text", + "part": {"text": "RESULT:APPLIED"}, + } + ), + ] + status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path) + assert status == "applied" + + def test_opencode_mixed_garbage_and_events(self, tmp_path): + """Non-JSON lines mixed with valid events — no crash.""" + lines = [ + "warning: something", + json.dumps({"type": "text", "part": {"text": "Working..."}}), + "another warning", + json.dumps({"type": "text", "part": {"text": "RESULT:EXPIRED"}}), + ] + status, _ = _run_opencode_with_output(lines, tmp_path=tmp_path) + assert status == "expired" + + +# --------------------------------------------------------------------------- +# Process failure edge cases (shared patterns) +# --------------------------------------------------------------------------- + + +class TestProcessFailureEdgeCases: + """Backend run_job handles subprocess exceptions gracefully.""" + + def test_claude_popen_exception(self, tmp_path): + """If Popen raises, run_job returns failed: with error text.""" + backend = ClaudeBackend() + with ( + patch("subprocess.Popen", side_effect=OSError("binary not found")), + patch("applypilot.apply.dashboard.update_state"), + patch("applypilot.apply.dashboard.add_event"), + patch("applypilot.apply.dashboard.get_state", return_value=None), + ): + status, duration = backend.run_job( + job=_make_job(), + port=9222, + worker_id=0, + model="test", + agent=None, + dry_run=True, + prompt="test", + mcp_config_path=tmp_path / "mcp.json", + worker_dir=tmp_path, + ) + assert status.startswith("failed:") + assert "binary not found" in status + + def test_opencode_missing_binary(self): + """OpenCodeBackend raises BackendError when opencode not on PATH.""" + from applypilot.apply.backends import BackendError + + backend = OpenCodeBackend() + with patch("shutil.which", return_value=None): + with pytest.raises(BackendError, match="OpenCode CLI not found"): + 
backend._find_binary()
+
+    def test_opencode_popen_exception(self, tmp_path):
+        """If Popen raises on opencode, run_job returns failed: error."""
+        backend = OpenCodeBackend()
+        with (
+            patch("subprocess.Popen", side_effect=OSError("spawn failed")),
+            patch.object(backend, "_find_binary", return_value="/usr/bin/opencode"),
+            patch("applypilot.apply.dashboard.update_state"),
+            patch("applypilot.apply.dashboard.add_event"),
+            patch("applypilot.apply.dashboard.get_state", return_value=None),
+        ):
+            status, duration = backend.run_job(
+                job=_make_job(),
+                port=9222,
+                worker_id=0,
+                model="test",
+                agent=None,
+                dry_run=True,
+                prompt="test",
+                mcp_config_path=tmp_path / "mcp.json",
+                worker_dir=tmp_path,
+            )
+        assert status.startswith("failed:")
+        assert "spawn failed" in status
+
+
+class TestOpenCodeMcpParity:
+    """OpenCode backend enforces MCP baseline parity with Claude flow."""
+
+    def test_build_command_includes_agent_when_set(self):
+        backend = OpenCodeBackend()
+        with patch.object(backend, "_find_binary", return_value="/usr/bin/opencode"):
+            cmd = backend._build_command("o4-mini", Path("/tmp/w"), "coder")
+        assert "--agent" in cmd
+        assert "coder" in cmd
+
+    def test_missing_required_mcp_servers_raises(self):
+        from applypilot.apply.backends import BackendError
+
+        backend = OpenCodeBackend()
+        with patch.object(backend, "_list_mcp_servers", return_value={"search"}):
+            with pytest.raises(BackendError, match="Missing server"):
+                backend._ensure_required_mcp_servers(["playwright", "gmail"])
+
+
+class TestPromptParity:
+    """Both backends receive the exact same launcher-built prompt string."""
+
+    def test_claude_prompt_forwarded_to_stdin(self, tmp_path):
+        backend = ClaudeBackend()
+        prompt = "PROMPT_PAYLOAD_CLAUDE"
+        mock_proc = _fake_popen(["RESULT:FAILED:manual"], returncode=0)
+        with (
+            patch("subprocess.Popen", return_value=mock_proc),
+            patch("applypilot.apply.dashboard.update_state"),
+            patch("applypilot.apply.dashboard.add_event"),
+            patch("applypilot.apply.dashboard.get_state", return_value=None),
+        ):
+            backend.run_job(
+                job=_make_job(),
+                port=9222,
+                worker_id=0,
+                model="haiku",
+                agent=None,
+                dry_run=True,
+                prompt=prompt,
+                mcp_config_path=tmp_path / "mcp.json",
+                worker_dir=tmp_path,
+                required_mcp_servers=["playwright", "gmail"],
+            )
+        mock_proc.stdin.write.assert_called_once_with(prompt)
+
+    def test_opencode_prompt_forwarded_to_stdin(self, tmp_path):
+        backend = OpenCodeBackend()
+        prompt = "PROMPT_PAYLOAD_OPENCODE"
+        mock_proc = _fake_popen(["RESULT:FAILED:manual"], returncode=0)
+        with (
+            patch("subprocess.Popen", return_value=mock_proc),
+            patch.object(backend, "_find_binary", return_value="/usr/bin/opencode"),
+            patch.object(backend, "_list_mcp_servers", return_value={"playwright", "gmail"}),
+            patch("applypilot.apply.dashboard.update_state"),
+            patch("applypilot.apply.dashboard.add_event"),
+            patch("applypilot.apply.dashboard.get_state", return_value=None),
+        ):
+            backend.run_job(
+                job=_make_job(),
+                port=9222,
+                worker_id=0,
+                model="gh/claude-sonnet-4.5",
+                agent="coder",
+                dry_run=True,
+                prompt=prompt,
+                mcp_config_path=tmp_path / "mcp.json",
+                worker_dir=tmp_path,
+                required_mcp_servers=["playwright", "gmail"],
+            )
+        mock_proc.stdin.write.assert_called_once_with(prompt)
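+
+
+# ---------------------------------------------------------------------------
+# Cross-backend status parity (illustrative sketch)
+# ---------------------------------------------------------------------------
+
+
+class TestBackendStatusParity:
+    """Sketch: both runners map a signal-killed process to 'skipped'.
+
+    Mirrors test_negative_returncode_means_skipped (Claude) and
+    test_opencode_negative_returncode (OpenCode) in a single parametrized
+    check; assumes only the module-level helpers already used in this file.
+    """
+
+    @pytest.mark.parametrize(
+        "runner",
+        [_run_claude_with_output, _run_opencode_with_output],
+    )
+    def test_signal_kill_maps_to_skipped(self, runner, tmp_path):
+        status, _ = runner(["partial output"], returncode=-9, tmp_path=tmp_path)
+        assert status == "skipped"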