diff --git a/src/web/client_registry.gleam b/src/web/client_registry.gleam
new file mode 100644
index 0000000..7a5b3de
--- /dev/null
+++ b/src/web/client_registry.gleam
@@ -0,0 +1,313 @@
+//// Per-client outbox registry. Owns the `Outbox` for every
+//// distinct WebSocket `client_id`, surviving WS process death so
+//// reconnects under the same `client_id` can replay frames the
+//// previous WS process emitted.
+////
+//// **Threading model.** Single actor; all reads/writes go through
+//// it. Per-client outboxes are mutated in turn. `Append`,
+//// `ReplaySince` and `Stats` are blocking request/reply calls (the
+//// caller waits up to 5s for the reply); `Ack` and `Forget` are
+//// fire-and-forget sends. Fine at single-operator scale; would want
+//// sharding past hundreds of concurrent clients.
+////
+//// **Lifecycle.** A client_id's outbox is created on first
+//// `Append` and persists until `Forget` or idle expiry: the periodic
+//// `JanitorTick` drops outboxes whose last activity was over an hour
+//// ago — no chance the original tab is still listening.
+////
+//// **What's a "frame".** The opaque JSON body of a server message
+//// (without the `seq` prefix). The registry assigns the seq and
+//// returns it; the WS handler is responsible for splicing the seq
+//// into the wire JSON. This keeps the registry oblivious to
+//// protocol shape.
+
+// Copyright (C) 2026 Seamus Brady
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+import gleam/dict.{type Dict}
+import gleam/erlang/process.{type Subject}
+import gleam/option.{type Option, None}
+import slog
+import web/outbox.{type Outbox, type ReplayResult}
+
+// ---------------------------------------------------------------------------
+// Public API
+// ---------------------------------------------------------------------------
+
+/// Key under which a client's outbox is stored in the registry.
+pub type ClientId =
+  String
+
+/// Messages understood by the registry actor. Variants carrying a
+/// `reply_to` are request/reply; the others are one-way.
+pub type RegistryMessage {
+  /// Append the JSON body of a server message for a client_id. Reply
+  /// carries the assigned seq.
+  Append(client_id: ClientId, body_json: String, reply_to: Subject(Int))
+  /// Mark a client's frames up to `up_to` acked. Fire-and-forget:
+  /// there is no `reply_to`, so the sender gets no confirmation
+  /// that the ack landed.
+  Ack(client_id: ClientId, up_to: Int)
+  /// Replay everything past `since` for a reconnecting client.
+  ReplaySince(client_id: ClientId, since: Int, reply_to: Subject(ReplayResult))
+  /// Read the current snapshot for diagnostics: last_seq, acked_seq,
+  /// pending_count.
+  Stats(client_id: ClientId, reply_to: Subject(StatsSnapshot))
+  /// Periodic age-prune across every outbox. Sent by janitor timer.
+  JanitorTick
+  /// Drop a client's state. Used when the operator's session is
+  /// definitively over (tab close on the last connection holding
+  /// the id, etc.).
+  Forget(client_id: ClientId)
+  /// Stop the actor. Tests only.
+  Shutdown
+}
+
+/// Diagnostic view of one client's outbox. `exists` is `False` (with
+/// every other field zeroed / `None`) when the client has no outbox.
+pub type StatsSnapshot {
+  StatsSnapshot(
+    exists: Bool,
+    last_seq: Int,
+    acked_seq: Int,
+    pending_count: Int,
+    oldest_kept: Option(Int),
+  )
+}
+
+/// Public handle on the actor.
+pub opaque type Registry {
+  Registry(subject: Subject(RegistryMessage))
+}
+
+/// Raw inbox of the registry actor, for callers that build
+/// `RegistryMessage` values themselves.
+pub fn subject(r: Registry) -> Subject(RegistryMessage) {
+  let Registry(subject: inbox) = r
+  inbox
+}
+
+// ---------------------------------------------------------------------------
+// Lifecycle
+// ---------------------------------------------------------------------------
+
+/// Delay between two `JanitorTick` self-sends, in milliseconds.
+const janitor_tick_ms: Int = 60_000
+
+/// Idle age, in milliseconds, past which the janitor drops a client.
+const idle_drop_age_ms: Int = 3_600_000
+
+@external(erlang, "springdrift_ffi", "monotonic_now_ms")
+fn now_ms() -> Int
+
+/// Spawn the registry actor (unlinked) and return a handle to it.
+///
+/// The actor creates its own inbox, reports it back over a one-shot
+/// handshake subject, arms the first janitor tick and enters `loop`
+/// with empty state. Panics if the inbox has not been reported
+/// within 5 seconds.
+pub fn start() -> Registry {
+  let handshake: Subject(Subject(RegistryMessage)) = process.new_subject()
+  process.spawn_unlinked(fn() {
+    // The inbox must be created inside the spawned process so that the
+    // actor, not the caller, owns it.
+    let inbox: Subject(RegistryMessage) = process.new_subject()
+    process.send(handshake, inbox)
+    let _ = process.send_after(inbox, janitor_tick_ms, JanitorTick)
+    let empty = State(outboxes: dict.new(), last_touch: dict.new())
+    loop(inbox, empty)
+  })
+  case process.receive(handshake, 5000) {
+    Error(_) -> panic as "ClientRegistry failed to start within 5s"
+    Ok(inbox) -> Registry(subject: inbox)
+  }
+}
+
+/// Ask the actor to stop. One-way send; does not wait for the actor
+/// to exit.
+pub fn shutdown(r: Registry) -> Nil {
+  r.subject |> process.send(Shutdown)
+}
+
+// ---------------------------------------------------------------------------
+// Convenience wrappers — typed RPCs
+// ---------------------------------------------------------------------------
+
+/// Append a body, return the assigned seq. Blocks briefly on the
+/// registry actor; the WS handler then splices the seq into the
+/// JSON it sends over the wire.
+pub fn append(r: Registry, client_id: ClientId, body_json: String) -> Int {
+  let reply: Subject(Int) = process.new_subject()
+  process.send(r.subject, Append(client_id:, body_json:, reply_to: reply))
+  case process.receive(reply, 5000) {
+    Ok(seq) -> seq
+    // No reply within 5s: 0 is the sentinel handed back. The Append is
+    // already queued, so the actor may still store the frame under a
+    // seq this caller never learns.
+    Error(_) -> 0
+  }
+}
+
+/// Tell the registry the client has applied every frame up to `up_to`.
+/// Fire-and-forget; nothing comes back from the actor.
+pub fn ack(r: Registry, client_id: ClientId, up_to: Int) -> Nil {
+  process.send(r.subject, Ack(client_id:, up_to:))
+}
+
+/// Ask for the frames after `since`. Waits up to 5s for the actor and
+/// falls back to `outbox.UpToDate` when no reply arrives.
+pub fn replay_since(
+  r: Registry,
+  client_id: ClientId,
+  since: Int,
+) -> ReplayResult {
+  let reply: Subject(ReplayResult) = process.new_subject()
+  process.send(r.subject, ReplaySince(client_id:, since:, reply_to: reply))
+  case process.receive(reply, 5000) {
+    Ok(r) -> r
+    Error(_) -> outbox.UpToDate
+  }
+}
+
+/// Fetch the diagnostic snapshot for `client_id`. Waits up to 5s; on
+/// timeout the result is the same as for a client with no outbox.
+pub fn stats(r: Registry, client_id: ClientId) -> StatsSnapshot {
+  let reply: Subject(StatsSnapshot) = process.new_subject()
+  process.send(r.subject, Stats(client_id:, reply_to: reply))
+  case process.receive(reply, 5000) {
+    Ok(s) -> s
+    Error(_) -> empty_stats()
+  }
+}
+
+/// Drop everything the registry holds for `client_id`. Fire-and-forget.
+pub fn forget(r: Registry, client_id: ClientId) -> Nil {
+  process.send(r.subject, Forget(client_id:))
+}
+
+// ---------------------------------------------------------------------------
+// Internal — actor state + loop
+// ---------------------------------------------------------------------------
+
+/// Actor state: the outbox per client, and the `now_ms()` reading taken
+/// the last time that client's outbox was appended to, acked or
+/// replayed. The janitor uses `last_touch` to decide who is idle.
+type State {
+  State(outboxes: Dict(ClientId, Outbox), last_touch: Dict(ClientId, Int))
+}
+
+/// Snapshot reported when a client has no outbox.
+fn empty_stats() -> StatsSnapshot {
+  StatsSnapshot(
+    exists: False,
+    last_seq: 0,
+    acked_seq: 0,
+    pending_count: 0,
+    oldest_kept: None,
+  )
+}
+
+/// Receive loop. Handles one message at a time and recurses with the
+/// new state; returns (ending the process) on `Shutdown`.
+fn loop(self: Subject(RegistryMessage), state: State) -> Nil {
+  let selector =
+    process.new_selector()
+    |> process.select(self)
+  let msg = process.selector_receive_forever(selector)
+  case msg {
+    Shutdown -> Nil
+    _ -> {
+      let next = handle(msg, state, self)
+      loop(self, next)
+    }
+  }
+}
+
+/// Apply one message to the state and send any reply it asks for.
+/// `self` is only needed to re-arm the janitor timer.
+fn handle(
+  msg: RegistryMessage,
+  state: State,
+  self: Subject(RegistryMessage),
+) -> State {
+  case msg {
+    // `loop` stops before dispatching Shutdown; the arm exists so the
+    // match stays exhaustive.
+    Shutdown -> state
+
+    Append(client_id:, body_json:, reply_to:) -> {
+      let now = now_ms()
+      // First Append for a client creates its outbox.
+      let existing = case dict.get(state.outboxes, client_id) {
+        Ok(o) -> o
+        Error(_) -> outbox.new()
+      }
+      let #(updated, seq) = outbox.append(existing, body_json, now)
+      process.send(reply_to, seq)
+      State(
+        outboxes: dict.insert(state.outboxes, client_id, updated),
+        last_touch: dict.insert(state.last_touch, client_id, now),
+      )
+    }
+
+    Ack(client_id:, up_to:) -> {
+      case dict.get(state.outboxes, client_id) {
+        // Ack for an unknown client is ignored.
+        Error(_) -> state
+        Ok(o) -> {
+          let acked = outbox.ack(o, up_to)
+          State(
+            outboxes: dict.insert(state.outboxes, client_id, acked),
+            last_touch: dict.insert(state.last_touch, client_id, now_ms()),
+          )
+        }
+      }
+    }
+
+    ReplaySince(client_id:, since:, reply_to:) -> {
+      case dict.get(state.outboxes, client_id) {
+        Ok(o) -> {
+          process.send(reply_to, outbox.replay_since(o, since))
+          State(
+            ..state,
+            last_touch: dict.insert(state.last_touch, client_id, now_ms()),
+          )
+        }
+        Error(_) -> {
+          // Unknown client: nothing to replay and nothing to keep
+          // alive, so no `last_touch` entry is created for an id that
+          // has no outbox.
+          process.send(reply_to, outbox.UpToDate)
+          state
+        }
+      }
+    }
+
+    Stats(client_id:, reply_to:) -> {
+      let snap = case dict.get(state.outboxes, client_id) {
+        Ok(o) ->
+          StatsSnapshot(
+            exists: True,
+            last_seq: outbox.last_seq(o),
+            acked_seq: outbox.acked_seq(o),
+            pending_count: outbox.pending_count(o),
+            oldest_kept: outbox.oldest_kept_seq(o),
+          )
+        Error(_) -> empty_stats()
+      }
+      process.send(reply_to, snap)
+      state
+    }
+
+    JanitorTick -> {
+      let now = now_ms()
+      let cutoff = now - idle_drop_age_ms
+      // Age-prune every outbox.
+      let pruned_outboxes =
+        dict.map_values(state.outboxes, fn(_, o) { outbox.prune_age(o, now) })
+      // Clients touched at or after the cutoff survive this tick.
+      let kept_touch = dict.filter(state.last_touch, fn(_, t) { t >= cutoff })
+      // An outbox survives only if its client is still in `kept_touch`;
+      // the dict lookup replaces a linear scan per outbox.
+      let kept_outboxes =
+        dict.filter(pruned_outboxes, fn(k, _) {
+          case dict.has_key(kept_touch, k) {
+            True -> True
+            False -> {
+              slog.debug(
+                "client_registry",
+                "janitor",
+                "dropping idle outbox " <> k,
+                option.None,
+              )
+              False
+            }
+          }
+        })
+      // Re-arm the timer for the next tick.
+      let _ = process.send_after(self, janitor_tick_ms, JanitorTick)
+      State(outboxes: kept_outboxes, last_touch: kept_touch)
+    }
+
+    Forget(client_id:) ->
+      State(
+        outboxes: dict.delete(state.outboxes, client_id),
+        last_touch: dict.delete(state.last_touch, client_id),
+      )
+  }
+}
diff --git a/src/web/gui.gleam b/src/web/gui.gleam
index d29a650..b52cc55 100644
--- a/src/web/gui.gleam
+++ b/src/web/gui.gleam
@@ -53,7 +53,9 @@ import skills/proposal_log
 import slog
 import tools/knowledge as knowledge_tools
 import web/auth
