sdk/go(emitter): thin fire-and-forget client for daemon socket #334
Open
ojongerius wants to merge 5 commits intomainfrom
First of five thin emitters that move signing, canonicalisation, and chain state out of in-process SDKs and into the agent-receipts daemon (ADR-0010, daemon process separation). The package contains no crypto and no chain state; Emit just marshals a frame, dials the daemon's local AF_UNIX socket lazily, and writes one length-prefixed JSON message per event. The daemon captures peer credentials, canonicalises (RFC 8785), signs (Ed25519), and persists.

Wire format mirrors daemon/internal/pipeline.EmitterFrame field-for-field; the type is redeclared locally so this package does not import a daemon-internal package, but the JSON contract is the source of truth (the daemon would reject any deviation at validateFrame time).

session_id is allocated once at New() and held for the lifetime of the Emitter, including across daemon reconnects (ADR-0010 OQ4 amendment, 2026-05-06). The rule prevents fragmenting one logical agent session into N receipts with N session_ids if Emit ever re-rolled the value per call. WithSessionID forwards a host-provided identifier when the parent process already owns one; an empty string falls back to a generated UUID since the daemon rejects empty values.

Failure model: dial and write failures log a debug-level drop and return nil within milliseconds. A 25ms dial timeout and a 100ms write deadline enforce the fire-and-forget contract from ADR-0010 even on a frozen daemon. The conn is closed and rebuilt on the next Emit after a write failure (per the ADR; an inline retry would double the worst-case latency on every actual outage).

The local EAGAIN drop counter and events_dropped flush mechanism described in ADR-0010 §"Permissions and trust" are deliberately deferred to a follow-up commit on this same section-3 branch; for now every drop reason takes the same silent log+return path.

Tests run end-to-end against an in-process daemon via the repo-root go.work link.
Coverage:
- TestEmit_FrameRoundTrip: channel/tool/decision land in the receipt with monotonic chain.sequence.
- TestEmit_SessionIDStableAcrossEmits: generated UUID survives every emit on the instance.
- TestEmit_WithSessionIDOverride: host-supplied id reaches receipt.Issuer.SessionID untouched (OQ4 forwarding path).
- TestEmit_HashDeterminism: differing whitespace + key order in Input must produce identical parameters_hash; guards that the emitter does not re-encode bytes the daemon needs to canonicalise.
- TestEmit_FireAndForgetWhenDaemonDown: a missing socket returns nil within 50ms (the fire-and-forget contract).
- TestEmit_ReconnectAfterDaemonRestart: emitter recovers when the daemon goes away mid-session and comes back, with the same session_id.
- TestEmit_ReturnsErrorAfterClose: post-Close Emit surfaces an error rather than silently dropping; pins idempotent Close.

Section 3 of #236; bundles with the four remaining thin-emitter modules per ADR-0010 OQ3 (single-shot release).
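The session_id rule from the commit message above can be illustrated with a small functional-options sketch. The names New and WithSessionID come from the PR, but the signatures, the config type, and the newUUIDv4 helper are assumptions made for illustration only; the package itself is stated to use github.com/google/uuid rather than a hand-rolled generator.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// config and Option are hypothetical; the real option plumbing may differ.
type config struct{ sessionID string }

type Option func(*config)

// WithSessionID forwards a host-provided identifier. An empty string is
// treated as "not provided" so New falls back to a generated value.
func WithSessionID(id string) Option {
	return func(c *config) { c.sessionID = id }
}

// Emitter holds the session id for its whole lifetime; Emit never re-rolls it.
type Emitter struct{ sessionID string }

// New allocates the session id exactly once.
func New(opts ...Option) (*Emitter, error) {
	var cfg config
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.sessionID == "" {
		id, err := newUUIDv4()
		if err != nil {
			return nil, err
		}
		cfg.sessionID = id
	}
	return &Emitter{sessionID: cfg.sessionID}, nil
}

// newUUIDv4 is a stand-in generator (assumption) that formats 16 random
// bytes as a version-4 UUID string.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	host, _ := New(WithSessionID("host-session-1"))
	generated, _ := New(WithSessionID(""))
	fmt.Println(host.sessionID)
	fmt.Println(generated.sessionID)
}
```

Because the id lives on the Emitter value rather than on the connection, a reconnect after a daemon restart would not change it in this sketch.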
Pull request overview
Adds a new Go SDK package (sdk/go/emitter) that acts as a thin, fire-and-forget AF_UNIX client for sending tool-call events to the agent-receipts daemon using the daemon’s length-prefixed JSON frame format (no signing/canonicalization/chain state in the SDK).
Changes:
- Introduces `Emitter` with lazy dial, length-prefixed framing, and bounded dial/write timeouts.
- Implements per-emitter stable `session_id` allocation (UUID default; optional host override).
- Adds end-to-end tests that run against an in-process daemon (round-trip, determinism, reconnect, fire-and-forget behavior, close semantics).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| sdk/go/emitter/emitter.go | New emitter client implementation (frame JSON + length-prefix, timeouts, default socket path resolution). |
| sdk/go/emitter/emitter_test.go | End-to-end integration tests validating wire compatibility and operational behavior against a daemon instance. |
Comment on lines +293 to +294

            return err
        }
Comment on lines +308 to +323

    // DefaultSocketPath returns the per-OS default daemon socket path. Mirrors
    // daemon.DefaultSocketPath exactly so emitter and daemon agree without
    // either importing the other:
    //
    // - AGENTRECEIPTS_SOCKET wins when set (any platform).
    // - macOS: $TMPDIR/agentreceipts/events.sock (TMPDIR defaults to /tmp).
    // - Linux with $XDG_RUNTIME_DIR set: $XDG_RUNTIME_DIR/agentreceipts/
    // events.sock — per-user, unprivileged.
    // - Linux fallback: /run/agentreceipts/events.sock (system-install path).
    // - Other platforms: empty string. New returns an error in that case
    // so callers must pass WithSocketPath explicitly.
    func DefaultSocketPath() string {
        if p := os.Getenv("AGENTRECEIPTS_SOCKET"); p != "" {
            return p
        }
        switch runtime.GOOS {
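The excerpt above stops at the switch statement. A minimal sketch of the resolution order described in the doc comment follows; socketPathFor is a hypothetical, testable variant that receives the OS name and the environment lookup as parameters, and it is not claimed to match the body of the real DefaultSocketPath.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
)

// socketPathFor applies the documented precedence. goos and getenv are
// injected (an assumption of this sketch) so each branch can be exercised
// on any build host.
func socketPathFor(goos string, getenv func(string) string) string {
	// The explicit override wins on every platform.
	if p := getenv("AGENTRECEIPTS_SOCKET"); p != "" {
		return p
	}
	switch goos {
	case "darwin":
		tmp := getenv("TMPDIR")
		if tmp == "" {
			tmp = "/tmp" // documented default when TMPDIR is unset
		}
		return filepath.Join(tmp, "agentreceipts", "events.sock")
	case "linux":
		if xdg := getenv("XDG_RUNTIME_DIR"); xdg != "" {
			return filepath.Join(xdg, "agentreceipts", "events.sock") // per-user path
		}
		return "/run/agentreceipts/events.sock" // system-install fallback
	default:
		return "" // caller must supply a path explicitly
	}
}

func main() {
	fmt.Println(socketPathFor(runtime.GOOS, os.Getenv))
}
```

Passing the lookups in keeps the function pure, which makes the per-OS branches easy to cover without changing the process environment.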
Comment on lines +186 to +206

    // Emit sends one event to the daemon. Returns nil even when the daemon
    // is unreachable: dial and write failures are logged at debug level and
    // the conn is reset for re-dial on the next Emit. Returns an error only
    // for caller bugs (Emitter closed, oversized frame, malformed input
    // causing JSON marshal to fail) — situations a retry could not fix.
    func (e *Emitter) Emit(ctx context.Context, ev Event) error {
        if err := ctx.Err(); err != nil {
            return err
        }

        body, err := json.Marshal(frame{
            Version:   SupportedFrameVersion,
            TsEmit:    time.Now().UTC().Format(time.RFC3339Nano),
            SessionID: e.sessionID,
            Channel:   ev.Channel,
            Tool:      frameTool{Server: ev.Tool.Server, Name: ev.Tool.Name},
            Input:     ev.Input,
            Output:    ev.Output,
            Error:     ev.Error,
            Decision:  ev.Decision,
        })
Three issues from the Copilot review on PR #334:

1. writeAll: guard n==0 with io.ErrShortWrite to prevent infinite spin when Write returns (0, nil). The daemon-side framing helper already does this; keeping both sides consistent.
2. DefaultSocketPath comment: remove "Mirrors daemon.DefaultSocketPath exactly"; the daemon reads AGENTRECEIPTS_SOCKET in main.go, not in DefaultSocketPath, so the two functions are not identical. The comment now describes the difference explicitly.
3. Emit validation: return errors for caller-bug inputs that the daemon would hard-reject: empty channel, empty tool.name, invalid decision, and non-nil Input/Output that fails json.Valid. Fits the documented contract (errors reserved for caller bugs, not transient failures).

TestEmit_ValidatesEvent covers all six cases.
Comment on lines +277 to +281

    func (e *Emitter) dialIfNeededLocked() error {
        if e.conn != nil {
            return nil
        }
        conn, err := net.DialTimeout("unix", e.socketPath, dialTimeout)
Comment on lines +289 to +300

    func (e *Emitter) writeFrameLocked(body []byte) error {
        if err := e.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
            return err
        }
        defer func() { _ = e.conn.SetWriteDeadline(time.Time{}) }()

        var hdr [4]byte
        binary.BigEndian.PutUint32(hdr[:], uint32(len(body)))
        if err := writeAll(e.conn, hdr[:]); err != nil {
            return err
        }
        return writeAll(e.conn, body)
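The framing used above (a 4-byte big-endian length followed by the JSON body) can be sketched as an encode and decode pair. The reader side, the function names, and the maxFrameSize limit are assumptions for illustration; the PR only shows the writer, and the daemon's real size limit is not stated here.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxFrameSize is a hypothetical limit chosen for this sketch only.
const maxFrameSize = 1 << 20

// encodeFrame prepends a 4-byte big-endian length to body. The PR writes
// header and body with two calls; one buffer is used here for brevity.
func encodeFrame(body []byte) ([]byte, error) {
	if len(body) > maxFrameSize {
		return nil, errors.New("frame too large")
	}
	out := make([]byte, 4+len(body))
	binary.BigEndian.PutUint32(out[:4], uint32(len(body)))
	copy(out[4:], body)
	return out, nil
}

// readFrame is what a receiver might do: read the header, bound the
// length, then read exactly that many body bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxFrameSize {
		return nil, errors.New("frame too large")
	}
	body := make([]byte, n)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	wire, err := encodeFrame([]byte(`{"channel":"demo"}`))
	if err != nil {
		panic(err)
	}
	body, err := readFrame(bytes.NewReader(wire))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes on the wire, body %s\n", len(wire), body)
}
```

Bounding the length before allocating the body buffer is the usual reason a length-prefixed protocol treats an oversized frame as a hard error rather than a transient one.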
Comment on lines +207 to +212

        if len(ev.Input) > 0 && !json.Valid(ev.Input) {
            return errors.New("emitter: Input is not valid JSON")
        }
        if len(ev.Output) > 0 && !json.Valid(ev.Output) {
            return errors.New("emitter: Output is not valid JSON")
        }
Comment on lines +1 to +7

    //go:build linux || darwin

    // Tests run end-to-end against an in-process agent-receipts daemon.
    // Build-tag-gated to linux/darwin because the emitter speaks AF_UNIX and the
    // daemon refuses to start on other platforms — running these tests on a
    // Windows builder would fail at daemon.Run, not at the emitter we want to
    // exercise.
…, JSON depth

Four issues from the ultrareview pass on PR #334:

1. emitter_test.go: gate with `integration && (linux || darwin)` instead of just `linux || darwin`. The test imports github.com/agent-receipts/ar/daemon, but sdk/go's published go.mod cannot require daemon (daemon already requires sdk/go; adding a back-edge would create an import cycle). Under GOWORK=off the import would not resolve and `go test ./emitter` would fail. Matches the existing pattern in sdk/go/integration_test.go and sdk/go/cross_language_test.go which import daemon-side packages and gate the same way. Verified with `cd sdk/go && GOWORK=off go vet ./...`.
2. dialIfNeededLocked now takes ctx and uses `net.Dialer{Timeout: dialTimeout}.DialContext(ctx, ...)` instead of `net.DialTimeout`. The previous code ignored the caller's ctx, so a caller with a tighter deadline (or an already-cancelled ctx) could still see Emit block up to the full 25ms.
3. writeFrameLocked now takes ctx and computes `min(now+writeTimeout, ctx.Deadline())` as the SetWriteDeadline value. Without honouring ctx, a caller's tighter deadline could be silently extended up to the 100ms writeTimeout.
4. Replace `json.Valid(ev.Input)` / `json.Valid(ev.Output)` with `json.Unmarshal(..., new(interface{}))`. json.Valid only checks lexical syntax, so values like `1e400` (parses as a token but overflows float64) pass here yet fail the daemon's RFC 8785 canonicalisation re-unmarshal. Validate against what the daemon can actually canonicalise instead, surfacing the failure to the caller rather than dropping silently on the daemon side.

Verification:
- `GOWORK=off go vet ./sdk/go/...` passes.
- `go test -race ./sdk/go/emitter` (default, integration tag off) is a no-test-files pass; emitter_test.go is correctly excluded from the publish path.
- `go test -race -tags=integration ./sdk/go/emitter` passes (1.95s).
- `go vet ./daemon/...` and `go test -race ./daemon/...` still pass.
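The gap between `json.Valid` and a full decode described in item 4 can be reproduced with the standard library alone. This is a minimal sketch; validatePayload is a hypothetical helper name, not the function used in the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// validatePayload decodes raw into a generic value instead of only
// checking syntax, so numbers that overflow float64 are rejected.
// Empty input is treated as "no payload" in this sketch.
func validatePayload(raw []byte) error {
	if len(raw) == 0 {
		return nil
	}
	var v interface{}
	if err := json.Unmarshal(raw, &v); err != nil {
		return fmt.Errorf("payload is not decodable JSON: %w", err)
	}
	return nil
}

func main() {
	raw := []byte(`1e400`) // syntactically a number, but too large for float64

	// json.Valid only runs the scanner, so it accepts the token.
	fmt.Println("json.Valid:", json.Valid(raw))

	// A full decode has to convert the number and reports the overflow.
	fmt.Println("validatePayload:", validatePayload(raw))
}
```

Rejecting such input in the emitter means the caller sees the problem as an error from Emit instead of the event being dropped later on the daemon side.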
Comment on lines +245 to +265

        defer e.mu.Unlock()

        if e.closed {
            return errors.New("emitter: closed")
        }

        if err := e.dialIfNeededLocked(ctx); err != nil {
            e.logDrop(ctx, "dial", err)
            return nil
        }
        if err := e.writeFrameLocked(ctx, body); err != nil {
            e.logDrop(ctx, "write", err)
            // A failed write almost always means the daemon went away. Drop
            // the conn so the next Emit re-dials transparently. Per ADR-0010
            // the redial happens on the FOLLOWING Emit, not as an inline
            // retry — an inline retry would double the worst-case Emit
            // latency on every actual outage.
            _ = e.conn.Close()
            e.conn = nil
            return nil
        }

        }
        err := e.conn.Close()
        e.conn = nil
        return err
Issue 1: Emit held e.mu across dialIfNeededLocked + writeFrameLocked, so
a slow accept (up to 25ms dialTimeout + 100ms writeTimeout) blocked every
sibling Emit on the mutex — violating the per-call fire-and-forget budget
under concurrency. Restructured to:
- snapshot e.conn and e.closed under a brief lock,
- dial outside the lock,
- install the dialled conn with a check-and-set (loser of a dial race
closes the redundant conn after releasing the mutex so the kernel
close path does not block siblings),
- re-acquire e.mu only for the framed write so length-prefix + body
cannot interleave on a single conn.
Issue 2: Close returned the raw e.conn.Close() error. Wrapped as
"emitter: close: %w" so callers get a contextual error consistent with
the rest of the package.
Other:
- dialIfNeededLocked -> dial, writeFrameLocked -> writeFrame: take conn
explicitly so the helpers no longer reach into e.conn under the
assumption that the caller holds e.mu.
- Package doc updated to spell out the new locking discipline (dial
outside, write inside).
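The locking discipline described in this commit message can be sketched as follows. The client type, the injected dial function, the fakeConn helper, and the use of io.WriteCloser in place of net.Conn are all assumptions made so the example runs without a real socket; it is not the emitter's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// client is a hypothetical stand-in for the Emitter; only locking is modelled.
type client struct {
	mu     sync.Mutex
	conn   io.WriteCloser
	closed bool
	dial   func() (io.WriteCloser, error) // injected so no real socket is needed
}

func (c *client) send(frame []byte) error {
	// 1. Snapshot shared state under a brief lock.
	c.mu.Lock()
	closed, needDial := c.closed, c.conn == nil
	c.mu.Unlock()
	if closed {
		return errors.New("client: closed")
	}

	// 2. Dial outside the lock so a slow accept cannot stall sibling senders.
	if needDial {
		dialed, err := c.dial()
		if err != nil {
			return nil // fire-and-forget: drop the event
		}
		// 3. Check-and-set: only the first dialer installs its conn.
		c.mu.Lock()
		installed := !c.closed && c.conn == nil
		if installed {
			c.conn = dialed
		}
		c.mu.Unlock()
		if !installed {
			_ = dialed.Close() // loser closes after releasing the mutex
		}
	}

	// 4. Hold the lock only for the write so frames cannot interleave.
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errors.New("client: closed")
	}
	if c.conn == nil {
		return nil // conn was reset by a concurrent failure; drop
	}
	if _, err := c.conn.Write(frame); err != nil {
		_ = c.conn.Close()
		c.conn = nil // the next send re-dials
	}
	return nil
}

// fakeConn discards writes and counts Close calls.
type fakeConn struct{ closes *atomic.Int64 }

func (f fakeConn) Write(p []byte) (int, error) { return len(p), nil }
func (f fakeConn) Close() error                { f.closes.Add(1); return nil }

// runConcurrent fires n concurrent sends and reports dials and redundant closes.
func runConcurrent(n int) (dials, closes int64) {
	var d, cl atomic.Int64
	c := &client{dial: func() (io.WriteCloser, error) {
		d.Add(1)
		return fakeConn{closes: &cl}, nil
	}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.send([]byte("frame"))
		}()
	}
	wg.Wait()
	return d.Load(), cl.Load()
}

func main() {
	dials, closes := runConcurrent(8)
	fmt.Printf("dials=%d redundant closes=%d installed=%d\n", dials, closes, dials-closes)
}
```

However many goroutines race to dial, exactly one connection stays installed in this sketch; every other dialled connection is closed by the goroutine that lost the check-and-set.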
Comment on lines +260 to +265

        if needDial {
            dialed, err := e.dial(ctx)
            if err != nil {
                e.logDrop(ctx, "dial", err)
                return nil
            }
Comment on lines +355 to +374

    func (e *Emitter) writeFrame(ctx context.Context, conn net.Conn, body []byte) error {
        // The effective write deadline is min(now+writeTimeout, ctx.Deadline()).
        // Without honouring ctx here, a caller's tighter deadline could be
        // silently extended up to writeTimeout, breaking the fire-and-forget
        // budget callers expect when they pass a ctx with a deadline.
        deadline := time.Now().Add(writeTimeout)
        if d, ok := ctx.Deadline(); ok && d.Before(deadline) {
            deadline = d
        }
        if err := conn.SetWriteDeadline(deadline); err != nil {
            return err
        }
        defer func() { _ = conn.SetWriteDeadline(time.Time{}) }()

        var hdr [4]byte
        binary.BigEndian.PutUint32(hdr[:], uint32(len(body)))
        if err := writeAll(conn, hdr[:]); err != nil {
            return err
        }
        return writeAll(conn, body)
…rrors
- Check ctx.Err() after dial failure: a context-cancelled dial returns the
context error (caller signal) rather than a silent drop. Transient
outages (daemon down) still return nil per the fire-and-forget contract.
- Wrap writeFrame errors with operation context ("set write deadline",
"write header", "write body") so logDrop messages pinpoint what failed.
Comment on lines +390 to +396

            if err != nil {
                return err
            }
            if n == 0 {
                return io.ErrShortWrite
            }
            buf = buf[n:]
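The excerpt above is the core of the write loop. A complete version might look like the sketch below; the loop header and the two test writers (stuckWriter and chunkWriter) are assumptions added to show why the n == 0 guard matters.

```go
package main

import (
	"fmt"
	"io"
)

// writeAll keeps writing until buf is drained. The n == 0 guard turns a
// writer that reports no progress and no error into io.ErrShortWrite
// instead of an endless loop.
func writeAll(w io.Writer, buf []byte) error {
	for len(buf) > 0 {
		n, err := w.Write(buf)
		if err != nil {
			return err
		}
		if n == 0 {
			return io.ErrShortWrite
		}
		buf = buf[n:]
	}
	return nil
}

// stuckWriter is a hypothetical misbehaving writer: (0, nil) on every call.
type stuckWriter struct{}

func (stuckWriter) Write(p []byte) (int, error) { return 0, nil }

// chunkWriter accepts at most three bytes per call to force several iterations.
type chunkWriter struct{ got []byte }

func (c *chunkWriter) Write(p []byte) (int, error) {
	n := len(p)
	if n > 3 {
		n = 3
	}
	c.got = append(c.got, p[:n]...)
	return n, nil
}

func main() {
	fmt.Println("stuck writer:", writeAll(stuckWriter{}, []byte("abc")))

	cw := &chunkWriter{}
	err := writeAll(cw, []byte("hello world"))
	fmt.Printf("chunk writer: %q, err=%v\n", cw.got, err)
}
```

A well-behaved io.Writer should never return (0, nil) for a non-empty buffer, so the guard is purely defensive; it costs one comparison per iteration.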
Comment on lines +311 to +320

        if err := e.writeFrame(ctx, conn, body); err != nil {
            e.logDrop(ctx, "write", err)
            // A failed write almost always means the daemon went away. Drop
            // the conn so the next Emit re-dials transparently. Per ADR-0010
            // the redial happens on the FOLLOWING Emit, not as an inline
            // retry — an inline retry would double the worst-case Emit
            // latency on every actual outage.
            _ = conn.Close()
            e.conn = nil
            return nil
What

New package `sdk/go/emitter`: a thin client that forwards tool-call events to the agent-receipts daemon over its local AF_UNIX socket. The package contains no crypto, no canonicalisation, and no chain state; `Emit` just marshals an `EmitterFrame`-shaped JSON, dials the daemon's socket lazily, and writes one length-prefixed frame per event. The daemon (post #322 + #333) does the rest: peer-credential capture, RFC 8785 canonicalisation, Ed25519 signing, persistence.

Public API:

Wire format mirrors `daemon/internal/pipeline.EmitterFrame` field-for-field (the type is redeclared locally to avoid a daemon-internal import; the JSON contract is the source of truth, and the daemon would reject any deviation at `validateFrame`).

Why
Section 3 of the daemon-process-separation rollout (#236) replaces five in-process emitters with thin clients pointed at the daemon. This is the first of those five (Go SDK); the four remaining modules (mcp-proxy, sdk/ts, sdk/py, openclaw) follow as separate small PRs rather than one bundled drop, so each gets focused review.

Per ADR-0010 OQ4 amendment (2026-05-06), `session_id` is allocated once at `New()` and held for the lifetime of the `Emitter`, including across daemon reconnects. `WithSessionID` forwards a host-supplied id when the parent process owns one (Claude Code's session id, an agent-loop context id); empty string falls back to a generated UUID since the daemon rejects empty values.

Failure model is fire-and-forget per ADR-0010 §"Permissions and trust": a 25ms dial timeout and a 100ms write deadline cap `Emit` even on a frozen daemon. Failed writes close the conn so the next `Emit` re-dials transparently. The EAGAIN-driven local drop counter and `events_dropped` flush mechanism described in the same ADR section are deferred to a follow-up commit on `main` once this lands; for now every drop reason takes a `slog.Debug` log + `return nil` path. That deferral is the only explicit gap; everything else in this PR is the full intended design.

Tests
End-to-end against an in-process daemon via the repo-root `go.work` link:

- `TestEmit_FrameRoundTrip`: channel/tool/decision land in the receipt with monotonic `chain.sequence`.
- `TestEmit_SessionIDStableAcrossEmits`: generated UUID survives every emit on the instance.
- `TestEmit_WithSessionIDOverride`: host-supplied id reaches `receipt.Issuer.SessionID` untouched (OQ4 forwarding path).
- `TestEmit_HashDeterminism`: differing whitespace and key order in `Input` MUST produce identical `parameters_hash`. Guards that the emitter does not re-encode bytes the daemon needs to canonicalise.
- `TestEmit_FireAndForgetWhenDaemonDown`: a missing socket returns nil within 50ms (the fire-and-forget contract).
- `TestEmit_ReconnectAfterDaemonRestart`: emitter recovers when the daemon goes away mid-session and comes back, with the same `session_id`.
- `TestEmit_ReturnsErrorAfterClose`: post-`Close` `Emit` surfaces an error rather than silently dropping; pins idempotent `Close`.

Verified locally:
New dependency

None. The package uses `github.com/google/uuid` (already in `sdk/go/go.mod`) plus the standard library.

Checklist
- `sdk/go`, `daemon` (incl. `-tags=integration -race`), `cross-sdk-tests`, `mcp-proxy` all green
- `go vet ./...` clean in `sdk/go`
- `daemon/integration_test.go`
- `daemon/internal/pipeline.EmitterFrame`)

Follow-ups (separate PRs on `main`)

- `events_dropped` mechanism: non-blocking conn, EAGAIN handling, atomic drop counter, flush field on `EmitterFrame` (or a control frame), daemon-side synthesis of `events_dropped` receipts. Schema decision required.
- `daemon/internal/daemontest` companion to `sockettest`: once mcp-proxy's emitter tests need the same `startDaemon`/`writeTestKey` helpers, consolidate the three copies.