Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ A tiny Go module containing the shared wire protocol types used by both the [Age
| File | Types | Purpose |
|------|-------|---------|
| `envelope.go` | `RelayEnvelope` | Encrypted message envelope (sid, seq, enc, tid) |
| `control.go` | `ControlMessage`, `ControlType` (16 constants), 15 payload structs | Relay control protocol |
| `control.go` | `ControlMessage`, `ControlType` (18 constants), 17 payload structs | Relay control protocol |
| `policy.go` | `PolicyJSON`, `PolicyMatchJSON` | Policy rule wire format |
| `capabilities.go` | `AgentCapability` | Per-agent feature flags the daemon emits to the PWA (MCP reconnect, session-scoped approvals, free-text replies, etc.) |
| `codex_sandbox.go` | `CodexSandboxMode` | Shared Codex per-session sandbox literals for daemon and PWA runtime controls |
| `replay.go` | `ReplayRequest`, `ReplayCompletePayload` | Durable session journal replay recovery using inner per-session `AgentMessage.seq` values |
| `replay.go` | `ReplayRequest`, `ReplayCompletePayload`, `SessionSnapshotRequest`, `SessionSnapshotPayload` | Durable session journal replay recovery and daemon-owned active-session snapshots |
| `trace.go` | `NewTraceID()`, `ValidTraceID()` | W3C-compatible trace ID generation |

### Control Types
Expand All @@ -39,13 +39,15 @@ register · join · heartbeat · ack · error · sync_policies
status_update · audit_entry · deactivate_developer · client_connected
client_count · key_rotate · entitlement_update · entitlement_violation
push_notify · push_notify_result
terminate_session · terminate_session_ack
```

### Payload Types

```
RegisterPayload · JoinPayload · AckPayload · ErrorPayload
StatusUpdatePayload · AuditEntryPayload · DeactivateDeveloperPayload
TerminateSessionPayload · TerminateSessionAckPayload
ClientConnectedPayload · ClientCountPayload · KeyRotatePayload
SyncPoliciesPayload · EntitlementUpdatePayload · EntitlementViolationPayload
PushNotifyPayload · PushNotifyResultPayload
Expand All @@ -66,7 +68,9 @@ env := protocol.RelayEnvelope{

Both `agentd` and `agentd-relay` use type aliases to re-export these types under their `internal/relay` package, so existing code using `relay.RelayEnvelope` continues to work unchanged.

Replay recovery is daemon-owned: the PWA sends `ReplayRequest{after_seq}` when it detects a gap in the inner per-session `AgentMessage.seq`, and the daemon responds with replayed `AgentMessage` frames followed by `replay_complete`. `RelayEnvelope.seq` remains the outer transport sequence for relay delivery and is not the cursor used for UI replay.
Replay recovery is daemon-owned: the PWA sends `ReplayRequest{after_seq}` when it detects a gap in the inner per-session `AgentMessage.seq`, and the daemon responds with replayed `AgentMessage` frames followed by `replay_complete`. Route-bound reconnects can request `SessionSnapshotRequest` first so the active chat view is rebuilt from daemon state before broad replay. `RelayEnvelope.seq` remains the outer transport sequence for relay delivery and is not the cursor used for UI replay.

Relay joins may also carry optional `nav_session_id` metadata. The relay forwards that value in `ClientConnectedPayload` so the daemon can prioritize the active session snapshot, while legacy clients omit the field and keep the existing replay behavior.

## Design Principles

Expand Down
10 changes: 6 additions & 4 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type RegisterPayload struct {
// JoinPayload is sent by the mobile client to join a session.
// JWT carries the clientToken from QRPayload.token.
type JoinPayload struct {
SessionID string `json:"sid"`
JWT string `json:"jwt"`
ClientID string `json:"client_id,omitempty"`
SessionID string `json:"sid"`
JWT string `json:"jwt"`
ClientID string `json:"client_id,omitempty"`
NavSessionID string `json:"nav_session_id,omitempty"`
}

// AckPayload is the relay's acknowledgement of a successful registration or join.
Expand Down Expand Up @@ -121,7 +122,8 @@ type TerminateSessionAckPayload struct {
// ClientConnectedPayload is sent by the relay to the daemon when a PWA client
// connects or reconnects. The daemon uses this to replay message history.
type ClientConnectedPayload struct {
SessionID string `json:"session_id"`
SessionID string `json:"session_id"`
NavSessionID string `json:"nav_session_id,omitempty"`
}

// ClientCountPayload is sent by the relay to the daemon after every client
Expand Down
59 changes: 56 additions & 3 deletions protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,38 @@ func TestRegisterPayloadRoundtrip(t *testing.T) {
func TestJoinPayloadRoundtrip(t *testing.T) {
t.Parallel()
original := protocol.JoinPayload{
SessionID: "sess-123",
JWT: "eyJ...",
SessionID: "sess-123",
JWT: "eyJ...",
ClientID: "client-1",
NavSessionID: "nav-456",
}
assertRoundtrip(t, original)
}

func TestJoinPayloadBackwardCompatibleWithoutNavSessionID(t *testing.T) {
t.Parallel()
const legacyJSON = `{"sid":"sess-123","jwt":"eyJ..."}`

var got protocol.JoinPayload
if err := json.Unmarshal([]byte(legacyJSON), &got); err != nil {
t.Fatalf("unmarshal legacy JoinPayload: %v", err)
}
if got.SessionID != "sess-123" || got.JWT != "eyJ..." {
t.Fatalf("legacy JoinPayload = %+v", got)
}
if got.NavSessionID != "" {
t.Fatalf("NavSessionID = %q, want empty for legacy payload", got.NavSessionID)
}

raw, err := json.Marshal(protocol.JoinPayload{SessionID: "sess-123", JWT: "eyJ..."})
if err != nil {
t.Fatalf("marshal JoinPayload: %v", err)
}
if string(raw) != legacyJSON {
t.Fatalf("JoinPayload without nav_session_id JSON = %s, want %s", raw, legacyJSON)
}
}

func TestAckPayloadRoundtrip(t *testing.T) {
t.Parallel()
assertRoundtrip(t, protocol.AckPayload{SessionID: "sess-123"})
Expand Down Expand Up @@ -351,7 +377,34 @@ func TestTerminateSessionPayloadRoundtrip(t *testing.T) {

func TestClientConnectedPayloadRoundtrip(t *testing.T) {
t.Parallel()
assertRoundtrip(t, protocol.ClientConnectedPayload{SessionID: "sess-123"})
assertRoundtrip(t, protocol.ClientConnectedPayload{
SessionID: "sess-123",
NavSessionID: "nav-456",
})
}

func TestClientConnectedPayloadBackwardCompatibleWithoutNavSessionID(t *testing.T) {
t.Parallel()
const legacyJSON = `{"session_id":"sess-123"}`

var got protocol.ClientConnectedPayload
if err := json.Unmarshal([]byte(legacyJSON), &got); err != nil {
t.Fatalf("unmarshal legacy ClientConnectedPayload: %v", err)
}
if got.SessionID != "sess-123" {
t.Fatalf("SessionID = %q, want sess-123", got.SessionID)
}
if got.NavSessionID != "" {
t.Fatalf("NavSessionID = %q, want empty for legacy payload", got.NavSessionID)
}

raw, err := json.Marshal(protocol.ClientConnectedPayload{SessionID: "sess-123"})
if err != nil {
t.Fatalf("marshal ClientConnectedPayload: %v", err)
}
if string(raw) != legacyJSON {
t.Fatalf("ClientConnectedPayload without nav_session_id JSON = %s, want %s", raw, legacyJSON)
}
}

func TestSyncPoliciesPayloadRoundtrip(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions replay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Package protocol - durable session replay recovery wire protocol.
package protocol

import "encoding/json"

// Message-type constants for daemon-owned control-journal recovery.
const (
// MsgReplayRequest is a PWA->daemon request for daemon messages whose
Expand All @@ -10,6 +12,14 @@ const (
// MsgReplayComplete is the daemon->PWA sentinel emitted after a requested
// replay range has been drained.
MsgReplayComplete = "replay_complete"

// MsgSessionSnapshotRequest is a PWA->daemon request for the daemon-owned
// read model of a route-bound active session.
MsgSessionSnapshotRequest = "session_snapshot_request"

// MsgSessionSnapshot is the daemon->PWA response containing active-session
// transcript and pending state. Historical pending entries are state only.
MsgSessionSnapshot = "session_snapshot"
)

// ReplayRequest is sent by the PWA when it detects a gap in the inner
Expand All @@ -29,3 +39,28 @@ type ReplayCompletePayload struct {
Count int `json:"count"`
Error string `json:"error,omitempty"`
}

// SessionSnapshotRequest is sent by the PWA when a route-bound active session
// should be hydrated from daemon-authoritative state before broad replay.
type SessionSnapshotRequest struct {
Type string `json:"type"`
SessionID string `json:"session_id"`
Reason string `json:"reason,omitempty"`
AfterCursor string `json:"after_cursor,omitempty"`
}

// SessionSnapshotPayload is carried in a daemon->PWA AgentMessage with Type
// MsgSessionSnapshot. Message, session, approval, and prompt entries are raw
// JSON here because their full structs are daemon/PWA-owned; this package pins
// the cross-repo envelope and state semantics.
type SessionSnapshotPayload struct {
SessionID string `json:"session_id"`
Session json.RawMessage `json:"session,omitempty"`
Messages []json.RawMessage `json:"messages"`
PendingApprovals []json.RawMessage `json:"pending_approvals"`
PendingPrompts []json.RawMessage `json:"pending_prompts"`
Cursor string `json:"cursor,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
Complete bool `json:"complete"`
Error string `json:"error,omitempty"`
}
88 changes: 88 additions & 0 deletions replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ func TestReplayMessageTypeConstants(t *testing.T) {
if MsgReplayComplete != "replay_complete" {
t.Fatalf("MsgReplayComplete = %q, want replay_complete", MsgReplayComplete)
}
if MsgSessionSnapshotRequest != "session_snapshot_request" {
t.Fatalf("MsgSessionSnapshotRequest = %q, want session_snapshot_request", MsgSessionSnapshotRequest)
}
if MsgSessionSnapshot != "session_snapshot" {
t.Fatalf("MsgSessionSnapshot = %q, want session_snapshot", MsgSessionSnapshot)
}
if MsgReplayRequest == MsgReplayComplete {
t.Fatal("replay request and completion message types must differ")
}
Expand Down Expand Up @@ -84,3 +90,85 @@ func TestReplayCompletePayloadOmitsEmptyError(t *testing.T) {
t.Fatalf("ReplayCompletePayload JSON = %s, want %s", raw, wantJSON)
}
}

func TestSessionSnapshotRequestJSONRoundtrip(t *testing.T) {
in := SessionSnapshotRequest{
Type: MsgSessionSnapshotRequest,
SessionID: "session-123",
Reason: "route_reopen",
AfterCursor: "99",
}

raw, err := json.Marshal(in)
if err != nil {
t.Fatalf("marshal SessionSnapshotRequest: %v", err)
}
const wantJSON = `{"type":"session_snapshot_request","session_id":"session-123","reason":"route_reopen","after_cursor":"99"}`
if string(raw) != wantJSON {
t.Fatalf("SessionSnapshotRequest JSON = %s, want %s", raw, wantJSON)
}

var out SessionSnapshotRequest
if err := json.Unmarshal(raw, &out); err != nil {
t.Fatalf("unmarshal SessionSnapshotRequest: %v", err)
}
if out != in {
t.Fatalf("SessionSnapshotRequest roundtrip = %+v, want %+v", out, in)
}
}

func TestSessionSnapshotPayloadJSONRoundtrip(t *testing.T) {
in := SessionSnapshotPayload{
SessionID: "session-123",
Session: json.RawMessage(`{"id":"session-123","state":"running"}`),
Messages: []json.RawMessage{json.RawMessage(`{"type":"output","session_id":"session-123","seq":1}`)},
PendingApprovals: []json.RawMessage{json.RawMessage(`{"approval_id":"approval-1","session_id":"session-123"}`)},
PendingPrompts: []json.RawMessage{json.RawMessage(`{"question_id":"question-1"}`)},
Cursor: "1",
Sequence: 1,
Complete: true,
}

raw, err := json.Marshal(in)
if err != nil {
t.Fatalf("marshal SessionSnapshotPayload: %v", err)
}
const wantJSON = `{"session_id":"session-123","session":{"id":"session-123","state":"running"},"messages":[{"type":"output","session_id":"session-123","seq":1}],"pending_approvals":[{"approval_id":"approval-1","session_id":"session-123"}],"pending_prompts":[{"question_id":"question-1"}],"cursor":"1","sequence":1,"complete":true}`
if string(raw) != wantJSON {
t.Fatalf("SessionSnapshotPayload JSON = %s, want %s", raw, wantJSON)
}

var out SessionSnapshotPayload
if err := json.Unmarshal(raw, &out); err != nil {
t.Fatalf("unmarshal SessionSnapshotPayload: %v", err)
}
if out.SessionID != in.SessionID || out.Cursor != in.Cursor || out.Sequence != in.Sequence || out.Complete != in.Complete {
t.Fatalf("SessionSnapshotPayload scalar roundtrip = %+v, want %+v", out, in)
}
if string(out.Session) != string(in.Session) || len(out.Messages) != 1 || string(out.Messages[0]) != string(in.Messages[0]) {
t.Fatalf("SessionSnapshotPayload raw state roundtrip = %+v, want %+v", out, in)
}
if len(out.PendingApprovals) != 1 || string(out.PendingApprovals[0]) != string(in.PendingApprovals[0]) {
t.Fatalf("PendingApprovals = %s, want %s", out.PendingApprovals, in.PendingApprovals)
}
if len(out.PendingPrompts) != 1 || string(out.PendingPrompts[0]) != string(in.PendingPrompts[0]) {
t.Fatalf("PendingPrompts = %s, want %s", out.PendingPrompts, in.PendingPrompts)
}
}

func TestSessionSnapshotPayloadOmitsOptionalFields(t *testing.T) {
raw, err := json.Marshal(SessionSnapshotPayload{
SessionID: "session-123",
Messages: []json.RawMessage{},
PendingApprovals: []json.RawMessage{},
PendingPrompts: []json.RawMessage{},
Complete: true,
})
if err != nil {
t.Fatalf("marshal SessionSnapshotPayload: %v", err)
}
const wantJSON = `{"session_id":"session-123","messages":[],"pending_approvals":[],"pending_prompts":[],"complete":true}`
if string(raw) != wantJSON {
t.Fatalf("SessionSnapshotPayload JSON = %s, want %s", raw, wantJSON)
}
}