+import web/client_registry
 import web/html
+import web/outbox
 import web/protocol
 
 // ---------------------------------------------------------------------------
@@ -65,8 +67,26 @@ type WsMsg {
   GotDelivery(Delivery)
   GotNotification(agent_types.Notification)
   SendHistory(String)
+  /// Replay payload from the outbox registry — emitted once during
+  /// `ws_on_init` when the client reconnected with `?since=N` and
+  /// the registry confirmed it can replay the gap. The handler emits
+  /// each frame with its *original* seq (not a fresh one) so the
+  /// client's local seq cursor advances correctly.
+  ReplayReady(outbox.ReplayResult)
+  /// Keepalive tick — server sends a Ping frame and reschedules
+  /// itself.
If the connection has gone silent (idle proxy timeout,
+  /// OS-level NAT close, sleeping laptop) the write fails and the
+  /// WS goes through `mist.Closed`; the client reconnects fast
+  /// instead of waiting on its own timeouts.
+  KeepaliveTick
 }
 
+/// Server-driven keepalive interval. Anthropic's free LLM cycles
+/// can run several minutes; idle proxies and OS-level NAT often
+/// drop a TCP connection that's seen no traffic for 30-60s. 25s
+/// keeps us comfortably below those windows.
+const keepalive_tick_ms: Int = 25_000
+
 // ---------------------------------------------------------------------------
 // Notification relay — main process forwards to per-connection subjects
 // ---------------------------------------------------------------------------
@@ -101,9 +121,59 @@ type WsState {
     /// Frontdoor and the reply routes back to this browser's
     /// delivery sink.
     source_id: String,
+    /// Stable client_id from the browser's localStorage. Survives
+    /// refresh, mid-cycle WS blips, sleep/resume. Keys the per-client
+    /// outbox so reconnects under the same id can replay frames the
+    /// previous WS process emitted.
+    client_id: String,
+    /// Per-client outbox registry. Every client-bound frame goes
+    /// through `ws_send` which calls `client_registry.append` to get
+    /// the seq, then mist.send_text_frame ships it. Acks from the
+    /// client prune the outbox; on reconnect the new WS process
+    /// replays missed frames.
+    registry: client_registry.Registry,
+    /// Self-feed for the keepalive timer. The handler re-arms it
+    /// each tick via process.send_after.
+    keepalive_subject: Subject(Nil),
   )
 }
 
