From 257ea6a4aaa444370c69757ee5be4e24d3bb3fff Mon Sep 17 00:00:00 2001 From: Djordje Lukic Date: Wed, 29 Apr 2026 10:39:40 +0000 Subject: [PATCH 1/3] Add turn_end hook Symmetric counterpart of turn_start. Fires once per turn no matter how the turn ended: normal stop, error, hook-blocked, loop-detected, max-iterations, or context cancellation. Reason is reported via hooks.Input.Reason. The runtime extracts the per-iteration body into runTurn() so a deferred dispatch fires turn_end on every exit path. Uses context.WithoutCancel so handlers run to completion on Ctrl+C. Assisted-By: docker-agent --- agent-schema.json | 7 + examples/hooks.yaml | 32 +++ pkg/config/latest/types.go | 18 ++ pkg/hooks/dispatch_test.go | 1 + pkg/hooks/executor.go | 1 + pkg/hooks/hooks_test.go | 11 +- pkg/hooks/types.go | 12 + pkg/runtime/hooks.go | 45 ++++ pkg/runtime/loop.go | 423 ++++++++++++++++++++++------------- pkg/runtime/turn_end_test.go | 384 +++++++++++++++++++++++++++++++ 10 files changed, 777 insertions(+), 157 deletions(-) create mode 100644 pkg/runtime/turn_end_test.go diff --git a/agent-schema.json b/agent-schema.json index cb2c8b544..f43c3c8f1 100644 --- a/agent-schema.json +++ b/agent-schema.json @@ -620,6 +620,13 @@ "$ref": "#/definitions/HookDefinition" } }, + "turn_end": { + "type": "array", + "description": "Hooks that run once per agent turn when the turn finishes — the symmetric counterpart of turn_start. Fires no matter why the turn ended: a normal stop, an error, a hook-driven shutdown, the max_iterations limit, the loop detector, or context cancellation. The reason is reported via the hook input's reason field ('normal', 'continue', 'steered', 'error', 'canceled', 'hook_blocked', 'max_iterations', 'loop_detected'). Observational; output is ignored.", + "items": { + "$ref": "#/definitions/HookDefinition" + } + }, "before_llm_call": { "type": "array", "description": "Hooks that run just before each model call (after turn_start has assembled the messages). Use for observability, cost guardrails, or auditing without contributing system messages — turn_start is the right event for the latter.", diff --git a/examples/hooks.yaml b/examples/hooks.yaml index 5d7cc89b1..2392a66fd 100644 --- a/examples/hooks.yaml +++ b/examples/hooks.yaml @@ -16,6 +16,7 @@ # session_start - one-time setup; AdditionalContext PERSISTS in the session # user_prompt_submit- runs once per user message, before the first LLM call # turn_start - per-turn context; AdditionalContext is TRANSIENT +# turn_end - per-turn finalizer; fires no matter why the turn ended # before_llm_call - just before each model call (observability, guardrails) # after_llm_call - just after a successful model call # session_end - cleanup when the session terminates @@ -62,6 +63,7 @@ # /tmp/agent-session.log (session_start, session_end) # /tmp/agent-prompts.log (user_prompt_submit) # /tmp/agent-llm-calls.log (before_llm_call, after_llm_call) +# /tmp/agent-turns.log (turn_end) # /tmp/agent-tool-results.log (post_tool_use) # /tmp/agent-permissions.log (permission_request) # /tmp/agent-compactions.log (pre_compact) @@ -225,6 +227,36 @@ agents: - GUIDELINES.md - PROJECT.md + # ==================================================================== + # TURN-END - runs ONCE per turn after the iteration finishes — the + # symmetric counterpart of turn_start. Fires no matter why the turn + # ended: a normal stop, an error, a hook-driven shutdown, the + # max_iterations limit, the loop detector, or context cancellation. + # The reason is reported via the .reason field: + # + # normal - model finished cleanly, no follow-up + # continue - more iterations to come (e.g. tool calls) + # steered - drained steered messages prompted a re-entry + # error - model call failed (handleStreamError) + # canceled - context cancellation (Ctrl+C, parent ctx done) + # hook_blocked - before_llm_call or post_tool_use signalled stop + # loop_detected - degenerate consecutive-tool-call loop + # max_iterations - iteration cap was reached during this turn + # + # Observational; the result is ignored. Use it to time turns, + # accumulate per-turn metrics (token usage, tool counts), or notify + # external observability pipelines. + # ==================================================================== + turn_end: + - type: command + timeout: 5 + command: | + INPUT=$(cat) + SESSION_ID=$(echo "$INPUT" | jq -r '.session_id // "unknown"') + AGENT=$(echo "$INPUT" | jq -r '.agent_name // "unknown"') + REASON=$(echo "$INPUT" | jq -r '.reason // "unknown"') + echo "[$(date)] [←] $SESSION_ID $AGENT turn ended (reason=$REASON)" >> /tmp/agent-turns.log + # ==================================================================== # BEFORE-LLM-CALL - fires just before every model invocation, AFTER # turn_start has assembled the messages slice. Use for observability diff --git a/pkg/config/latest/types.go b/pkg/config/latest/types.go index 0408642f1..924c83b9c 100644 --- a/pkg/config/latest/types.go +++ b/pkg/config/latest/types.go @@ -1709,6 +1709,16 @@ type HooksConfig struct { // turn instead of bloating the message history on every resume. TurnStart []HookDefinition `json:"turn_start,omitempty" yaml:"turn_start,omitempty"` + // TurnEnd hooks run once per agent turn when the turn finishes — + // the symmetric counterpart of TurnStart. Fires no matter why the + // turn ended: a normal stop, an error, a hook-driven shutdown, the + // max_iterations limit, the loop detector, or context cancellation. + // The reason is reported in the hook input's reason field + // ("normal", "continue", "steered", "error", "canceled", + // "hook_blocked", "max_iterations", "loop_detected"). Observational; + // output is ignored. + TurnEnd []HookDefinition `json:"turn_end,omitempty" yaml:"turn_end,omitempty"` + // BeforeLLMCall hooks run just before each model call (after // turn_start). Use this for observability, cost guardrails, or // auditing without contributing system messages — turn_start is the @@ -1815,6 +1825,7 @@ func (h *HooksConfig) IsEmpty() bool { len(h.SessionStart) == 0 && len(h.UserPromptSubmit) == 0 && len(h.TurnStart) == 0 && + len(h.TurnEnd) == 0 && len(h.BeforeLLMCall) == 0 && len(h.AfterLLMCall) == 0 && len(h.SessionEnd) == 0 && @@ -1971,6 +1982,13 @@ func (h *HooksConfig) validate() error { } } + // Validate TurnEnd hooks + for i, hook := range h.TurnEnd { + if err := hook.validate("turn_end", i); err != nil { + return err + } + } + // Validate BeforeLLMCall hooks for i, hook := range h.BeforeLLMCall { if err := hook.validate("before_llm_call", i); err != nil { diff --git a/pkg/hooks/dispatch_test.go b/pkg/hooks/dispatch_test.go index b492f301a..957c427e1 100644 --- a/pkg/hooks/dispatch_test.go +++ b/pkg/hooks/dispatch_test.go @@ -25,6 +25,7 @@ var onlyHooks = map[EventType]*Config{ EventPostToolUse: {PostToolUse: matcherWildcard}, EventSessionStart: {SessionStart: trueHook}, EventTurnStart: {TurnStart: trueHook}, + EventTurnEnd: {TurnEnd: trueHook}, EventBeforeLLMCall: {BeforeLLMCall: trueHook}, EventAfterLLMCall: {AfterLLMCall: trueHook}, EventSessionEnd: {SessionEnd: trueHook}, diff --git a/pkg/hooks/executor.go b/pkg/hooks/executor.go index cb96546c3..e6b0e0b8f 100644 --- a/pkg/hooks/executor.go +++ b/pkg/hooks/executor.go @@ -79,6 +79,7 @@ func compileEvents(c *Config) map[EventType][]matcher { EventSessionStart: flat(c.SessionStart), EventUserPromptSubmit: flat(c.UserPromptSubmit), EventTurnStart: flat(c.TurnStart), + EventTurnEnd: flat(c.TurnEnd), EventBeforeLLMCall: flat(c.BeforeLLMCall), EventAfterLLMCall: flat(c.AfterLLMCall), EventSessionEnd: flat(c.SessionEnd), diff --git a/pkg/hooks/hooks_test.go b/pkg/hooks/hooks_test.go index 7ba3cd9bb..3cbc32de1 100644 --- a/pkg/hooks/hooks_test.go +++ b/pkg/hooks/hooks_test.go @@ -124,6 +124,13 @@ func TestConfigIsEmpty(t *testing.T) { }, expected: false, }, + { + name: "with turn_end", + config: Config{ + TurnEnd: []Hook{{Type: HookTypeCommand}}, + }, + expected: false, + }, } for _, tt := range tests { @@ -706,7 +713,7 @@ func TestPlainStdoutBecomesAdditionalContext(t *testing.T) { observationalEvents := []EventType{ EventBeforeLLMCall, EventAfterLLMCall, EventOnError, EventOnMaxIterations, EventNotification, EventOnUserInput, EventSessionEnd, - EventBeforeCompaction, EventAfterCompaction, + EventBeforeCompaction, EventAfterCompaction, EventTurnEnd, } for _, ev := range contextEvents { @@ -750,6 +757,8 @@ func configWithFlatHook(ev EventType, h Hook) *Config { cfg.SessionStart = []Hook{h} case EventTurnStart: cfg.TurnStart = []Hook{h} + case EventTurnEnd: + cfg.TurnEnd = []Hook{h} case EventBeforeLLMCall: cfg.BeforeLLMCall = []Hook{h} case EventAfterLLMCall: diff --git a/pkg/hooks/types.go b/pkg/hooks/types.go index 45d0011e8..77459e406 100644 --- a/pkg/hooks/types.go +++ b/pkg/hooks/types.go @@ -51,6 +51,16 @@ const ( // EventTurnStart fires at the start of every agent turn (each model // call). AdditionalContext is injected transiently and never persisted. EventTurnStart EventType = "turn_start" + // EventTurnEnd fires once per agent turn (each model call) when the + // turn finishes — symmetric to [EventTurnStart]. It runs no matter + // why the turn ended: a normal stop, an error, a hook-driven + // shutdown, the iteration limit, the loop detector, or context + // cancellation. The reason is reported in [Input.Reason] using one + // of the turnEndReason* constants in the runtime package ("normal", + // "continue", "steered", "error", "canceled", "hook_blocked", + // "max_iterations", "loop_detected"). Observational; output is + // ignored. + EventTurnEnd EventType = "turn_end" // EventBeforeLLMCall fires immediately before each model call. // Returning decision="block" (or continue=false / exit code 2) // stops the run loop before the model is invoked — useful for hard @@ -209,6 +219,8 @@ type Input struct { // PreCompact specific: "manual", "auto", "overflow", "tool_overflow". Source string `json:"source,omitempty"` // SessionEnd specific: "clear", "logout", "prompt_input_exit", "other". + // TurnEnd specific: "normal", "continue", "steered", "error", + // "canceled", "hook_blocked", "max_iterations", "loop_detected". Reason string `json:"reason,omitempty"` // Stop / AfterLLMCall / SubagentStop: the model's final response content. StopResponse string `json:"stop_response,omitempty"` diff --git a/pkg/runtime/hooks.go b/pkg/runtime/hooks.go index 1d7460a5f..697d5e149 100644 --- a/pkg/runtime/hooks.go +++ b/pkg/runtime/hooks.go @@ -132,6 +132,51 @@ func (r *LocalRuntime) executeTurnStartHooks(ctx context.Context, sess *session. }, events)) } +// Reason values reported in [hooks.Input.Reason] when [hooks.EventTurnEnd] +// fires. The runtime guarantees that turn_end runs once per turn that +// fired turn_start, no matter how the turn exited; the reason classifies +// which exit path the runtime took. +const ( + // turnEndReasonNormal — the model finished the turn cleanly and the + // run loop is about to break out (no further iterations). + turnEndReasonNormal = "normal" + // turnEndReasonContinue — the turn finished cleanly and the loop is + // about to start a new iteration (e.g. after tool calls, or after a + // stop with a queued follow-up). + turnEndReasonContinue = "continue" + // turnEndReasonSteered — the turn finished and was followed by + // drained steered messages, prompting a new iteration. + turnEndReasonSteered = "steered" + // turnEndReasonError — the model call failed and the runtime is + // shutting down the run (handleStreamError returned non-retry). + turnEndReasonError = "error" + // turnEndReasonCanceled — the turn ended because the stream context + // was cancelled (e.g. user Ctrl+C). Includes deferred firing on + // any return path while ctx is done. + turnEndReasonCanceled = "canceled" + // turnEndReasonHookBlocked — a hook (before_llm_call or + // post_tool_use) signalled run termination via a deny verdict. + turnEndReasonHookBlocked = "hook_blocked" + // turnEndReasonLoopDetected — the consecutive-tool-call loop + // detector terminated the turn. + turnEndReasonLoopDetected = "loop_detected" + // turnEndReasonMaxIterations — the iteration limit was reached + // during this turn. + turnEndReasonMaxIterations = "max_iterations" +) + +// executeTurnEndHooks fires turn_end once per turn — symmetric to +// turn_start. Observational; the result is discarded. Reason is one +// of the turnEndReason* constants above and is reported via +// [hooks.Input.Reason] so handlers can branch on the exit path. +func (r *LocalRuntime) executeTurnEndHooks(ctx context.Context, sess *session.Session, a *agent.Agent, reason string, events chan Event) { + r.dispatchHook(ctx, a, hooks.EventTurnEnd, &hooks.Input{ + SessionID: sess.ID, + AgentName: a.Name(), + Reason: reason, + }, events) +} + // contextMessages converts a context-providing hook's AdditionalContext // into a one-element transient system-message slice ready to thread // through [session.Session.GetMessages]. Returns nil for empty results diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index fb7b3f970..d1da4f050 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -15,6 +15,7 @@ import ( "github.com/docker/docker-agent/pkg/agent" "github.com/docker/docker-agent/pkg/chat" "github.com/docker/docker-agent/pkg/compaction" + "github.com/docker/docker-agent/pkg/model/provider" "github.com/docker/docker-agent/pkg/modelsdev" "github.com/docker/docker-agent/pkg/runtime/toolexec" "github.com/docker/docker-agent/pkg/session" @@ -314,11 +315,6 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, } slog.Debug("Starting conversation loop iteration", "agent", a.Name()) - streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes( - attribute.String("agent", a.Name()), - attribute.String("session.id", sess.ID), - )) - model := a.Model() // Per-tool model routing: use a cheaper model for this turn @@ -364,177 +360,292 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, events) } - // Run turn_start hooks BEFORE building messages so their - // AdditionalContext, alongside the session_start extras captured - // once at the top of RunStream, can be spliced after the invariant - // cache checkpoint and before the conversation history. Neither - // hook's output is persisted, so per-turn signals (date, prompt - // files) refresh every turn while session-level context (cwd, OS, - // arch) stays stable — all without bloating the stored history. - turnStartMsgs := r.executeTurnStartHooks(ctx, sess, a, events) - messages := sess.GetMessages(a, slices.Concat(sessionStartMsgs, userPromptMsgs, turnStartMsgs)...) - slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages)) - - // before_llm_call hooks fire just before the model is invoked. - // A terminating verdict (e.g. from the max_iterations builtin) - // stops the run loop here, before any tokens are spent. Hooks - // may also rewrite the outgoing messages by returning - // HookSpecificOutput.UpdatedMessages — the redact_secrets - // builtin uses this to scrub secrets from chat content before - // the LLM ever sees it. The rewrite happens BEFORE the - // runtime's Go-only message transforms so a hook that drops a - // message (e.g. a custom "strip system reminders") doesn't get - // silently overridden by a transform later in the chain. - stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, messages) - if stop { - slog.Warn("before_llm_call hook signalled run termination", - "agent", a.Name(), "session_id", sess.ID, "reason", msg) - r.emitHookDrivenShutdown(ctx, a, sess, msg, events) - streamSpan.End() + // Everything from turn_start onwards is wrapped in a closure so a + // single deferred turn_end hook fires on every exit path: a normal + // stop, a follow-up continue, an error, a hook-driven shutdown, the + // loop-detector tripping, ctx cancellation, even a panic. The + // closure returns the loop control directive and the reason string + // reported via [hooks.Input.Reason]; the deferred dispatch then runs + // AFTER the closure body has assigned both, so callers see the same + // reason the runtime took. ctrl drives the outer for-loop's + // continue-or-exit decision. + ctrl := r.runTurn(ctx, sess, a, m, model, modelID, contextLimit, sessionSpan, + slices.Concat(sessionStartMsgs, userPromptMsgs), + agentTools, loopDetector, &overflowCompactions, &toolModelOverride, events) + switch ctrl { + case turnContinue: + continue + case turnExit: return } - if rewritten != nil { - messages = rewritten - } + } +} - // Apply registered before_llm_call message transforms (e.g. - // strip_unsupported_modalities for text-only models, plus any - // embedder-supplied redactor / scrubber registered via - // WithMessageTransform). Runs after the gate so a transform - // failure cannot waste the gate's allow verdict. modelID is - // passed explicitly so transforms see the actual model the - // loop chose (per-tool override + alloy-mode selection), - // not whatever a fresh agent.Model() call would re-randomize. - messages = r.applyBeforeLLMCallTransforms(ctx, sess, a, modelID, messages) - - // Try primary model with fallback chain if configured - res, usedModel, err := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events) - if err != nil { - outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, &overflowCompactions, streamSpan, events) +// turnControl is what [LocalRuntime.runTurn] reports back to the outer +// run-stream loop: continue to the next iteration, or exit the loop +// entirely. break and return are equivalent here because the loop is +// the last statement in runStreamLoop, so we collapse them into one. +type turnControl int + +const ( + // turnContinue — outer loop should re-iterate (e.g. follow-up, + // drained steered, retry after stream error, more tool calls). + turnContinue turnControl = iota + // turnExit — outer loop should stop and let runStreamLoop’s + // deferred cleanup run (normal stop, error, hook-blocked, + // loop-detected, ctx cancelled). + turnExit +) + +// runTurn performs one iteration of the run-stream loop, from +// turn_start onwards. Wrapping the body in its own function exists for +// one reason: a deferred call can fire turn_end on every exit path — a +// normal stop, an error from handleStreamError, a hook-driven +// shutdown, the loop detector, context cancellation, even a panic — +// without sprinkling explicit dispatch calls at every return / break / +// continue. endReason is captured by reference so each branch can set +// it before falling out; the deferred call reads it AFTER the body has +// assigned the final value. +// +// The outer loop owns persistent per-stream state (iteration counter, +// session-start extras, agent-switch tracking); per-turn state that +// needs to survive into the next iteration (overflowCompactions, +// toolModelOverride) is passed by pointer so this function can mutate +// it the same way the inline body did. +func (r *LocalRuntime) runTurn( + ctx context.Context, + sess *session.Session, + a *agent.Agent, + m *modelsdev.Model, + model provider.Provider, + modelID string, + contextLimit int64, + sessionSpan trace.Span, + priorExtras []chat.Message, + agentTools []tools.Tool, + loopDetector *toolexec.LoopDetector, + overflowCompactions *int, + toolModelOverride *string, + events chan Event, +) (ctrl turnControl) { + streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes( + attribute.String("agent", a.Name()), + attribute.String("session.id", sess.ID), + )) + // streamSpan ends inline at the natural points (success path before + // recordAssistantMessage, error path after handleStreamError) so its + // duration tracks the model call only, not the whole iteration. The + // boolean prevents a double-End on paths that already closed it. + spanEnded := false + endStreamSpan := func() { + if !spanEnded { streamSpan.End() - if outcome == streamErrorRetry { - continue - } - return + spanEnded = true + } + } + defer endStreamSpan() + + // endReason is set by every exit branch below and read by the + // deferred turn_end dispatch. Default = normal so a clean fall- + // through (model produced output, more tool calls, no hook + // blocked) reports "continue" or "normal" depending on which + // branch ran last. Branches overwrite this before returning. + endReason := turnEndReasonNormal + defer func() { + if ctxErr := ctx.Err(); ctxErr != nil && endReason == turnEndReasonNormal { + // Context cancellation is detected after the fact: a + // branch that returned early because of ctx.Err overrides + // the default, but a panic-recovered branch may not have + // had the chance, so re-check here. + endReason = turnEndReasonCanceled } + // Use a non-cancellable context so turn_end runs even when + // the stream was interrupted (Ctrl+C, parent cancellation), + // matching the same guarantee session_end has at the + // finalizeEventChannel level. + r.executeTurnEndHooks(context.WithoutCancel(ctx), sess, a, endReason, events) + }() + + // Run turn_start hooks BEFORE building messages so their + // AdditionalContext, alongside the session_start extras captured + // once at the top of RunStream, can be spliced after the invariant + // cache checkpoint and before the conversation history. Neither + // hook's output is persisted, so per-turn signals (date, prompt + // files) refresh every turn while session-level context (cwd, OS, + // arch) stays stable — all without bloating the stored history. + turnStartMsgs := r.executeTurnStartHooks(ctx, sess, a, events) + messages := sess.GetMessages(a, slices.Concat(priorExtras, turnStartMsgs)...) + slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages)) + + // before_llm_call hooks fire just before the model is invoked. + // A terminating verdict (e.g. from the max_iterations builtin) + // stops the run loop here, before any tokens are spent. Hooks + // may also rewrite the outgoing messages by returning + // HookSpecificOutput.UpdatedMessages — the redact_secrets + // builtin uses this to scrub secrets from chat content before + // the LLM ever sees it. The rewrite happens BEFORE the + // runtime's Go-only message transforms so a hook that drops a + // message (e.g. a custom "strip system reminders") doesn't get + // silently overridden by a transform later in the chain. + stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, messages) + if stop { + slog.Warn("before_llm_call hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", msg) + r.emitHookDrivenShutdown(ctx, a, sess, msg, events) + endStreamSpan() + endReason = turnEndReasonHookBlocked + return turnExit + } + if rewritten != nil { + messages = rewritten + } + + // Apply registered before_llm_call message transforms (e.g. + // strip_unsupported_modalities for text-only models, plus any + // embedder-supplied redactor / scrubber registered via + // WithMessageTransform). Runs after the gate so a transform + // failure cannot waste the gate's allow verdict. modelID is + // passed explicitly so transforms see the actual model the + // loop chose (per-tool override + alloy-mode selection), + // not whatever a fresh agent.Model() call would re-randomize. + messages = r.applyBeforeLLMCallTransforms(ctx, sess, a, modelID, messages) + + // Try primary model with fallback chain if configured + res, usedModel, err := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events) + if err != nil { + outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, overflowCompactions, streamSpan, events) + endStreamSpan() + endReason = turnEndReasonError + if outcome == streamErrorRetry { + return turnContinue + } + return turnExit + } - // A successful model call resets the overflow compaction counter. - overflowCompactions = 0 + // A successful model call resets the overflow compaction counter. + *overflowCompactions = 0 - // after_llm_call hooks fire on success only; failed calls - // fire on_error above. The assistant text content is passed - // via stop_response, matching the stop event's payload, so - // handlers can reuse the same parsing. - r.executeAfterLLMCallHooks(ctx, sess, a, res.Content) + // after_llm_call hooks fire on success only; failed calls + // fire on_error above. The assistant text content is passed + // via stop_response, matching the stop event's payload, so + // handlers can reuse the same parsing. + r.executeAfterLLMCallHooks(ctx, sess, a, res.Content) - if usedModel != nil && usedModel.ID() != model.ID() { - slog.Info("Used fallback model", "agent", a.Name(), "primary", model.ID(), "used", usedModel.ID()) - events <- AgentInfo(a.Name(), usedModel.ID(), a.Description(), a.WelcomeMessage()) - } - streamSpan.SetAttributes( - attribute.Int("tool.calls", len(res.Calls)), - attribute.Int("content.length", len(res.Content)), - attribute.Bool("stopped", res.Stopped), - ) - streamSpan.End() - slog.Debug("Stream processed", "agent", a.Name(), "tool_calls", len(res.Calls), "content_length", len(res.Content), "stopped", res.Stopped) - - msgUsage := r.recordAssistantMessage(sess, a, res, agentTools, modelID, m, events) - - usage := SessionUsage(sess, contextLimit) - usage.LastMessage = msgUsage - events <- NewTokenUsageEvent(sess.ID, a.Name(), usage) - - // Record the message count before tool calls so we can - // measure how much content was added by tool results. - messageCountBeforeTools := len(sess.GetAllMessages()) - - stopRun, stopMsg := r.processToolCalls(ctx, sess, res.Calls, agentTools, events) - - // Re-probe toolsets after tool calls: an install/setup tool call may - // have made a previously-unavailable LSP or MCP connectable. reprobe() - // calls ensureToolSetsAreStarted, emits recovery notices, and updates - // the TUI tool-count immediately. - // - // The new tools are picked up by the next iteration's getTools() call - // at the top of this loop, so the model sees them on its very next - // response — within the same user turn, without requiring a new user - // message. reprobe's return value is intentionally discarded here; - // the top-of-loop getTools() is the authoritative source. - if len(res.Calls) > 0 { - r.reprobe(ctx, sess, a, agentTools, sessionSpan, events) - } + if usedModel != nil && usedModel.ID() != model.ID() { + slog.Info("Used fallback model", "agent", a.Name(), "primary", model.ID(), "used", usedModel.ID()) + events <- AgentInfo(a.Name(), usedModel.ID(), a.Description(), a.WelcomeMessage()) + } + streamSpan.SetAttributes( + attribute.Int("tool.calls", len(res.Calls)), + attribute.Int("content.length", len(res.Content)), + attribute.Bool("stopped", res.Stopped), + ) + endStreamSpan() + slog.Debug("Stream processed", "agent", a.Name(), "tool_calls", len(res.Calls), "content_length", len(res.Content), "stopped", res.Stopped) - // Check for degenerate tool call loops - if loopDetector.Record(res.Calls) { - toolName := "unknown" - if len(res.Calls) > 0 { - toolName = res.Calls[0].Function.Name - } - consecutive := loopDetector.Consecutive() - slog.Warn("Repetitive tool call loop detected", - "agent", a.Name(), "tool", toolName, - "consecutive", consecutive, "session_id", sess.ID) - errMsg := fmt.Sprintf( - "Agent terminated: detected %d consecutive identical calls to %s. "+ - "This indicates a degenerate loop where the model is not making progress.", - consecutive, toolName) - events <- Error(errMsg) - r.notifyError(ctx, a, sess.ID, errMsg) - loopDetector.Reset() - return - } + msgUsage := r.recordAssistantMessage(sess, a, res, agentTools, modelID, m, events) - // post_tool_use hook signalled run termination via a deny - // verdict (decision="block" / continue=false / exit 2). - // User-authored hooks can use this to stop the run; the - // runtime fans out the standard Error / notification / - // on_error stanzas before exiting. - if stopRun { - slog.Warn("post_tool_use hook signalled run termination", - "agent", a.Name(), "session_id", sess.ID, "reason", stopMsg) - r.emitHookDrivenShutdown(ctx, a, sess, stopMsg, events) - return + usage := SessionUsage(sess, contextLimit) + usage.LastMessage = msgUsage + events <- NewTokenUsageEvent(sess.ID, a.Name(), usage) + + // Record the message count before tool calls so we can + // measure how much content was added by tool results. + messageCountBeforeTools := len(sess.GetAllMessages()) + + stopRun, stopMsg := r.processToolCalls(ctx, sess, res.Calls, agentTools, events) + + // Re-probe toolsets after tool calls: an install/setup tool call may + // have made a previously-unavailable LSP or MCP connectable. reprobe() + // calls ensureToolSetsAreStarted, emits recovery notices, and updates + // the TUI tool-count immediately. + // + // The new tools are picked up by the next iteration's getTools() call + // at the top of this loop, so the model sees them on its very next + // response — within the same user turn, without requiring a new user + // message. reprobe's return value is intentionally discarded here; + // the top-of-loop getTools() is the authoritative source. + if len(res.Calls) > 0 { + r.reprobe(ctx, sess, a, agentTools, sessionSpan, events) + } + + // Check for degenerate tool call loops + if loopDetector.Record(res.Calls) { + toolName := "unknown" + if len(res.Calls) > 0 { + toolName = res.Calls[0].Function.Name } + consecutive := loopDetector.Consecutive() + slog.Warn("Repetitive tool call loop detected", + "agent", a.Name(), "tool", toolName, + "consecutive", consecutive, "session_id", sess.ID) + errMsg := fmt.Sprintf( + "Agent terminated: detected %d consecutive identical calls to %s. "+ + "This indicates a degenerate loop where the model is not making progress.", + consecutive, toolName) + events <- Error(errMsg) + r.notifyError(ctx, a, sess.ID, errMsg) + loopDetector.Reset() + endReason = turnEndReasonLoopDetected + return turnExit + } + + // post_tool_use hook signalled run termination via a deny + // verdict (decision="block" / continue=false / exit 2). + // User-authored hooks can use this to stop the run; the + // runtime fans out the standard Error / notification / + // on_error stanzas before exiting. + if stopRun { + slog.Warn("post_tool_use hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", stopMsg) + r.emitHookDrivenShutdown(ctx, a, sess, stopMsg, events) + endReason = turnEndReasonHookBlocked + return turnExit + } + + // Record per-toolset model override for the next LLM turn. + *toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools) + + // Drain steer messages that arrived during tool calls. + if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained { + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + endReason = turnEndReasonSteered + return turnContinue + } - // Record per-toolset model override for the next LLM turn. - toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools) + if res.Stopped { + slog.Debug("Conversation stopped", "agent", a.Name()) + r.executeStopHooks(ctx, sess, a, res.Content, events) - // Drain steer messages that arrived during tool calls. + // Re-check steer queue: closes the race between the mid-loop drain and this stop. if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained { r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) - continue + endReason = turnEndReasonSteered + return turnContinue } - if res.Stopped { - slog.Debug("Conversation stopped", "agent", a.Name()) - r.executeStopHooks(ctx, sess, a, res.Content, events) - - // Re-check steer queue: closes the race between the mid-loop drain and this stop. - if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained { - r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) - continue - } - - // --- FOLLOW-UP: end-of-turn injection --- - // Pop exactly one follow-up message. Unlike steered - // messages, follow-ups are plain user messages that start - // a new turn — the model sees them as fresh input, not a - // mid-stream interruption. Each follow-up gets a full - // undivided agent turn. - if followUp, ok := r.followUpQueue.Dequeue(ctx); ok { - userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) - sess.AddMessage(userMsg) - events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) - r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) - continue // re-enter the loop for a new turn - } - - break + // --- FOLLOW-UP: end-of-turn injection --- + // Pop exactly one follow-up message. Unlike steered + // messages, follow-ups are plain user messages that start + // a new turn — the model sees them as fresh input, not a + // mid-stream interruption. Each follow-up gets a full + // undivided agent turn. + if followUp, ok := r.followUpQueue.Dequeue(ctx); ok { + userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) + sess.AddMessage(userMsg) + events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + endReason = turnEndReasonContinue + return turnContinue // re-enter the loop for a new turn } - r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + endReason = turnEndReasonNormal + return turnExit } + + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + endReason = turnEndReasonContinue + return turnContinue } // Run executes the agent loop synchronously and returns the final session diff --git a/pkg/runtime/turn_end_test.go b/pkg/runtime/turn_end_test.go new file mode 100644 index 000000000..0390bcc36 --- /dev/null +++ b/pkg/runtime/turn_end_test.go @@ -0,0 +1,384 @@ +package runtime + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/docker/docker-agent/pkg/agent" + "github.com/docker/docker-agent/pkg/chat" + "github.com/docker/docker-agent/pkg/config/latest" + "github.com/docker/docker-agent/pkg/hooks" + "github.com/docker/docker-agent/pkg/model/provider/base" + "github.com/docker/docker-agent/pkg/session" + "github.com/docker/docker-agent/pkg/team" + "github.com/docker/docker-agent/pkg/tools" +) + +// turnEndRecorder is a thread-safe recorder for turn_end fires; tests +// inspect the captured reasons after RunStream drains. We capture the +// reason via the input's Reason field — the runtime sets it to one of +// the turnEndReason* constants depending on which exit path the loop +// took. +type turnEndRecorder struct { + mu sync.Mutex + reasons []string +} + +func (r *turnEndRecorder) record(reason string) { + r.mu.Lock() + defer r.mu.Unlock() + r.reasons = append(r.reasons, reason) +} + +func (r *turnEndRecorder) snapshot() []string { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]string, len(r.reasons)) + copy(out, r.reasons) + return out +} + +// installTurnEndRecorder registers a builtin turn_end hook on rt that +// captures the reason on every fire. It returns the recorder so the +// test can inspect the captured fires after running. +func installTurnEndRecorder(t *testing.T, rt *LocalRuntime, name string) *turnEndRecorder { + t.Helper() + rec := &turnEndRecorder{} + require.NoError(t, rt.hooksRegistry.RegisterBuiltin( + name, + func(_ context.Context, in *hooks.Input, _ []string) (*hooks.Output, error) { + rec.record(in.Reason) + return nil, nil + }, + )) + return rec +} + +// TestTurnEndFiresOnNormalStop pins the contract that turn_end fires +// once when the model stops cleanly without any tool calls or +// follow-up — the most common path through the loop. The reason +// reported is one of the turnEndReason* constants; "normal" is the +// canonical exit, but "continue" is also acceptable on this branch +// because both indicate a clean fall-through. +func TestTurnEndFiresOnNormalStop(t *testing.T) { + t.Parallel() + + stream := newStreamBuilder(). + AddContent("Hello"). + AddStopWithUsage(3, 2). + Build() + prov := &mockProvider{id: "test/mock-model", stream: stream} + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + TurnEnd: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-end-normal"}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + rec := installTurnEndRecorder(t, rt, "test-turn-end-normal") + + sess := session.New(session.WithUserMessage("hi")) + for range rt.RunStream(t.Context(), sess) { + } + + reasons := rec.snapshot() + require.Len(t, reasons, 1, "turn_end must fire exactly once for a single-turn clean stop") + assert.Equal(t, "normal", reasons[0], + "a clean res.Stopped with no follow-up must report 'normal'") +} + +// TestTurnEndFiresOnHookBlocked pins the contract that turn_end fires +// with reason="hook_blocked" when before_llm_call signals run +// termination via a deny verdict. The runtime emits its hook-driven +// shutdown stanzas and turn_end runs after — observability over the +// actual exit reason. +func TestTurnEndFiresOnHookBlocked(t *testing.T) { + t.Parallel() + + stream := newStreamBuilder(). + AddContent("Hello"). + AddStopWithUsage(3, 2). + Build() + prov := &mockProvider{id: "test/mock-model", stream: stream} + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + BeforeLLMCall: []latest.HookDefinition{ + {Type: "builtin", Command: "test-blocker"}, + }, + TurnEnd: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-end-blocked"}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + // before_llm_call returns a deny verdict, terminating the run. + require.NoError(t, rt.hooksRegistry.RegisterBuiltin( + "test-blocker", + func(_ context.Context, _ *hooks.Input, _ []string) (*hooks.Output, error) { + return &hooks.Output{ + Decision: hooks.DecisionBlockValue, + Reason: "test-block", + }, nil + }, + )) + + rec := installTurnEndRecorder(t, rt, "test-turn-end-blocked") + + sess := session.New(session.WithUserMessage("hi")) + for range rt.RunStream(t.Context(), sess) { + } + + reasons := rec.snapshot() + require.Len(t, reasons, 1, "turn_end must fire even when the run was blocked") + assert.Equal(t, "hook_blocked", reasons[0], + "turn_end must report hook_blocked when before_llm_call denied the call") +} + +// TestTurnEndFiresOnStreamError pins the contract that turn_end fires +// with reason="error" when fallback.execute returns an error and the +// runtime exits the loop via handleStreamError. The deferred dispatch +// is what makes this contract robust: explicit dispatch calls would +// have to be sprinkled at every error-path return, and any miss would +// silently break observability. +func TestTurnEndFiresOnStreamError(t *testing.T) { + t.Parallel() + + prov := &mockProviderWithError{id: "test/error-model"} + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + TurnEnd: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-end-error"}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + rec := installTurnEndRecorder(t, rt, "test-turn-end-error") + + sess := session.New(session.WithUserMessage("hi")) + for range rt.RunStream(t.Context(), sess) { + } + + reasons := rec.snapshot() + require.Len(t, reasons, 1, "turn_end must fire on stream error") + assert.Equal(t, "error", reasons[0], + "turn_end must report 'error' when handleStreamError exits the loop") +} + +// blockingProvider returns a stream whose Recv blocks until the +// provided context is cancelled, modelling a slow upstream that +// receives a cancel mid-stream. +type blockingProvider struct { + id string + release chan struct{} +} + +func (p *blockingProvider) ID() string { return p.id } + +func (p *blockingProvider) CreateChatCompletionStream(ctx context.Context, _ []chat.Message, _ []tools.Tool) (chat.MessageStream, error) { + return &blockingStream{ctx: ctx, release: p.release}, nil +} + +func (p *blockingProvider) BaseConfig() base.Config { return base.Config{} } +func (p *blockingProvider) MaxTokens() int { return 0 } + +type blockingStream struct { + ctx context.Context + release chan struct{} +} + +func (s *blockingStream) Recv() (chat.MessageStreamResponse, error) { + // Block until the parent context is cancelled or a release signal + // is received. On cancellation we surface ctx.Err() so the + // runtime's fallback.execute treats it as a stream error and + // proceeds to handleStreamError, which exits the loop and + // triggers the deferred turn_end dispatch. + select { + case <-s.ctx.Done(): + return chat.MessageStreamResponse{}, s.ctx.Err() + case <-s.release: + return chat.MessageStreamResponse{}, errors.New("released without data") + } +} + +func (s *blockingStream) Close() {} + +// TestTurnEndFiresOnContextCancellation pins the critical contract +// that turn_end fires even when the parent context is cancelled +// mid-turn. The runtime uses context.WithoutCancel internally for the +// turn_end dispatch so handlers run to completion on Ctrl+C — without +// this, a session_end-only observability strategy would silently miss +// the per-turn lifecycle data. +// +// We synchronise the cancellation against turn_start to avoid the +// pre-turn-start race where the loop's ctx.Err() guard exits before +// turn_start has fired (correctly skipping turn_end). A turn_start +// hook signals that the loop has entered the turn body; the test +// then cancels and asserts turn_end fired with a cancellation-class +// reason. +func TestTurnEndFiresOnContextCancellation(t *testing.T) { + t.Parallel() + + prov := &blockingProvider{id: "test/blocking-model", release: make(chan struct{})} + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + TurnStart: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-start-signal"}, + }, + TurnEnd: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-end-cancel"}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + // turn_start fires AFTER the loop's ctx.Err() guard, so by the + // time this hook runs the loop is committed to the turn and + // turn_end is on the deferred path. Closing turnStartFired + // unblocks the test goroutine which then cancels the context. + turnStartFired := make(chan struct{}) + var once sync.Once + require.NoError(t, rt.hooksRegistry.RegisterBuiltin( + "test-turn-start-signal", + func(_ context.Context, _ *hooks.Input, _ []string) (*hooks.Output, error) { + once.Do(func() { close(turnStartFired) }) + return nil, nil + }, + )) + + rec := installTurnEndRecorder(t, rt, "test-turn-end-cancel") + + ctx, cancel := context.WithCancel(t.Context()) + + sess := session.New(session.WithUserMessage("hi")) + + done := make(chan struct{}) + go func() { + defer close(done) + for range rt.RunStream(ctx, sess) { + } + }() + + <-turnStartFired // model call is in flight; turn_end is on the defer path + cancel() + <-done + + reasons := rec.snapshot() + require.Len(t, reasons, 1, "turn_end must fire exactly once on cancellation") + // Either "canceled" (deferred guard caught ctx.Err) or "error" + // (handleStreamError ran first and set 'error') is acceptable — + // both legitimately indicate the cancellation path. The + // invariant under test is "turn_end fires", not which specific + // reason wins this micro-race. + assert.Contains(t, []string{"canceled", "error"}, reasons[0], + "turn_end must report a cancellation-class reason") +} + +// TestTurnEndFiresEveryIteration pins the symmetric contract with +// turn_start: when a tool call provokes a follow-on iteration, +// turn_end fires once per iteration. A single missed dispatch would +// let stateful turn_end handlers (e.g. a metrics span that closes +// per turn) silently leak. +func TestTurnEndFiresEveryIteration(t *testing.T) { + t.Parallel() + + // Two iterations: the first emits a tool call (stop reason = + // tool_calls so the loop re-enters), the second stops cleanly. + turn1 := newStreamBuilder(). + AddToolCallName("call_1", "noop"). + AddToolCallArguments("call_1", `{}`). + AddToolCallStopWithUsage(3, 2). + Build() + turn2 := newStreamBuilder(). + AddContent("done"). + AddStopWithUsage(3, 2). + Build() + + prov := &queueProvider{ + id: "test/mock-model", + streams: []chat.MessageStream{turn1, turn2}, + } + + noopTool := tools.Tool{ + Name: "noop", + Parameters: map[string]any{}, + Handler: func(_ context.Context, _ tools.ToolCall) (*tools.ToolCallResult, error) { + return tools.ResultSuccess("ok"), nil + }, + } + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithToolSets(newStubToolSet(nil, []tools.Tool{noopTool}, nil)), + agent.WithHooks(&latest.HooksConfig{ + TurnEnd: []latest.HookDefinition{ + {Type: "builtin", Command: "test-turn-end-iter"}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + rec := installTurnEndRecorder(t, rt, "test-turn-end-iter") + + sess := session.New( + session.WithUserMessage("hi"), + session.WithToolsApproved(true), + ) + for range rt.RunStream(t.Context(), sess) { + } + + reasons := rec.snapshot() + require.Len(t, reasons, 2, + "turn_end must fire once per loop iteration; one missing dispatch leaks state") + assert.Equal(t, "continue", reasons[0], + "intermediate iteration (after tool calls) must report 'continue'") + assert.Equal(t, "normal", reasons[1], + "final iteration must report 'normal'") +} + From 0df5837a83fc1ca29cbb41223a7d02be75123b2a Mon Sep 17 00:00:00 2001 From: Djordje Lukic Date: Wed, 29 Apr 2026 11:37:21 +0000 Subject: [PATCH 2/3] Fix lint findings on turn_end PR - drop unused turnEndReasonMaxIterations (max-iterations exits before turn_start, so turn_end never fires for that path) and update doc comments accordingly - rework blockingProvider/blockingStream to capture ctx.Done()/ctx.Err() at construction time instead of stashing context.Context in a struct field (containedctx) - gci/gofumpt fixups Assisted-By: docker-agent --- agent-schema.json | 2 +- examples/hooks.yaml | 7 +++--- pkg/config/latest/types.go | 9 ++++--- pkg/hooks/types.go | 13 +++++----- pkg/runtime/hooks.go | 3 --- pkg/runtime/turn_end_test.go | 46 ++++++++++++++++++++---------------- 6 files changed, 39 insertions(+), 41 deletions(-) diff --git a/agent-schema.json b/agent-schema.json index f43c3c8f1..758f5a361 100644 --- a/agent-schema.json +++ b/agent-schema.json @@ -622,7 +622,7 @@ }, "turn_end": { "type": "array", - "description": "Hooks that run once per agent turn when the turn finishes — the symmetric counterpart of turn_start. Fires no matter why the turn ended: a normal stop, an error, a hook-driven shutdown, the max_iterations limit, the loop detector, or context cancellation. The reason is reported via the hook input's reason field ('normal', 'continue', 'steered', 'error', 'canceled', 'hook_blocked', 'max_iterations', 'loop_detected'). Observational; output is ignored.", + "description": "Hooks that run once per agent turn when the turn finishes — the symmetric counterpart of turn_start. Fires no matter why the turn ended: a normal stop, an error, a hook-driven shutdown, the loop detector, or context cancellation. The reason is reported via the hook input's reason field ('normal', 'continue', 'steered', 'error', 'canceled', 'hook_blocked', 'loop_detected'). Observational; output is ignored.", "items": { "$ref": "#/definitions/HookDefinition" } diff --git a/examples/hooks.yaml b/examples/hooks.yaml index 2392a66fd..c74ac5ea5 100644 --- a/examples/hooks.yaml +++ b/examples/hooks.yaml @@ -230,9 +230,9 @@ agents: # ==================================================================== # TURN-END - runs ONCE per turn after the iteration finishes — the # symmetric counterpart of turn_start. Fires no matter why the turn - # ended: a normal stop, an error, a hook-driven shutdown, the - # max_iterations limit, the loop detector, or context cancellation. - # The reason is reported via the .reason field: + # ended: a normal stop, an error, a hook-driven shutdown, the loop + # detector, or context cancellation. The reason is reported via + # the .reason field: # # normal - model finished cleanly, no follow-up # continue - more iterations to come (e.g. tool calls) @@ -241,7 +241,6 @@ agents: # canceled - context cancellation (Ctrl+C, parent ctx done) # hook_blocked - before_llm_call or post_tool_use signalled stop # loop_detected - degenerate consecutive-tool-call loop - # max_iterations - iteration cap was reached during this turn # # Observational; the result is ignored. Use it to time turns, # accumulate per-turn metrics (token usage, tool counts), or notify diff --git a/pkg/config/latest/types.go b/pkg/config/latest/types.go index 924c83b9c..82a0323d4 100644 --- a/pkg/config/latest/types.go +++ b/pkg/config/latest/types.go @@ -1712,11 +1712,10 @@ type HooksConfig struct { // TurnEnd hooks run once per agent turn when the turn finishes — // the symmetric counterpart of TurnStart. Fires no matter why the // turn ended: a normal stop, an error, a hook-driven shutdown, the - // max_iterations limit, the loop detector, or context cancellation. - // The reason is reported in the hook input's reason field - // ("normal", "continue", "steered", "error", "canceled", - // "hook_blocked", "max_iterations", "loop_detected"). Observational; - // output is ignored. + // loop detector, or context cancellation. The reason is reported + // in the hook input's reason field ("normal", "continue", + // "steered", "error", "canceled", "hook_blocked", + // "loop_detected"). Observational; output is ignored. TurnEnd []HookDefinition `json:"turn_end,omitempty" yaml:"turn_end,omitempty"` // BeforeLLMCall hooks run just before each model call (after diff --git a/pkg/hooks/types.go b/pkg/hooks/types.go index 77459e406..474c0cf05 100644 --- a/pkg/hooks/types.go +++ b/pkg/hooks/types.go @@ -54,12 +54,11 @@ const ( // EventTurnEnd fires once per agent turn (each model call) when the // turn finishes — symmetric to [EventTurnStart]. It runs no matter // why the turn ended: a normal stop, an error, a hook-driven - // shutdown, the iteration limit, the loop detector, or context - // cancellation. The reason is reported in [Input.Reason] using one - // of the turnEndReason* constants in the runtime package ("normal", - // "continue", "steered", "error", "canceled", "hook_blocked", - // "max_iterations", "loop_detected"). Observational; output is - // ignored. + // shutdown, the loop detector, or context cancellation. The reason + // is reported in [Input.Reason] using one of the turnEndReason* + // constants in the runtime package ("normal", "continue", + // "steered", "error", "canceled", "hook_blocked", + // "loop_detected"). Observational; output is ignored. EventTurnEnd EventType = "turn_end" // EventBeforeLLMCall fires immediately before each model call. // Returning decision="block" (or continue=false / exit code 2) @@ -220,7 +219,7 @@ type Input struct { Source string `json:"source,omitempty"` // SessionEnd specific: "clear", "logout", "prompt_input_exit", "other". // TurnEnd specific: "normal", "continue", "steered", "error", - // "canceled", "hook_blocked", "max_iterations", "loop_detected". + // "canceled", "hook_blocked", "loop_detected". Reason string `json:"reason,omitempty"` // Stop / AfterLLMCall / SubagentStop: the model's final response content. StopResponse string `json:"stop_response,omitempty"` diff --git a/pkg/runtime/hooks.go b/pkg/runtime/hooks.go index 697d5e149..dcf01d84d 100644 --- a/pkg/runtime/hooks.go +++ b/pkg/runtime/hooks.go @@ -160,9 +160,6 @@ const ( // turnEndReasonLoopDetected — the consecutive-tool-call loop // detector terminated the turn. turnEndReasonLoopDetected = "loop_detected" - // turnEndReasonMaxIterations — the iteration limit was reached - // during this turn. - turnEndReasonMaxIterations = "max_iterations" ) // executeTurnEndHooks fires turn_end once per turn — symmetric to diff --git a/pkg/runtime/turn_end_test.go b/pkg/runtime/turn_end_test.go index 0390bcc36..c3e798e06 100644 --- a/pkg/runtime/turn_end_test.go +++ b/pkg/runtime/turn_end_test.go @@ -2,7 +2,6 @@ package runtime import ( "context" - "errors" "sync" "testing" @@ -198,39 +197,45 @@ func TestTurnEndFiresOnStreamError(t *testing.T) { } // blockingProvider returns a stream whose Recv blocks until the -// provided context is cancelled, modelling a slow upstream that -// receives a cancel mid-stream. +// CreateChatCompletionStream-supplied context is cancelled, modelling +// a slow upstream that receives a cancel mid-stream. The cancellation +// signal is plumbed via the per-call done channel captured below — +// stashing the context on the stream itself would trip the +// containedctx linter, since context belongs in function arguments, +// not struct fields. type blockingProvider struct { - id string - release chan struct{} + id string } func (p *blockingProvider) ID() string { return p.id } func (p *blockingProvider) CreateChatCompletionStream(ctx context.Context, _ []chat.Message, _ []tools.Tool) (chat.MessageStream, error) { - return &blockingStream{ctx: ctx, release: p.release}, nil + // Snapshot ctx.Done() at stream-construction time — the runtime + // passes a per-call streamCtx that is cancelled when the parent + // (RunStream's) context is cancelled, so capturing the channel + // here is equivalent to capturing the context for our purposes + // and avoids holding a context.Context in struct state. + return &blockingStream{ + done: ctx.Done(), + err: func() error { return ctx.Err() }, + }, nil } func (p *blockingProvider) BaseConfig() base.Config { return base.Config{} } func (p *blockingProvider) MaxTokens() int { return 0 } type blockingStream struct { - ctx context.Context - release chan struct{} + done <-chan struct{} + err func() error } func (s *blockingStream) Recv() (chat.MessageStreamResponse, error) { - // Block until the parent context is cancelled or a release signal - // is received. On cancellation we surface ctx.Err() so the - // runtime's fallback.execute treats it as a stream error and - // proceeds to handleStreamError, which exits the loop and - // triggers the deferred turn_end dispatch. - select { - case <-s.ctx.Done(): - return chat.MessageStreamResponse{}, s.ctx.Err() - case <-s.release: - return chat.MessageStreamResponse{}, errors.New("released without data") - } + // Block until the parent context is cancelled. On cancellation + // we surface ctx.Err() so the runtime's fallback.execute treats + // it as a stream error and proceeds to handleStreamError, which + // exits the loop and triggers the deferred turn_end dispatch. + <-s.done + return chat.MessageStreamResponse{}, s.err() } func (s *blockingStream) Close() {} @@ -251,7 +256,7 @@ func (s *blockingStream) Close() {} func TestTurnEndFiresOnContextCancellation(t *testing.T) { t.Parallel() - prov := &blockingProvider{id: "test/blocking-model", release: make(chan struct{})} + prov := &blockingProvider{id: "test/blocking-model"} root := agent.New("root", "test agent", agent.WithModel(prov), @@ -381,4 +386,3 @@ func TestTurnEndFiresEveryIteration(t *testing.T) { assert.Equal(t, "normal", reasons[1], "final iteration must report 'normal'") } - From 41f29c9ceb4234a7e0ec174d0022ec4ced9f5564 Mon Sep 17 00:00:00 2001 From: Djordje Lukic Date: Wed, 29 Apr 2026 12:59:17 +0000 Subject: [PATCH 3/3] docs(hooks): document turn_end Assisted-By: docker-agent --- docs/configuration/hooks/index.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/configuration/hooks/index.md b/docs/configuration/hooks/index.md index da810b596..7c3b8d5ed 100644 --- a/docs/configuration/hooks/index.md +++ b/docs/configuration/hooks/index.md @@ -43,6 +43,7 @@ docker-agent dispatches the following hook events: | `session_start` | When a session begins or resumes | No | | `user_prompt_submit` | Once per user message, after submission and before the model runs | Yes | | `turn_start` | At the start of every agent turn (each model call) | No | +| `turn_end` | At the end of every agent turn — fires no matter why the turn ended | No | | `before_llm_call` | Just before every model call (after `turn_start`) | Yes | | `after_llm_call` | After every successful model call, before the response is recorded | No | | `session_end` | When a session terminates | No | @@ -225,6 +226,7 @@ In addition to the common fields, each event ships its own payload: | `session_start` | `source` — one of `startup`, `resume`, `clear`, `compact` | | `user_prompt_submit` | `prompt` — the text the user just submitted | | `turn_start` | _none_ (just the common fields) | +| `turn_end` | `agent_name`, `reason` — one of `normal`, `continue`, `steered`, `error`, `canceled`, `hook_blocked`, `loop_detected` | | `before_llm_call` | _none_ | | `after_llm_call` | `agent_name`, `stop_response`, `last_user_message` | | `session_end` | `reason` — one of `clear`, `logout`, `prompt_input_exit`, `other` | @@ -495,6 +497,24 @@ Use `on_error` and `on_max_iterations` instead of `notification` when you want a `turn_start` fires at the start of every agent turn (each model call). Anything you contribute via `additional_context` (or plain stdout) is appended as a **transient** system message for that turn only — it is *not* persisted to the session. Use it for fast-moving signals like the date, current git state, or per-turn prompt files. The built-in hooks `add_date`, `add_prompt_files`, `add_git_status`, and `add_git_diff` all target this event. +### Turn-End: per-turn finalizer + +`turn_end` is the symmetric counterpart of `turn_start`. It fires once per turn when the iteration finishes — no matter why. The runtime guarantees the dispatch on every exit path (a normal stop, an error, a hook-driven shutdown, the loop detector, even context cancellation), and it uses `context.WithoutCancel` internally so handlers run to completion on Ctrl+C. + +The `reason` field classifies the exit: + +| `reason` | When | +| --------------- | ---- | +| `normal` | Model finished cleanly with no follow-up | +| `continue` | More iterations to come (e.g. tool calls, follow-up message) | +| `steered` | Drained steered messages prompted a re-entry | +| `error` | Model call failed (`handleStreamError` exited the loop) | +| `canceled` | Context was cancelled (e.g. Ctrl+C) | +| `hook_blocked` | `before_llm_call` or `post_tool_use` denied the call | +| `loop_detected` | The consecutive-tool-call loop detector terminated the turn | + +`turn_end` is observational — the result is ignored. Use it to time turns, accumulate per-turn metrics (token usage, tool counts), or notify external observability pipelines symmetrically with `turn_start`. + ### Before/After-LLM-Call: budget guards and model auditing `before_llm_call` fires immediately before every model call (after `turn_start` has assembled the messages). It cannot contribute context — use `turn_start` for that — but it can **stop the run** by returning `decision: block` (or exit code 2). The built-in `max_iterations` hook implements a hard cap on top of this event.