diff --git a/CHANGELOG.md b/CHANGELOG.md index e91a94e..45f9cdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 output. With this, essentially the whole CLI works whether or not the daemon is running (the only exceptions: `search games --moves-stats` and the admin-only `players dedup`/`update-game-counts`/`apply-corrections`, which need `--local`). +- **Background activity view** — a new indicator in the header expands into a + panel showing the daemon's whole job pipeline: the active job, the queue + behind it, and recent finishes, across every job type (source syncs, the + scheduled update, and manual maintenance like dedup/index/backup). Background + work runs on the daemon even with the app closed, so this is its always-visible + home; running, interruptible jobs can be cancelled from here. +- **Enabling a source imports it automatically** — turning a source on in + Maintenance → Sources no longer requires pressing "Sync now". The daemon's + scheduler picks up enabled-but-not-yet-synced sources on its next tick (~1 min) + and runs the import in the background, even with the GUI closed. "Sync now" + remains as an optional manual trigger; a sync that fails or is cancelled is not + auto-retried, while one interrupted by a restart resumes. ## [0.4.0] - 2026-06-21 diff --git a/chess-client/src/App.tsx b/chess-client/src/App.tsx index ceb4869..c5f39f1 100644 --- a/chess-client/src/App.tsx +++ b/chess-client/src/App.tsx @@ -12,6 +12,7 @@ import DirectoryBrowser from "./components/local/DirectoryBrowser"; import LocalGameList from "./components/local/LocalGameList"; import HomeEmptyState from "./components/HomeEmptyState"; import UpdateBanner from "./components/UpdateBanner"; +import ActivityIndicator from "./components/ActivityIndicator"; import { loadMyPlayer } from "./components/MyStatsWidget"; import { useUpdateCheck } from "./hooks/useUpdateCheck"; import { GameSummary, LocalGame, MoveStats, PlayerInfo, PrepContext, StatusInfo } from "./types"; @@ -447,6 +448,7 @@ export default function App() { title="Database maintenance" >Maintenance + diff --git a/chess-client/src/api.ts b/chess-client/src/api.ts index f05aa16..eb9c109 100644 --- a/chess-client/src/api.ts +++ b/chess-client/src/api.ts @@ -58,6 +58,12 @@ export async function cancelJob(jobId: string): Promise { await fetch(apiUrl(`/jobs/${encodeURIComponent(jobId)}/cancel`), { method: "POST" }); } +/** The daemon's whole job pipeline (active + queued + finished), newest last — + * what the global Activity view reads. */ +export function getJobs(): Promise { + return apiGet("/jobs"); +} + // ── Schedule (server-owned auto-update) ────────────────────────────────────── export interface ScheduleInfo { @@ -93,7 +99,7 @@ export async function runUpdateNow(): Promise { // ── Sources (multi-source import catalog, #40) ──────────────────────────────── -import type { SourceStatus } from "./types"; +import type { SourceStatus, Job } from "./types"; /** The curated source catalog + this database's state for each. */ export function getSources(): Promise { diff --git a/chess-client/src/components/ActivityIndicator.tsx b/chess-client/src/components/ActivityIndicator.tsx new file mode 100644 index 0000000..b1b8449 --- /dev/null +++ b/chess-client/src/components/ActivityIndicator.tsx @@ -0,0 +1,190 @@ +import { useState, useEffect, useRef } from "react"; +import { getJobs, cancelJob } from "../api"; +import type { Job } from "../types"; + +// ── Global background-activity view (#40 Phase C3) ──────────────────────────── +// +// A header indicator that expands into a panel showing the daemon's whole job +// pipeline — the active job, the queue behind it, and recent finishes — across +// every job type (source syncs, the scheduled update, manual maintenance like +// dedup/index/backup). Background work runs serially on the daemon and even +// continues with the GUI closed, so it deserves one always-visible home. +// Source cards keep their own inline per-source progress (C1); this is the +// global view that also covers the non-source and post-import jobs. + +const POLL_MS = 1500; + +// Catalog source keys → display names, so a sync job reads as the source name. +const SOURCE_NAMES: Record = { + "twic": "The Week in Chess", + "lichess-broadcasts": "Lichess Broadcasts", + "ajedrez-otb": "Ajedrez Data — OTB", +}; + +/** A human label for a job, derived from its type + params. */ +function jobLabel(j: Job): string { + const p = (j.params ?? {}) as Record; + const src = p.source ? SOURCE_NAMES[p.source] ?? p.source : ""; + switch (j.type) { + case "sources_sync": return `Sync ${src}`; + case "sources_set_enabled":return `Update ${src || "source"}`; + case "sources_set_window": return `Window ${src || "source"}`; + case "update": return "Scheduled update"; + case "index_positions": return "Build position index"; + case "dedup_games": return "Deduplicate games"; + case "cleanup": return "Clean up games"; + case "normalise": return "Normalise player names"; + case "import": return "Import"; + case "import_pgn": return p.collection ? `Import PGN → ${p.collection}` : "Import PGN"; + case "players_import": return "Import players"; + case "players_export": return "Export players"; + case "backup": return p.collection ? `Backup ${p.collection}` : "Backup"; + default: return j.type; + } +} + +function pct(j: Job): number { + return j.total > 0 ? Math.min(100, (j.value / j.total) * 100) : 0; +} + +function ActiveRow({ job, onCancel }: { job: Job; onCancel: (id: string) => void }) { + const queued = job.status === "queued"; + const known = job.total > 0; + return ( +
+
+ {jobLabel(job)} + {/* Cancel only running, interruptible jobs — an appender (fast) write + can corrupt the DB if killed mid-flight, so it isn't cancelable. */} + {job.status === "running" && job.interruptible && ( + + )} +
+ {queued ? ( +
Queued
+ ) : ( + <> +
+ {job.message || "Working…"} + {known && {Math.round(pct(job))}%} +
+
+
+
+ + )} +
+ ); +} + +function RecentRow({ job }: { job: Job }) { + const ok = job.status === "done"; + return ( +
+ {ok ? "✓" : "✕"} +
+
{jobLabel(job)}
+ {!ok && job.error &&
{job.error}
} + {ok && job.message &&
{job.message}
} +
+
+ ); +} + +export default function ActivityIndicator() { + const [jobs, setJobs] = useState(null); + const [open, setOpen] = useState(false); + const ref = useRef(null); + + // Poll the pipeline. It's a cheap local read, and the daemon keeps working + // even when this view is closed, so a steady poll keeps the badge honest. + useEffect(() => { + let stop = false; + const poll = () => { getJobs().then((j) => { if (!stop) setJobs(j); }).catch(() => { /* offline — leave last known */ }); }; + poll(); + const id = setInterval(poll, POLL_MS); + return () => { stop = true; clearInterval(id); }; + }, []); + + // Dismiss the panel on an outside click. + useEffect(() => { + if (!open) return; + function onDown(e: MouseEvent) { + if (ref.current && !ref.current.contains(e.target as Node)) setOpen(false); + } + document.addEventListener("mousedown", onDown); + return () => document.removeEventListener("mousedown", onDown); + }, [open]); + + const all = jobs ?? []; + // Submission order (oldest→newest); active oldest-first reads as the pipeline, + // recent newest-first so the latest finish is on top. + const active = all.filter((j) => j.status === "running" || j.status === "queued"); + const recent = all.filter((j) => j.status === "done" || j.status === "error").slice(-8).reverse(); + const busy = active.length > 0; + + function handleCancel(id: string) { + void cancelJob(id); + } + + return ( +
+ + + {open && ( +
+
+ Activity + + {busy ? `${active.length} active` : "Idle"} + +
+ + {jobs === null ? ( +
Loading…
+ ) : active.length === 0 && recent.length === 0 ? ( +
No background activity.
+ ) : ( + <> + {active.length > 0 && ( +
+ {active.map((j) => )} +
+ )} + {recent.length > 0 && ( + <> +
Recent
+
+ {recent.map((j) => )} +
+ + )} + + )} +
+ )} +
+ ); +} diff --git a/chess-client/src/types.ts b/chess-client/src/types.ts index 4eae575..8394802 100644 --- a/chess-client/src/types.ts +++ b/chess-client/src/types.ts @@ -244,3 +244,22 @@ export interface SourceStatus { /** Items (issues/files) imported for this source. */ items: number; } + +// ── Background jobs (the daemon's job pipeline, #40 C3) ──────────────────────── + +/** One job tracked by the daemon's JobManager. Mirrors `GET /jobs`. */ +export interface Job { + id: string; + /** Job kind, e.g. "sources_sync" | "update" | "index_positions" | "backup". */ + type: string; + status: "queued" | "running" | "done" | "error"; + value: number; + total: number; + message: string; + /** False for appender (fast) writes that must not be interrupted. */ + interruptible: boolean; + path?: string; + error?: string; + /** Submission params, used to label a job by what it operates on. */ + params?: Record; +} diff --git a/chess-db/src/jobs.rs b/chess-db/src/jobs.rs index 29a3b47..6523e40 100644 --- a/chess-db/src/jobs.rs +++ b/chess-db/src/jobs.rs @@ -273,6 +273,9 @@ pub struct JobSnapshot { pub path: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, + /// The submission params, so the UI can label a job by what it touches (e.g. + /// which source/collection) and the scheduler can de-dupe by it. + pub params: serde_json::Value, } /// Whether a job uses DuckDB's appender path, which is not crash-safe: killing @@ -300,6 +303,10 @@ struct JobState { pub struct JobSlot { id: String, job_type: String, + /// The job's submission params (e.g. `{ "source": "ajedrez-otb" }`), kept so + /// the scheduler can de-dupe an auto-sync against an in-flight one and the + /// Activity view can label a job by what it operates on. + params: serde_json::Value, interruptible: bool, state: Mutex, events: broadcast::Sender, @@ -320,6 +327,7 @@ impl JobSlot { interruptible: self.interruptible, path: s.path.clone(), error: s.error.clone(), + params: self.params.clone(), } } @@ -407,6 +415,7 @@ impl JobManager { let slot = Arc::new(JobSlot { id: id.clone(), job_type: job_type.clone(), + params: params.clone(), interruptible: !uses_appender(&job_type, ¶ms), state: Mutex::new(JobState { status: "queued".into(), @@ -619,13 +628,34 @@ fn run_job( let dir = crate::source_dir(source_key); std::fs::create_dir_all(&dir)?; let step = reporter.sub_step(); - reporter.log(format!("{}: download", src.name)); - rt.block_on(crate::sources::download_feed(conn, src, None, None, &dir, &step))?; - if reporter.is_cancelled() { return Ok(()); } - reporter.log(format!("{}: import", src.name)); - importer::import(conn, &dir, src.key, src.collection, Some(40), 10, fast, false, &step)?; - crate::sources::record_run(conn, src.key, "ok")?; - reporter.done(format!("{} synced.", src.name)); + let sync = (|| -> Result<()> { + reporter.log(format!("{}: download", src.name)); + rt.block_on(crate::sources::download_feed(conn, src, None, None, &dir, &step))?; + if reporter.is_cancelled() { return Ok(()); } + reporter.log(format!("{}: import", src.name)); + importer::import(conn, &dir, src.key, src.collection, Some(40), 10, fast, false, &step)?; + Ok(()) + })(); + // Record the run's outcome so the enable→auto-sync scheduler doesn't + // re-fire a source that finished or failed (a still-NULL last_run is + // what marks a source as "never synced"). A user cancellation also + // records, so it isn't auto-restarted; only a crash/restart mid-sync + // leaves last_run NULL, so an interrupted sync resumes. Errors still + // propagate (via `?`) so a DuckDB invalidation triggers recovery (#82). + if reporter.is_cancelled() { + let _ = crate::sources::record_run(conn, src.key, "cancelled"); + return Ok(()); + } + match sync { + Ok(()) => { + crate::sources::record_run(conn, src.key, "ok")?; + reporter.done(format!("{} synced.", src.name)); + } + Err(e) => { + let _ = crate::sources::record_run(conn, src.key, &format!("error: {e}")); + return Err(e); + } + } } "import_pgn" => { let path = path_param(p, "path")?; diff --git a/chess-db/src/scheduler.rs b/chess-db/src/scheduler.rs index ce93fdc..053171e 100644 --- a/chess-db/src/scheduler.rs +++ b/chess-db/src/scheduler.rs @@ -54,6 +54,17 @@ pub fn spawn(jobs: Arc, reads: ReadPool, writer: ConnActor) { } async fn tick(jobs: &Arc, reads: &ReadPool, writer: &ConnActor) -> anyhow::Result<()> { + // Enable→auto-sync (#40 C3): independent of the daily update. Enabling a + // source in the Sources screen just sets its flag; the scheduler imports it + // here in the background — so it works even with the GUI closed. Skipped + // while a full update is in flight, since that update syncs every enabled + // feed itself and would otherwise double-import the same source. + if !update_in_flight(jobs) { + if let Err(e) = auto_sync_pending(jobs, reads).await { + eprintln!("scheduler: auto-sync: {e:#}"); + } + } + let s = read_schedule(reads).await?; // 1. If a run is recorded as in progress, settle it once the job finishes. @@ -125,6 +136,36 @@ fn update_in_flight(jobs: &Arc) -> bool { .any(|j| j.job_type == "update" && (j.status == "running" || j.status == "queued")) } +/// Submit a background `sources_sync` for every enabled-but-never-synced source, +/// skipping any that already has a sync queued or running (matched by its +/// `source` param). De-duping this way keeps repeated ticks from piling up +/// duplicate jobs while a long first import is still running. +async fn auto_sync_pending(jobs: &Arc, reads: &ReadPool) -> anyhow::Result<()> { + let candidates = reads.run(crate::sources::auto_sync_candidates).await?; + if candidates.is_empty() { + return Ok(()); + } + let in_flight = sources_sync_in_flight(jobs); + for src in candidates { + if in_flight.contains(src.key) { + continue; + } + // The same job the Sources screen's "Sync now" submits (transactional + // import — crash-safe, since this runs unattended). + jobs.submit("sources_sync".into(), serde_json::json!({ "source": src.key })); + } + Ok(()) +} + +/// Source keys with a `sources_sync` job currently queued or running. +fn sources_sync_in_flight(jobs: &Arc) -> std::collections::HashSet { + jobs.list() + .into_iter() + .filter(|j| j.job_type == "sources_sync" && (j.status == "queued" || j.status == "running")) + .filter_map(|j| j.params.get("source").and_then(|v| v.as_str()).map(str::to_owned)) + .collect() +} + // ── Clock math (all in local wall-clock time) ───────────────────────────────── fn parse_dt(s: &str) -> Option { diff --git a/chess-db/src/sources/mod.rs b/chess-db/src/sources/mod.rs index 4415191..85ca16d 100644 --- a/chess-db/src/sources/mod.rs +++ b/chess-db/src/sources/mod.rs @@ -337,6 +337,25 @@ pub fn enabled_feeds(conn: &Connection) -> Result> { .collect()) } +/// Enabled sources that have never recorded a sync run (`last_run IS NULL`) — the +/// ones the scheduler auto-imports in the background once they're enabled (#40 +/// C3). A source that synced ok, failed, or was cancelled has `last_run` set, so +/// it is *not* auto-retried (manual "Sync now" stays available); a sync +/// interrupted by a crash/restart never recorded a run, so it resumes. +pub fn auto_sync_candidates(conn: &Connection) -> Result> { + let pending: HashSet = { + let mut stmt = + conn.prepare("SELECT key FROM sources WHERE enabled = TRUE AND last_run IS NULL")?; + stmt.query_map([], |r| r.get::<_, String>(0))? + .filter_map(|r| r.ok()) + .collect() + }; + Ok(CATALOG + .iter() + .filter(|s| pending.contains(s.key) && Feed::for_key(s.key).is_some()) + .collect()) +} + /// Enable or disable a source, creating its state row on first use. pub fn set_enabled(conn: &Connection, key: &str, enabled: bool) -> Result<()> { if get(key).is_none() { @@ -580,3 +599,36 @@ mod window_tests { assert!(w.admits(None)); } } + +#[cfg(test)] +mod auto_sync_tests { + use super::*; + use duckdb::Connection; + + fn keys(conn: &Connection) -> Vec<&'static str> { + auto_sync_candidates(conn).unwrap().iter().map(|s| s.key).collect() + } + + #[test] + fn candidates_are_enabled_and_never_run() { + let conn = Connection::open_in_memory().unwrap(); + crate::db::schema::init(&conn).unwrap(); + + // Seeded state: only TWIC is enabled, and nothing has synced yet. + assert_eq!(keys(&conn), vec!["twic"], "the enabled, never-synced source"); + + // Enabling another joins the set; recording TWIC's run drops it out. + set_enabled(&conn, "ajedrez-otb", true).unwrap(); + record_run(&conn, "twic", "ok").unwrap(); + assert_eq!(keys(&conn), vec!["ajedrez-otb"], "synced excluded, newly enabled included"); + + // A failed (or cancelled) run also sets last_run, so it is NOT + // auto-retried — manual "Sync now" remains the way to try again. + record_run(&conn, "ajedrez-otb", "error: boom").unwrap(); + assert!(keys(&conn).is_empty(), "errored source is not auto-retried"); + + // Disabled sources are never candidates, even with a NULL last_run. + set_enabled(&conn, "lichess-broadcasts", false).unwrap(); + assert!(!keys(&conn).contains(&"lichess-broadcasts")); + } +}