+// ---------------------------------------------------------------------------
+// Wire-level send helper
+// ---------------------------------------------------------------------------
+
+/// Send a server message over the WS, going through the per-client
+/// outbox. The body JSON is appended to the outbox under this
+/// connection's client_id, the registry assigns a monotonic seq, and
+/// the seq-prefixed JSON is shipped over the wire. On disconnect the
+/// outbox survives in the registry; on reconnect under the same
+/// client_id, the new WS process can `replay_since` to fill the gap.
+///
+/// `Ping` is the lone exception — it's flow control, not a payload,
+/// and shouldn't sit in the outbox waiting for replay.
+///
+/// Always returns `Nil`: the result of `mist.send_text_frame` is
+/// discarded in both branches, so a failed write is not reported to
+/// the caller from here.
+fn ws_send(
+  state: WsState,
+  conn: mist.WebsocketConnection,
+  msg: protocol.ServerMessage,
+) -> Nil {
+  case msg {
+    // Ping skips the registry entirely and is encoded with seq 0.
+    protocol.Ping -> {
+      let _ =
+        mist.send_text_frame(
+          conn,
+          protocol.encode_server_message_with_seq(0, msg),
+        )
+      Nil
+    }
+    _ -> {
+      let body = protocol.encode_server_message_body(msg)
+      // Blocking call into the registry actor. `client_registry.append`
+      // hands back 0 when the actor does not reply within its timeout,
+      // in which case this frame also goes out with seq 0.
+      // NOTE(review): confirm the client treats seq 0 as "unsequenced"
+      // for payload frames as well as for pings.
+      let seq = client_registry.append(state.registry, state.client_id, body)
+      let _ = mist.send_text_frame(conn, protocol.splice_seq(seq, body))
+      Nil
+    }
+  }
+}
+
 // ---------------------------------------------------------------------------
 // FFI
 // ---------------------------------------------------------------------------
@@ -154,6 +224,9 @@ pub fn start(
     auth.RefuseToStart(_) -> #(None, True)
   }
   let relay: Subject(RelayMsg) = process.new_subject()
+  // Per-client outbox registry. Lives for the GUI process's lifetime;
+  // each WS connection looks up / creates its outbox by client_id.
+ let registry = client_registry.start() let auth_label = case auth_token { Some(_) -> "with bearer auth" None -> "WITHOUT auth (localhost-only opt-out)" @@ -170,6 +243,7 @@ pub fn start( req, cognitive, relay, + registry, initial_messages, narrative_dir, auth_token, @@ -205,6 +279,7 @@ fn handle_request( req: Request(Connection), cognitive: Subject(agent_types.CognitiveMessage), relay: Subject(RelayMsg), + registry: client_registry.Registry, initial_messages: List(Message), narrative_dir: String, auth_token: Option(String), @@ -229,6 +304,7 @@ fn handle_request( req, cognitive, relay, + registry, initial_messages, narrative_dir, auth_token, @@ -249,6 +325,7 @@ fn handle_authenticated_request( req: Request(Connection), cognitive: Subject(agent_types.CognitiveMessage), relay: Subject(RelayMsg), + registry: client_registry.Registry, initial_messages: List(Message), narrative_dir: String, auth_token: Option(String), @@ -353,12 +430,29 @@ fn handle_authenticated_request( } Error(_) -> option.None } + // ?since=N — client tells us "I have applied every frame + // through seq N; replay everything after". Absent or 0 + // means a fresh connection (no replay) and we send a full + // session_history rebuild. 
+ let since = case request.get_query(req) { + Ok(params) -> + case list.key_find(params, "since") { + Ok(s) -> + case int.parse(s) { + Ok(n) -> n + Error(_) -> 0 + } + Error(_) -> 0 + } + Error(_) -> 0 + } mist.websocket( request: req, on_init: fn(_conn) { ws_on_init( cognitive, relay, + registry, initial_messages, narrative_dir, lib, @@ -366,6 +460,7 @@ fn handle_authenticated_request( scheduler, frontdoor, client_id, + since, ) }, on_close: fn(state) { @@ -394,6 +489,7 @@ fn handle_authenticated_request( fn ws_on_init( cognitive: Subject(agent_types.CognitiveMessage), relay: Subject(RelayMsg), + registry: client_registry.Registry, initial_messages: List(Message), narrative_dir: String, lib: Option(Subject(LibrarianMessage)), @@ -401,21 +497,26 @@ fn ws_on_init( scheduler: Option(Subject(scheduler_types.SchedulerMessage)), frontdoor: Subject(FrontdoorMessage), client_id: option.Option(String), + since: Int, ) -> #(WsState, option.Option(Selector(WsMsg))) { // Per-connection subjects owned by this WebSocket handler process. let delivery_subject: Subject(Delivery) = process.new_subject() let notify_subject: Subject(agent_types.Notification) = process.new_subject() let history_subject: Subject(String) = process.new_subject() - - // source_id derived from the browser's stable client_id when supplied, - // so a refresh / reconnect under the same browser reuses the same - // Frontdoor routing key. Without client_id we fall back to a fresh - // per-socket UUID for backward compat — legacy clients keep working - // but lose the cross-reconnect benefits. - let source_id = case client_id { - option.Some(id) -> "ws:" <> id - option.None -> "ws:" <> generate_uuid() + let replay_subject: Subject(outbox.ReplayResult) = process.new_subject() + + // Resolve the stable client_id; fall back to a fresh UUID for + // legacy clients without localStorage support. The fallback breaks + // outbox replay (no continuity across reconnects), but the + // connection still works otherwise. 
+ let resolved_client_id = case client_id { + option.Some(id) -> id + option.None -> generate_uuid() } + // source_id is the Frontdoor routing key. We prefix "ws:" so + // Frontdoor's Subscribe / pending tables aren't keyed on bare UUIDs + // (avoids collisions with hypothetical other source-id schemes). + let source_id = "ws:" <> resolved_client_id // Subscribe this connection with Frontdoor. Every reply / question for // a cycle claimed by this source_id will arrive on `delivery_subject`. @@ -427,11 +528,19 @@ fn ws_on_init( // Register for broadcast notifications (tool-calling, status, affect, etc.). process.send(relay, Register(notify_subject)) + // Keepalive subject — periodic Ping ticks come back here so the + // selector can route them to the handler. process.send_after is + // re-armed on each tick by the handler. + let keepalive_subject: Subject(Nil) = process.new_subject() + let _ = process.send_after(keepalive_subject, keepalive_tick_ms, Nil) + let selector: Selector(WsMsg) = process.new_selector() |> process.select_map(delivery_subject, fn(d) { GotDelivery(d) }) |> process.select_map(notify_subject, fn(n) { GotNotification(n) }) |> process.select_map(history_subject, fn(h) { SendHistory(h) }) + |> process.select_map(replay_subject, fn(r) { ReplayReady(r) }) + |> process.select_map(keepalive_subject, fn(_) { KeepaliveTick }) let state = WsState( @@ -445,9 +554,54 @@ fn ws_on_init( scheduler:, ws_max_bytes:, source_id:, + client_id: resolved_client_id, + registry:, + keepalive_subject:, ) - // Query live messages from the cognitive loop (not the static boot snapshot) + // Decide what to send first: + // - since > 0 AND outbox can replay → ReplayReady drives the WS + // forward through the gap (no fresh session_history needed) + // - since == 0 OR replay says TooOld → fresh session_history + // rebuild so the client repopulates from authoritative state + case since > 0 { + True -> { + let r = client_registry.replay_since(registry, resolved_client_id, 
since) + case r { + outbox.UpToDate -> { + // Client is fully up to date. Just emit history so the + // chat tab repopulates if it needed to (e.g. fresh tab + // joined an existing client_id session). + spawn_history_query(cognitive, initial_messages, history_subject) + } + outbox.Replay(_) -> { + // Replay path: deliver the gap frames directly via the + // ReplayReady selector branch. ws_handler emits each + // frame's body with its original seq. + process.send(replay_subject, r) + } + outbox.TooOld(_) -> { + // Gap too wide; fall back to session_history. Client's + // local state is wiped + rebuilt, no replay attempted. + spawn_history_query(cognitive, initial_messages, history_subject) + } + } + } + False -> spawn_history_query(cognitive, initial_messages, history_subject) + } + + #(state, Some(selector)) +} + +/// Async helper — fires a GetMessages at cognitive and pushes the +/// JSON onto the per-WS history channel. Pulled out of `ws_on_init` +/// because it's the same shape whether we're starting fresh or +/// falling back from a too-old replay. 
+fn spawn_history_query( + cognitive: Subject(agent_types.CognitiveMessage), + initial_messages: List(Message), + history_subject: Subject(String), +) -> Nil { process.spawn_unlinked(fn() { process.sleep(50) let msg_subject: Subject(List(Message)) = process.new_subject() @@ -455,7 +609,6 @@ fn ws_on_init( let selector = process.new_selector() |> process.select(msg_subject) - // Wait up to 2s for cognitive loop to respond let live_messages = case process.selector_receive(selector, 2000) { Ok(msgs) -> msgs Error(_) -> initial_messages @@ -475,8 +628,7 @@ fn ws_on_init( ) process.send(history_subject, messages_json) }) - - #(state, Some(selector)) + Nil } fn ws_handler( @@ -502,22 +654,17 @@ fn ws_handler( case client_msg_id { option.Some(id) -> { let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.UserMessageAck( - client_msg_id: id, - )), + protocol.UserMessageAck(client_msg_id: id), ) Nil } option.None -> Nil } // Send thinking indicator - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.Thinking), - ) + let _ = ws_send(state, conn, protocol.Thinking) // If the operator attached files since their last // message, inline them as an // block so the agent sees the file paths as part of the @@ -543,11 +690,7 @@ fn ws_handler( } Ok(protocol.RequestLogData) -> { let entries = slog.load_entries() - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.LogData(entries:)), - ) + let _ = ws_send(state, conn, protocol.LogData(entries:)) mist.continue(state) } Ok(protocol.RequestNarrativeData) -> { @@ -562,21 +705,14 @@ fn ws_handler( } let entries_json = json.to_string(json.array(entries, narrative_log.encode_entry)) - let msg = - protocol.encode_server_message(protocol.NarrativeData( - entries_json:, - )) slog.info( "web/gui", "narrative", - "Sending " - <> int.to_string(list.length(entries)) - <> " entries (" - <> int.to_string(string.byte_size(msg)) - <> " bytes)", 
+ "Sending " <> int.to_string(list.length(entries)) <> " entries", None, ) - let _ = mist.send_text_frame(conn, msg) + let _ = + ws_send(state, conn, protocol.NarrativeData(entries_json:)) mist.continue(state) } Ok(protocol.RequestSchedulerData) -> { @@ -595,12 +731,7 @@ fn ws_handler( scheduler_types.encode_job, )) let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.SchedulerData( - jobs_json:, - )), - ) + ws_send(state, conn, protocol.SchedulerData(jobs_json:)) Nil } Error(_) -> Nil @@ -626,11 +757,10 @@ fn ws_handler( let cycles_json = json.to_string(json.array(cycles, encode_cycle_node)) let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message( - protocol.SchedulerCyclesData(cycles_json:), - ), + protocol.SchedulerCyclesData(cycles_json:), ) Nil } @@ -651,12 +781,10 @@ fn ws_handler( let endeavours_json = json.to_string(json.array(endeavours, encode_endeavour)) let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.PlannerData( - tasks_json:, - endeavours_json:, - )), + protocol.PlannerData(tasks_json:, endeavours_json:), ) Nil } @@ -683,12 +811,7 @@ fn ws_handler( encode_dprime_gate, )) let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.DprimeData( - gates_json:, - )), - ) + ws_send(state, conn, protocol.DprimeData(gates_json:)) Nil } Error(_) -> Nil @@ -715,12 +838,7 @@ fn ws_handler( False -> base_json } let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.DprimeConfigData( - config_json:, - )), - ) + ws_send(state, conn, protocol.DprimeConfigData(config_json:)) mist.continue(state) } Ok(protocol.RequestCommsData) -> { @@ -757,13 +875,7 @@ fn ws_handler( ]) }), ) - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.CommsData( - messages_json:, - )), - ) + let _ = ws_send(state, conn, protocol.CommsData(messages_json:)) mist.continue(state) } 
Ok(protocol.RequestAffectData) -> { @@ -773,13 +885,7 @@ fn ws_handler( snapshots, affect_types.encode_snapshot, )) - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.AffectData( - snapshots_json:, - )), - ) + let _ = ws_send(state, conn, protocol.AffectData(snapshots_json:)) mist.continue(state) } Ok(protocol.RequestHistoryIndex) -> { @@ -787,13 +893,7 @@ fn ws_handler( // per-day summaries: count, last activity, one-line headline. // Newest day first. let days_json = history_index_json(state.narrative_dir) - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.HistoryIndex( - days_json:, - )), - ) + let _ = ws_send(state, conn, protocol.HistoryIndex(days_json:)) mist.continue(state) } Ok(protocol.RequestHistoryDay(date:)) -> { @@ -801,13 +901,7 @@ fn ws_handler( let entries_json = json.to_string(json.array(entries, narrative_log.encode_entry)) let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.HistoryDay( - date:, - entries_json:, - )), - ) + ws_send(state, conn, protocol.HistoryDay(date:, entries_json:)) mist.continue(state) } Ok(protocol.RequestSkillsData) -> { @@ -852,12 +946,10 @@ fn ws_handler( proposal_log.load_lines_for_date(paths.skills_log_dir(), today) let log_json = "[" <> string.join(log_lines, ",") <> "]" let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.SkillsData( - skills_json:, - log_json:, - )), + protocol.SkillsData(skills_json:, log_json:), ) mist.continue(state) } @@ -889,11 +981,7 @@ fn ws_handler( ]) }), ) - let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.MemoryData(runs_json:)), - ) + let _ = ws_send(state, conn, protocol.MemoryData(runs_json:)) mist.continue(state) } Ok(protocol.RequestChatHistoryDay(date:)) -> { @@ -916,25 +1004,24 @@ fn ws_handler( }), ) let _ = - mist.send_text_frame( + ws_send( + state, conn, - 
protocol.encode_server_message(protocol.ChatHistoryDay( - date:, - pairs_json:, - )), + protocol.ChatHistoryDay(date:, pairs_json:), ) mist.continue(state) } Ok(protocol.RequestRewind(index: _)) -> { // Rewind is no longer supported — agent starts fresh each session let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.AssistantMessage( + protocol.AssistantMessage( text: "[Rewind not available — agent starts fresh each session. Use recall_recent to check history.]", model: "system", usage: None, - )), + ), ) mist.continue(state) } @@ -943,12 +1030,7 @@ fn ws_handler( let documents_json = json.to_string(json.array(docs, encode_doc_meta)) let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.DocumentListData( - documents_json:, - )), - ) + ws_send(state, conn, protocol.DocumentListData(documents_json:)) mist.continue(state) } Ok(protocol.RequestDocumentView(doc_id:)) -> { @@ -972,12 +1054,10 @@ fn ws_handler( } } let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.DocumentViewData( - doc_id:, - document_json:, - )), + protocol.DocumentViewData(doc_id:, document_json:), ) mist.continue(state) } @@ -1011,12 +1091,10 @@ fn ws_handler( let results_json = json.to_string(json.array(results, encode_search_result)) let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.SearchResultsData( - query:, - results_json:, - )), + protocol.SearchResultsData(query:, results_json:), ) mist.continue(state) } @@ -1034,13 +1112,10 @@ fn ws_handler( Error(reason) -> #("error", reason) } let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.ApprovalResult( - slug:, - status:, - message:, - )), + protocol.ApprovalResult(slug:, status:, message:), ) mist.continue(state) } @@ -1048,13 +1123,14 @@ fn ws_handler( case string.trim(reason) { "" -> { let _ = - mist.send_text_frame( + 
ws_send( + state, conn, - protocol.encode_server_message(protocol.ApprovalResult( + protocol.ApprovalResult( slug:, status: "error", message: "Rejection reason must not be empty", - )), + ), ) mist.continue(state) } @@ -1072,18 +1148,26 @@ fn ws_handler( Error(r) -> #("error", r) } let _ = - mist.send_text_frame( + ws_send( + state, conn, - protocol.encode_server_message(protocol.ApprovalResult( - slug:, - status:, - message:, - )), + protocol.ApprovalResult(slug:, status:, message:), ) mist.continue(state) } } } + Ok(protocol.Ack(seq:)) -> { + client_registry.ack(state.registry, state.client_id, seq) + mist.continue(state) + } + Ok(protocol.Pong) -> { + // Keepalive reply — no action needed. The mere fact + // that we received any text frame means the connection + // is alive; mist's idle-timeout (if any) reset on + // receive. No state change required. + mist.continue(state) + } Error(_) -> mist.continue(state) } } @@ -1095,8 +1179,7 @@ fn ws_handler( case delivery { DeliverReply(cycle_id: _, response:, model:, usage:, tools_fired: _) -> { let msg = protocol.AssistantMessage(text: response, model:, usage:) - let _ = - mist.send_text_frame(conn, protocol.encode_server_message(msg)) + let _ = ws_send(state, conn, msg) mist.continue(state) } DeliverQuestion(cycle_id: _, question_id: _, question:, origin:) -> { @@ -1105,8 +1188,7 @@ fn ws_handler( AgentOrigin(agent_name:) -> protocol.agent_source(agent_name) } let msg = protocol.Question(text: question, source: source_str) - let _ = - mist.send_text_frame(conn, protocol.encode_server_message(msg)) + let _ = ws_send(state, conn, msg) mist.continue(state) } DeliverClosed -> mist.stop() @@ -1116,18 +1198,48 @@ fn ws_handler( // Notification arrived via selector mist.Custom(GotNotification(notification)) -> { let server_msg = notification_to_server_message(notification) - let _ = - mist.send_text_frame(conn, protocol.encode_server_message(server_msg)) + let _ = ws_send(state, conn, server_msg) mist.continue(state) } // 
Session history on connect mist.Custom(SendHistory(messages_json)) -> { + let _ = ws_send(state, conn, protocol.SessionHistory(messages_json:)) + mist.continue(state) + } + + // Outbox replay on reconnect with `?since=N`. Each frame ships + // with its ORIGINAL seq (from when the previous WS process + // emitted it), not a fresh one — the client uses these seqs to + // advance its `lastSeenSeq` cursor as if it had received them + // live. Frames go straight to the wire; we don't go through + // ws_send because that would re-append to the outbox and double + // the sequence numbers. + mist.Custom(ReplayReady(replay)) -> { + case replay { + outbox.Replay(frames) -> { + list.each(frames, fn(frame) { + let _ = + mist.send_text_frame( + conn, + protocol.splice_seq(frame.seq, frame.body_json), + ) + Nil + }) + } + _ -> Nil + } + mist.continue(state) + } + + // Keepalive — emit a Ping (seq:0, doesn't enter outbox) and + // re-arm the timer. If the connection has gone silent, the + // mist.send_text_frame fails and the process moves to + // mist.Closed; the client reconnects fast. + mist.Custom(KeepaliveTick) -> { + let _ = ws_send(state, conn, protocol.Ping) let _ = - mist.send_text_frame( - conn, - protocol.encode_server_message(protocol.SessionHistory(messages_json:)), - ) + process.send_after(state.keepalive_subject, keepalive_tick_ms, Nil) mist.continue(state) } diff --git a/src/web/html.gleam b/src/web/html.gleam index 5fa920e..632d364 100644 --- a/src/web/html.gleam +++ b/src/web/html.gleam @@ -569,13 +569,16 @@ pub fn chat_page(agent_name: String, agent_version: String) -> String { renderChatHistoryDay(data.date, data.pairs); break; case 'user_message_ack': - // Server has accepted our message frame. Drop the in-flight - // entry so it won't be re-rendered if a SessionHistory rebuild - // comes through later. The bubble itself is already in the DOM - // from sendMessage; nothing more to do visually here. 
- inFlightUserMessages = inFlightUserMessages.filter(function(m) { - return m.id !== data.client_msg_id; - }); + // Server has accepted our message frame. Historically this + // dropped the entry from inFlightUserMessages immediately, + // but that raced with reconnect-mid-cycle: ack arrives, + // entry dropped, then session_history rebuild wipes the + // DOM, message not in server's view yet → bubble vanishes. + // + // The fix: keep the entry in inFlightUserMessages until the + // text-match drop in renderSessionHistory confirms cog has + // actually persisted it. The ack stays useful as a 'sending + // → sent' UI hint (decoupled from the durable tracking). break; case 'assistant_message': removeThinking(); @@ -1453,20 +1456,46 @@ pub fn mobile_page(agent_name: String, agent_version: String) -> String { }); input.addEventListener('input', autoGrow); - // Server tags every frame with a monotonic `seq`. Track the last one we - // saw so we can console.warn if frames arrive out of order — useful - // signal if the wire path ever does reorder things. Reset on reconnect. + // Per-client outbox cursor (mobile view). Server assigns a + // monotonic seq to every frame; we track the highest applied + // locally so reconnect can tell the server 'I have through seq N + // — replay from N+1'. Persisted in sessionStorage so a tab + // refresh under the same browser doesn't lose the cursor. var lastSeenSeq = 0; + var lastAckedSeq = 0; + var ackInterval = null; + try { + var stored = sessionStorage.getItem('springdrift_last_seq_mobile'); + if (stored) lastSeenSeq = parseInt(stored, 10) || 0; + } catch (e) {} + + function buildWsUrlWithSince() { + var base = wsUrl(); + if (lastSeenSeq <= 0) return base; + var sep = base.indexOf('?') >= 0 ? 
'&' : '?'; + return base + sep + 'since=' + lastSeenSeq; + } function connect() { - lastSeenSeq = 0; - ws = new WebSocket(wsUrl()); + ws = new WebSocket(buildWsUrlWithSince()); ws.onopen = function() { setStatus('connected'); reconnectDelay = 1000; + if (ackInterval) clearInterval(ackInterval); + ackInterval = setInterval(function() { + if (!ws || ws.readyState !== WebSocket.OPEN) return; + if (lastSeenSeq > lastAckedSeq) { + ws.send(JSON.stringify({ type: 'ack', seq: lastSeenSeq })); + lastAckedSeq = lastSeenSeq; + } + }, 5000); }; ws.onclose = function() { setStatus('disconnected'); hideThinking(); + if (ackInterval) { clearInterval(ackInterval); ackInterval = null; } + try { + sessionStorage.setItem('springdrift_last_seq_mobile', String(lastSeenSeq)); + } catch (e) {} setTimeout(connect, reconnectDelay); reconnectDelay = Math.min(reconnectDelay * 2, 30000); }; @@ -1474,11 +1503,23 @@ pub fn mobile_page(agent_name: String, agent_version: String) -> String { ws.onmessage = function(evt) { var msg; try { msg = JSON.parse(evt.data); } catch (e) { return; } - if (typeof msg.seq === 'number') { + // Server keepalive — echo pong, take no further action. + if (msg.type === 'ping') { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'pong' })); + } + return; + } + if (typeof msg.seq === 'number' && msg.seq > 0) { if (lastSeenSeq > 0 && msg.seq < lastSeenSeq) { console.warn('ws frame arrived out of order: seq=' + msg.seq + ' after ' + lastSeenSeq + ' (type=' + msg.type + ')'); } - if (msg.seq > lastSeenSeq) lastSeenSeq = msg.seq; + if (msg.seq > lastSeenSeq) { + lastSeenSeq = msg.seq; + try { + sessionStorage.setItem('springdrift_last_seq_mobile', String(lastSeenSeq)); + } catch (e) {} + } } switch (msg.type) { case 'session_history': @@ -1522,12 +1563,12 @@ pub fn mobile_page(agent_name: String, agent_version: String) -> String { } break; case 'user_message_ack': - // Server confirmed the message frame. 
Drop the in-flight - // entry so it won't be re-rendered on the next history - // rebuild. - inFlightUserMessages = inFlightUserMessages.filter(function(im) { - return im.id !== msg.client_msg_id; - }); + // Server confirmed the WS frame was received but cog may + // not have persisted the message yet — see the chat-page + // handler comment for the disappearing-bubble race. The + // in-flight entry is dropped only when renderSessionHistory + // observes the message in the server's authoritative view + // (text-match drop in the session_history handler above). break; case 'thinking': showThinking(); @@ -4389,17 +4430,36 @@ fn ws_connect_js() -> String { } var clientId = getClientId(); - // Server tags every frame with a monotonic seq. Detect reordering in - // console for diagnostics. Reset on reconnect. + // Per-client outbox cursor. Server assigns a monotonic seq to + // every frame; we track the highest applied locally so reconnect + // can tell the server 'I have through seq N — replay from N+1'. + // Persisted in sessionStorage so a tab refresh under the same + // browser doesn't lose the cursor and trigger a needless full + // session_history rebuild. var lastSeenSeq = 0; + try { + var stored = sessionStorage.getItem('springdrift_last_seq_' + clientId); + if (stored) lastSeenSeq = parseInt(stored, 10) || 0; + } catch (e) {} + // Track the highest seq we've ack'd to the server. Avoids spamming + // duplicate acks when no new frames have arrived. + var lastAckedSeq = 0; + // Periodic ack loop. Every ~5s, if our cursor has moved past the + // last ack we sent, ship an ack frame. Server prunes its outbox + // up to the acked seq. + var ackInterval = null; + function connect() { - lastSeenSeq = 0; var proto = location.protocol === 'https:' ? 
'wss:' : 'ws:'; var params = new URLSearchParams(location.search); var token = params.get('token'); var qsParts = []; if (token) qsParts.push('token=' + encodeURIComponent(token)); qsParts.push('client_id=' + encodeURIComponent(clientId)); + // Only add since=N when we have a non-zero cursor — a fresh tab + // (no sessionStorage) should get a session_history rebuild, not + // try to replay from seq 0. + if (lastSeenSeq > 0) qsParts.push('since=' + lastSeenSeq); var qs = qsParts.length ? '?' + qsParts.join('&') : ''; ws = new WebSocket(proto + '//' + location.host + '/ws' + qs); @@ -4428,11 +4488,26 @@ fn ws_connect_js() -> String { else if (tab === 'dprime') requestDprimeData(); else if (tab === 'dprime-config') requestDprimeConfig(); } + // Start the periodic ack loop on every (re)connect. + if (ackInterval) clearInterval(ackInterval); + ackInterval = setInterval(function() { + if (!ws || ws.readyState !== WebSocket.OPEN) return; + if (lastSeenSeq > lastAckedSeq) { + ws.send(JSON.stringify({ type: 'ack', seq: lastSeenSeq })); + lastAckedSeq = lastSeenSeq; + } + }, 5000); }; ws.onclose = function() { statusEl.textContent = 'reconnecting...'; statusDot.className = 'dot disconnected'; + if (ackInterval) { clearInterval(ackInterval); ackInterval = null; } + // Persist the last-seen cursor so reconnect (or a refresh + // before reconnect lands) opens with ?since=N. + try { + sessionStorage.setItem('springdrift_last_seq_' + clientId, String(lastSeenSeq)); + } catch (e) {} setTimeout(connect, reconnectDelay); reconnectDelay = Math.min(reconnectDelay * 2, 10000); }; @@ -4442,11 +4517,24 @@ fn ws_connect_js() -> String { ws.onmessage = function(evt) { var data; try { data = JSON.parse(evt.data); } catch(e) { console.error('WS parse error:', e.message, 'data length:', evt.data.length); return; } - if (typeof data.seq === 'number') { + // Server-side keepalive — reply pong, do nothing else. Doesn't + // affect lastSeenSeq (server emits ping with seq:0 explicitly). 
+ if (data.type === 'ping') { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'pong' })); + } + return; + } + if (typeof data.seq === 'number' && data.seq > 0) { if (lastSeenSeq > 0 && data.seq < lastSeenSeq) { console.warn('ws frame arrived out of order: seq=' + data.seq + ' after ' + lastSeenSeq + ' (type=' + data.type + ')'); } - if (data.seq > lastSeenSeq) lastSeenSeq = data.seq; + if (data.seq > lastSeenSeq) { + lastSeenSeq = data.seq; + try { + sessionStorage.setItem('springdrift_last_seq_' + clientId, String(lastSeenSeq)); + } catch (e) {} + } } handleServerMessage(data); }; diff --git a/src/web/outbox.gleam b/src/web/outbox.gleam new file mode 100644 index 0000000..489defd --- /dev/null +++ b/src/web/outbox.gleam @@ -0,0 +1,223 @@ +//// Per-client bounded outbox of server→client WebSocket frames. +//// +//// **What this gives the wire protocol.** Every server-pushed frame +//// gets a monotonic `seq`. The client tracks the last `seq` it has +//// applied locally and acks back. The server prunes the outbox up +//// to the acked seq. On reconnect, the client opens with +//// `?since=N`; the server replays from `N + 1`. If `N` is older +//// than the oldest seq still in the outbox the server falls back +//// to a full `session_history` rebuild. +//// +//// **Why per-client.** The pre-existing `seq` was per-node monotonic +//// — fine for "detect reordering" diagnostics, useless for replay. +//// A reconnect under the same `client_id` (stable across browser +//// tabs / refreshes / mid-cycle blips) needs to know what the +//// *client* has seen, not what the *node* has emitted. So the seq +//// counter and outbox both key on `client_id`. +//// +//// **Bounds.** The outbox is a ring buffer. Default cap is 500 +//// frames. We additionally drop entries older than 5 minutes — past +//// that horizon a reconnect is safer with a fresh `session_history` +//// rebuild than with a partial replay. 
+//// +//// **What we deliberately do NOT do here.** +//// +//// * No persistence. Process death loses the outbox; the next +//// connect under that client_id starts a fresh seq sequence. +//// A redirected operator who closes the laptop and reconnects +//// a day later gets `session_history`, not replay. That's +//// correct. +//// * No cross-tab fan-out. Each WS process owns its own connection; +//// the outbox is shared across reconnects of the *same* client_id +//// but not across separate browser tabs (which mint distinct +//// client_ids). This is intentional — different tabs are +//// different conversations. +//// * No back-pressure on append. If the client never acks, the +//// outbox fills, hits the cap, and the oldest entries get +//// dropped. The client will fall back to `session_history` on +//// its next reconnect anyway. + +// Copyright (C) 2026 Seamus Brady +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +import gleam/list +import gleam/option.{type Option, None, Some} + +/// Default cap on outbox length. Past this, the oldest frame is +/// dropped on every append. 500 is generous for a single WS that +/// reconnects within seconds; reconnects after the 500-frame backlog +/// fall back to `session_history`. +pub const default_max_size: Int = 500 + +/// Default age cap (ms). Frames older than this are dropped on +/// `prune_age/2`. Five minutes covers the long-cycle case (operator +/// laptop sleeps, agent runs for 3 min, operator wakes) while +/// keeping memory bounded under sustained idle. 
+pub const default_max_age_ms: Int = 300_000 + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/// A single frame held in the outbox. Holds the encoded JSON body +/// (without the `seq` prefix — `replay_since` re-emits seq-prefixed +/// JSON when handing frames back). +pub type Frame { + Frame(seq: Int, body_json: String, sent_at_ms: Int) +} + +/// Result of trying to replay from a particular seq. `TooOld` means +/// the requested `since` is older than anything we still have — the +/// caller should fall back to a `session_history` rebuild. +pub type ReplayResult { + Replay(frames: List(Frame)) + TooOld(oldest_kept: Int) + /// Client is current — nothing to replay. + UpToDate +} + +/// Configuration overrides. Pass `default_config()` for the standard +/// behaviour described above. +pub type Config { + Config(max_size: Int, max_age_ms: Int) +} + +pub fn default_config() -> Config { + Config(max_size: default_max_size, max_age_ms: default_max_age_ms) +} + +/// Per-client append-only buffer with bounded retention. `next_seq` +/// is the seq that *will be assigned* on the next `append`. Held +/// chronologically (oldest at head, newest at tail) so prune_age +/// drops from the head. +pub opaque type Outbox { + Outbox( + config: Config, + next_seq: Int, + /// Chronological order: head = oldest, tail = newest. + frames: List(Frame), + /// Monotonic record of the highest seq the client has acked. We + /// keep frames > acked_seq; anything ≤ acked_seq is pruned on + /// the next `ack` and is no longer replayable. 
+ acked_seq: Int, + ) +} + +// --------------------------------------------------------------------------- +// Construction +// --------------------------------------------------------------------------- + +pub fn new() -> Outbox { + with_config(default_config()) +} + +pub fn with_config(config: Config) -> Outbox { + Outbox(config: config, next_seq: 1, frames: [], acked_seq: 0) +} + +// --------------------------------------------------------------------------- +// Reads +// --------------------------------------------------------------------------- + +pub fn last_seq(outbox: Outbox) -> Int { + outbox.next_seq - 1 +} + +pub fn acked_seq(outbox: Outbox) -> Int { + outbox.acked_seq +} + +pub fn pending_count(outbox: Outbox) -> Int { + list.length(outbox.frames) +} + +pub fn oldest_kept_seq(outbox: Outbox) -> Option(Int) { + case outbox.frames { + [first, ..] -> Some(first.seq) + [] -> None + } +} + +// --------------------------------------------------------------------------- +// Mutations +// --------------------------------------------------------------------------- + +/// Append a fresh frame. Returns the updated outbox + the assigned +/// seq. Caller's responsibility to embed the seq into the JSON it +/// actually emits over the wire. +pub fn append(outbox: Outbox, body_json: String, now_ms: Int) -> #(Outbox, Int) { + let assigned = outbox.next_seq + let frame = Frame(seq: assigned, body_json: body_json, sent_at_ms: now_ms) + let new_frames = list.append(outbox.frames, [frame]) + // Cap-prune: drop oldest while over the size limit. + let trimmed = trim_to_size(new_frames, outbox.config.max_size) + #(Outbox(..outbox, next_seq: assigned + 1, frames: trimmed), assigned) +} + +/// Mark every frame with seq ≤ `up_to` as acknowledged and drop them +/// from the buffer. Idempotent. `up_to` below the current +/// `acked_seq` is a no-op (a stale ack from a slow network shouldn't +/// rewind the cursor). 
+pub fn ack(outbox: Outbox, up_to: Int) -> Outbox { + case up_to <= outbox.acked_seq { + True -> outbox + False -> { + let kept = list.filter(outbox.frames, fn(f) { f.seq > up_to }) + Outbox(..outbox, frames: kept, acked_seq: up_to) + } + } +} + +/// Drop frames older than `max_age_ms` from the oldest end. Called +/// periodically by the registry's janitor; not on every append. +pub fn prune_age(outbox: Outbox, now_ms: Int) -> Outbox { + let cutoff = now_ms - outbox.config.max_age_ms + let kept = list.filter(outbox.frames, fn(f) { f.sent_at_ms >= cutoff }) + Outbox(..outbox, frames: kept) +} + +// --------------------------------------------------------------------------- +// Replay on reconnect +// --------------------------------------------------------------------------- + +/// Return all frames with seq > `since`. The caller then re-emits +/// each frame's `body_json` with its `seq` to the new WS connection. +/// The contract: +/// +/// * `since` ≥ `last_seq` → `UpToDate` (no replay needed) +/// * `since` < `oldest_kept_seq` → `TooOld(oldest_kept)` (caller +/// should fall back to `session_history`) +/// * otherwise → `Replay(frames)` with frames in chronological +/// order +pub fn replay_since(outbox: Outbox, since: Int) -> ReplayResult { + case since >= last_seq(outbox) { + True -> UpToDate + False -> + case oldest_kept_seq(outbox) { + None -> UpToDate + Some(oldest) -> + case since < oldest - 1 { + True -> TooOld(oldest_kept: oldest) + False -> { + let to_send = list.filter(outbox.frames, fn(f) { f.seq > since }) + Replay(frames: to_send) + } + } + } + } +} + +// --------------------------------------------------------------------------- +// Internal +// --------------------------------------------------------------------------- + +fn trim_to_size(frames: List(Frame), max_size: Int) -> List(Frame) { + case list.length(frames) > max_size { + True -> list.drop(frames, list.length(frames) - max_size) + False -> frames + } +} diff --git a/src/web/protocol.gleam 
b/src/web/protocol.gleam index 2cc5ce6..2d768fb 100644 --- a/src/web/protocol.gleam +++ b/src/web/protocol.gleam @@ -26,10 +26,9 @@ import gleam/string import llm/types.{type Usage, Usage} import slog -/// Monotonic positive integer — tagged onto every outbound ServerMessage -/// JSON as a `seq` field so the client can detect reordering / gaps. -@external(erlang, "springdrift_ffi", "monotonic_seq") -fn monotonic_seq() -> Int +// Per-client seq is assigned by `web/client_registry.gleam` at the +// send site; this module just splices it into the JSON body via +// `encode_server_message_with_seq/2`. // --------------------------------------------------------------------------- // Types @@ -99,6 +98,16 @@ pub type ClientMessage { RequestApproveExport(slug: String, note: String) /// Operator rejects a Promoted export. reason is required. RequestRejectExport(slug: String, reason: String) + /// Outbox flow control. The client sends `{"type":"ack","seq":N}` + /// telling the server "I have received and applied every frame + /// through seq N". The server prunes its outbox up to N. Acks are + /// idempotent — duplicates and stale acks are no-ops. + Ack(seq: Int) + /// WebSocket keepalive reply. The server pings every ~25s to keep + /// the line alive across idle proxies; the client replies with + /// pong. A missed pong over `keepalive_grace_ms` triggers a + /// server-side close so the client reconnects fast. + Pong } pub type ServerMessage { @@ -177,6 +186,9 @@ pub type ServerMessage { /// Documents tab — toast feedback for approve/reject actions. /// status: "ok" | "error". slug + message for display. ApprovalResult(slug: String, status: String, message: String) + /// WebSocket keepalive — server sends Ping every ~25s to keep + /// the line alive across idle proxies. Client replies with Pong. 
+ Ping } pub type CycleDataJson { @@ -271,6 +283,11 @@ pub fn decode_client_message(json_string: String) -> Result(ClientMessage, Nil) use reason <- decode.field("reason", decode.string) decode.success(RequestRejectExport(slug:, reason:)) } + "ack" -> { + use seq <- decode.field("seq", decode.int) + decode.success(Ack(seq:)) + } + "pong" -> decode.success(Pong) _ -> decode.failure(UserMessage("", None, []), "Unknown client message type") } @@ -293,21 +310,39 @@ fn attachment_decoder() -> decode.Decoder(Attachment) { // Encode (server → client) // --------------------------------------------------------------------------- -/// Encode a server message as JSON with a monotonic `seq` field injected -/// as the first key. The seq is a per-node monotonic integer so a client -/// can detect reordering or gaps in delivered frames. +/// Encode a server message as JSON with a `seq` field injected as +/// the first key. The seq is per-client and is assigned by the +/// outbox registry at send time. Pass `0` if you genuinely want a +/// seq-less frame (e.g. ping/pong). +pub fn encode_server_message_with_seq(seq: Int, msg: ServerMessage) -> String { + with_seq(seq, encode_body(msg)) +} + +/// Back-compat alias for tests / callers that pre-date the +/// per-client seq design. Emits a body with `seq:0`. Production +/// callers should use `encode_server_message_with_seq` (via +/// `web/gui.ws_send`) so the seq comes from the per-client outbox. pub fn encode_server_message(msg: ServerMessage) -> String { - with_seq(encode_body(msg)) + encode_server_message_with_seq(0, msg) +} + +/// Encode the JSON body of a server message *without* a seq prefix. +/// Used by the outbox registry: the WS handler asks the registry to +/// assign a seq for this body, then re-emits the body with the +/// returned seq spliced in via `encode_server_message_with_seq/2`. 
+pub fn encode_server_message_body(msg: ServerMessage) -> String { + encode_body(msg) } -/// Inject a `"seq": N,` field at the start of the JSON object. All body -/// encoders produce an object starting with `{` — we splice between the -/// `{` and the first existing field so the resulting JSON stays valid. -fn with_seq(body: String) -> String { - "{\"seq\":" - <> int.to_string(monotonic_seq()) - <> "," - <> string.drop_start(body, 1) +/// Splice the assigned seq into a body. All body encoders produce +/// an object starting with `{` — we splice between the `{` and the +/// first existing field so the resulting JSON stays valid. +pub fn splice_seq(seq: Int, body: String) -> String { + with_seq(seq, body) +} + +fn with_seq(seq: Int, body: String) -> String { + "{\"seq\":" <> int.to_string(seq) <> "," <> string.drop_start(body, 1) } fn encode_body(msg: ServerMessage) -> String { @@ -541,6 +576,10 @@ fn encode_body(msg: ServerMessage) -> String { #("message", json.string(message)), ]) |> json.to_string + + Ping -> + json.object([#("type", json.string("ping"))]) + |> json.to_string } } diff --git a/test/web/outbox_test.gleam b/test/web/outbox_test.gleam new file mode 100644 index 0000000..94fa0ee --- /dev/null +++ b/test/web/outbox_test.gleam @@ -0,0 +1,217 @@ +// Copyright (C) 2026 Seamus Brady +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +import gleam/list +import gleam/option.{None, Some} +import gleeunit/should +import web/outbox + +// ── Construction ────────────────────────────────────────────────────────── + +pub fn new_starts_at_seq_zero_test() { + let o = outbox.new() + outbox.last_seq(o) |> should.equal(0) + outbox.acked_seq(o) |> should.equal(0) + outbox.pending_count(o) |> should.equal(0) + outbox.oldest_kept_seq(o) |> should.equal(None) +} + +// ── Append assigns monotonic seq ────────────────────────────────────────── + +pub fn append_returns_monotonic_seq_test() { + let o = outbox.new() + let #(o, s1) = outbox.append(o, "{\"type\":\"a\"}", 1000) + let #(o, s2) = outbox.append(o, "{\"type\":\"b\"}", 1001) + let #(_o, s3) = outbox.append(o, "{\"type\":\"c\"}", 1002) + s1 |> should.equal(1) + s2 |> should.equal(2) + s3 |> should.equal(3) +} + +pub fn append_grows_pending_count_test() { + let o = outbox.new() + let #(o, _) = outbox.append(o, "x", 0) + let #(o, _) = outbox.append(o, "y", 0) + outbox.pending_count(o) |> should.equal(2) + outbox.last_seq(o) |> should.equal(2) +} + +// ── Ack prunes from below ───────────────────────────────────────────────── + +pub fn ack_prunes_acked_frames_test() { + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + let #(o, _) = outbox.append(o, "b", 0) + let #(o, _) = outbox.append(o, "c", 0) + let o = outbox.ack(o, 2) + outbox.pending_count(o) |> should.equal(1) + outbox.acked_seq(o) |> should.equal(2) + // Oldest remaining should be seq 3. + outbox.oldest_kept_seq(o) |> should.equal(Some(3)) +} + +pub fn ack_at_or_below_current_is_noop_test() { + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + let #(o, _) = outbox.append(o, "b", 0) + let o = outbox.ack(o, 2) + // Stale ack — shouldn't rewind anything. 
+ let o2 = outbox.ack(o, 1) + outbox.acked_seq(o2) |> should.equal(2) + outbox.pending_count(o2) |> should.equal(0) +} + +// ── Replay semantics ────────────────────────────────────────────────────── + +pub fn replay_up_to_date_when_since_is_current_test() { + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + // Client says it has seq 1 — we have seq 1 — no replay. + case outbox.replay_since(o, 1) { + outbox.UpToDate -> Nil + _ -> { + should.fail() + Nil + } + } +} + +pub fn replay_returns_frames_after_since_test() { + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + let #(o, _) = outbox.append(o, "b", 0) + let #(o, _) = outbox.append(o, "c", 0) + // Client has 1, asks for everything after — should get 2 + 3. + case outbox.replay_since(o, 1) { + outbox.Replay(frames) -> { + list.length(frames) |> should.equal(2) + case frames { + [f2, f3] -> { + f2.seq |> should.equal(2) + f3.seq |> should.equal(3) + f2.body_json |> should.equal("b") + f3.body_json |> should.equal("c") + } + _ -> { + should.fail() + Nil + } + } + } + _ -> { + should.fail() + Nil + } + } +} + +pub fn replay_too_old_when_since_below_oldest_kept_test() { + // Append 3 frames, ack the first two so they prune. Client comes + // back asking for since=0 → its requested cursor is older than + // anything we have, return TooOld. + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + let #(o, _) = outbox.append(o, "b", 0) + let #(o, _) = outbox.append(o, "c", 0) + let o = outbox.ack(o, 2) + case outbox.replay_since(o, 0) { + outbox.TooOld(oldest_kept) -> oldest_kept |> should.equal(3) + _ -> { + should.fail() + Nil + } + } +} + +pub fn replay_at_oldest_kept_minus_one_returns_replay_test() { + // Boundary: client cursor is exactly oldest_kept - 1 → still + // replayable (no gap). + let o = outbox.new() + let #(o, _) = outbox.append(o, "a", 0) + let #(o, _) = outbox.append(o, "b", 0) + let #(o, _) = outbox.append(o, "c", 0) + // No ack yet; oldest_kept is 1. 
+ case outbox.replay_since(o, 0) { + outbox.Replay(frames) -> list.length(frames) |> should.equal(3) + _ -> { + should.fail() + Nil + } + } +} + +// ── Size cap drops oldest on overflow ───────────────────────────────────── + +pub fn append_respects_max_size_cap_test() { + let small = + outbox.with_config(outbox.Config(max_size: 3, max_age_ms: 1_000_000)) + let small = + list.fold([1, 2, 3, 4, 5], small, fn(o, _) { + let #(o, _) = outbox.append(o, "x", 0) + o + }) + outbox.pending_count(small) |> should.equal(3) + // The oldest kept should be seq 3 (1 and 2 dropped). + outbox.oldest_kept_seq(small) |> should.equal(Some(3)) + outbox.last_seq(small) |> should.equal(5) +} + +// ── Age prune drops stale frames from the head ──────────────────────────── + +pub fn prune_age_drops_old_frames_test() { + let o = outbox.with_config(outbox.Config(max_size: 100, max_age_ms: 1000)) + let #(o, _) = outbox.append(o, "a", 0) + // 500ms passes — a still inside window + let #(o, _) = outbox.append(o, "b", 500) + // 1500ms passes — first frame ages out + let #(o, _) = outbox.append(o, "c", 1500) + let pruned = outbox.prune_age(o, 1500) + // Frames at sent_at < 500ms have aged out (cutoff = 1500 - 1000 = 500). + // Frame 'a' (sent_at=0) drops; 'b' (500) and 'c' (1500) survive. + outbox.pending_count(pruned) |> should.equal(2) + outbox.oldest_kept_seq(pruned) |> should.equal(Some(2)) +} + +// ── End-to-end: typical reconnect flow ──────────────────────────────────── + +pub fn reconnect_flow_replays_missed_frames_test() { + // Simulate: client connects, server sends 3 frames, client acks 2, + // connection drops, server sends 2 more, client reconnects with + // since=2, expects 2 frames replayed. + let o = outbox.new() + let #(o, s1) = outbox.append(o, "msg1", 0) + let #(o, s2) = outbox.append(o, "msg2", 0) + let #(o, _s3) = outbox.append(o, "msg3", 0) + // Client ack: "I have through seq 2". + let o = outbox.ack(o, s2) + // Connection drops; server keeps publishing. 
+ let #(o, _s4) = outbox.append(o, "msg4", 0) + let #(o, _s5) = outbox.append(o, "msg5", 0) + // Client reconnects with since=2 (last acked). + case outbox.replay_since(o, s2) { + outbox.Replay(frames) -> { + list.length(frames) |> should.equal(3) + case frames { + [f3, f4, f5] -> { + f3.seq |> should.equal(3) + f3.body_json |> should.equal("msg3") + f4.seq |> should.equal(4) + f5.seq |> should.equal(5) + } + _ -> { + should.fail() + Nil + } + } + } + _ -> { + should.fail() + Nil + } + } + s1 |> should.equal(1) +} diff --git a/test/web/protocol_test.gleam b/test/web/protocol_test.gleam index 273cfaf..c362496 100644 --- a/test/web/protocol_test.gleam +++ b/test/web/protocol_test.gleam @@ -548,14 +548,16 @@ pub fn encode_includes_seq_field_test() { } } -pub fn seq_increments_between_calls_test() { - let a = protocol.encode_server_message(protocol.Thinking) - let b = protocol.encode_server_message(protocol.Thinking) - // Two back-to-back encodes must produce different seq values (monotonic). - // Extract the seq value from each and confirm b > a. - let seq_a = extract_seq(a) - let seq_b = extract_seq(b) - { seq_b > seq_a } |> should.be_true +pub fn seq_passes_through_explicit_value_test() { + // Per-client seq is assigned by the outbox registry (see + // web/client_registry.gleam) and threaded through + // encode_server_message_with_seq. The protocol module just splices + // whatever value the caller hands it. Confirm the round-trip: + // pass seq=42, pull seq=42 back out. + let a = protocol.encode_server_message_with_seq(42, protocol.Thinking) + let b = protocol.encode_server_message_with_seq(43, protocol.Thinking) + extract_seq(a) |> should.equal(42) + extract_seq(b) |> should.equal(43) } fn extract_seq(json_str: String) -> Int {