diff --git a/pkg/app/app.go b/pkg/app/app.go index fdbb1d2eb..6b02c3ef9 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -565,6 +565,18 @@ func (a *App) Resume(req runtime.ResumeRequest) { a.runtime.Resume(context.Background(), req) } +// TogglePause toggles whether the runtime loop is paused at iteration +// boundaries. The second return value is false if the underlying runtime +// doesn't support pausing (e.g. remote runtimes), in which case the first +// return value is meaningless. +func (a *App) TogglePause() (paused, supported bool) { + rt, ok := a.runtime.(interface{ TogglePause() bool }) + if !ok { + return false, false + } + return rt.TogglePause(), true +} + // ResumeElicitation resumes an elicitation request with the given action and content func (a *App) ResumeElicitation(ctx context.Context, action tools.ElicitationAction, content map[string]any) error { return a.runtime.ResumeElicitation(ctx, action, content) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index d1da4f050..0b43d67a9 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -275,6 +275,12 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, var prevAgentName string for { + // Pause the loop here if /pause has been toggled on. Any in-flight + // LLM request and its tool calls have already completed. + if err := r.waitIfPaused(ctx); err != nil { + return + } + a = r.resolveSessionAgent(sess) // Clear per-tool model override on agent switch so it doesn't diff --git a/pkg/runtime/pause.go b/pkg/runtime/pause.go new file mode 100644 index 000000000..28077226f --- /dev/null +++ b/pkg/runtime/pause.go @@ -0,0 +1,37 @@ +package runtime + +import "context" + +// TogglePause toggles whether runStreamLoop pauses at iteration boundaries. +// Returns true if now paused. The pause takes effect as soon as the in-flight +// LLM request and its tool calls complete. +// +// Internally, pauseCh is non-nil and open while paused; closing it on resume +// wakes every goroutine waiting in waitIfPaused. +func (r *LocalRuntime) TogglePause() bool { + r.pauseMu.Lock() + defer r.pauseMu.Unlock() + if r.pauseCh != nil { + close(r.pauseCh) + r.pauseCh = nil + return false + } + r.pauseCh = make(chan struct{}) + return true +} + +// waitIfPaused blocks until the runtime is resumed or ctx is cancelled. +func (r *LocalRuntime) waitIfPaused(ctx context.Context) error { + r.pauseMu.Lock() + ch := r.pauseCh + r.pauseMu.Unlock() + if ch == nil { + return nil + } + select { + case <-ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/runtime/pause_test.go b/pkg/runtime/pause_test.go new file mode 100644 index 000000000..97381e990 --- /dev/null +++ b/pkg/runtime/pause_test.go @@ -0,0 +1,175 @@ +package runtime + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestTogglePause_StateCycles verifies a /pause /pause /pause sequence +// alternates between paused and resumed. +func TestTogglePause_StateCycles(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + + assert.True(t, r.TogglePause(), "first toggle should pause") + assert.False(t, r.TogglePause(), "second toggle should resume") + assert.True(t, r.TogglePause(), "third toggle should pause again") + assert.False(t, r.TogglePause(), "fourth toggle should resume again") +} + +// TestWaitIfPaused_NotPaused returns immediately when the runtime isn't paused. +func TestWaitIfPaused_NotPaused(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + + done := make(chan error, 1) + go func() { done <- r.waitIfPaused(t.Context()) }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("waitIfPaused should return immediately when not paused") + } +} + +// TestWaitIfPaused_BlocksUntilResumed verifies that a goroutine in +// waitIfPaused stays blocked while paused and wakes up on resume. +func TestWaitIfPaused_BlocksUntilResumed(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + r.TogglePause() // pause + + done := make(chan error, 1) + go func() { done <- r.waitIfPaused(t.Context()) }() + + // Should still be blocked. + select { + case <-done: + t.Fatal("waitIfPaused returned before resume") + case <-time.After(50 * time.Millisecond): + } + + r.TogglePause() // resume + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("waitIfPaused did not unblock after resume") + } +} + +// TestWaitIfPaused_ContextCancellation verifies cancelling the context wakes +// up a goroutine waiting in waitIfPaused, returning the context error. +func TestWaitIfPaused_ContextCancellation(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + r.TogglePause() // pause + + ctx, cancel := context.WithCancel(t.Context()) + done := make(chan error, 1) + go func() { done <- r.waitIfPaused(ctx) }() + + // Should still be blocked. + select { + case <-done: + t.Fatal("waitIfPaused returned before cancellation") + case <-time.After(50 * time.Millisecond): + } + + cancel() + + select { + case err := <-done: + require.ErrorIs(t, err, context.Canceled) + case <-time.After(time.Second): + t.Fatal("waitIfPaused did not unblock after ctx cancellation") + } +} + +// TestWaitIfPaused_BroadcastsToAllWaiters verifies a single resume wakes up +// every goroutine that was waiting on the same pause. +func TestWaitIfPaused_BroadcastsToAllWaiters(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + r.TogglePause() + + const n = 8 + var wg sync.WaitGroup + for range n { + wg.Go(func() { + _ = r.waitIfPaused(t.Context()) + }) + } + + // Give them a moment to all enter waitIfPaused. + time.Sleep(50 * time.Millisecond) + r.TogglePause() // single resume should wake all waiters + + doneAll := make(chan struct{}) + go func() { + wg.Wait() + close(doneAll) + }() + + select { + case <-doneAll: + case <-time.After(time.Second): + t.Fatal("not all waiters woke up after a single resume") + } +} + +// TestTogglePause_RaceFreeUnderConcurrentCallers exercises concurrent +// TogglePause and waitIfPaused calls. Run with -race to detect data races. +func TestTogglePause_RaceFreeUnderConcurrentCallers(t *testing.T) { + t.Parallel() + + r := &LocalRuntime{} + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + var wg sync.WaitGroup + const togglers = 4 + const waiters = 4 + + for range togglers { + wg.Go(func() { + for range 200 { + r.TogglePause() + } + }) + } + for range waiters { + wg.Go(func() { + for range 200 { + _ = r.waitIfPaused(ctx) + } + }) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // If a waiter is left blocked on a pause that no toggler will flip, + // cancelling the context unblocks it so wg.Wait() can return. + select { + case <-done: + case <-time.After(2 * time.Second): + cancel() + <-done + } +} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index e41af6934..e26a2ff13 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -8,6 +8,7 @@ import ( "maps" "os" "strings" + "sync" "time" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -228,6 +229,12 @@ type LocalRuntime struct { // WithMaxOverflowCompactions to exercise both the "compaction // succeeded" and "compaction exhausted" branches. maxOverflowCompactions int + + // pauseMu guards pauseCh. + pauseMu sync.Mutex + // pauseCh is non-nil and open while /pause has paused the run loop; + // nil otherwise. See TogglePause and waitIfPaused. + pauseCh chan struct{} } type Opt func(*LocalRuntime) diff --git a/pkg/tui/commands/commands.go b/pkg/tui/commands/commands.go index 3acbcdf85..3965da89c 100644 --- a/pkg/tui/commands/commands.go +++ b/pkg/tui/commands/commands.go @@ -195,6 +195,17 @@ func builtInSessionCommands() []Item { return core.CmdHandler(messages.NewSessionMsg{}) }, }, + { + ID: "session.pause", + Label: "Pause", + SlashCommand: "/pause", + Description: "Pause/resume the runtime loop after the current request", + Category: "Session", + Immediate: true, + Execute: func(string) tea.Cmd { + return core.CmdHandler(messages.TogglePauseMsg{}) + }, + }, { ID: "session.permissions", Label: "Permissions", diff --git a/pkg/tui/handlers.go b/pkg/tui/handlers.go index e2af925e1..8f07d37b1 100644 --- a/pkg/tui/handlers.go +++ b/pkg/tui/handlers.go @@ -310,6 +310,21 @@ func (m *appModel) handleToggleYolo() (tea.Model, tea.Cmd) { return m.forwardChat(messages.SessionToggleChangedMsg{}) } +// handleTogglePause toggles whether the runtime loop is paused at iteration +// boundaries. The pause kicks in once the in-flight LLM request and its tool +// calls finish; running /pause again resumes the loop. +func (m *appModel) handleTogglePause() (tea.Model, tea.Cmd) { + paused, supported := m.application.TogglePause() + switch { + case !supported: + return m, notification.InfoCmd("Pause is not supported with remote runtimes") + case paused: + return m, notification.InfoCmd("Runtime paused — /pause again to resume") + default: + return m, notification.SuccessCmd("Runtime resumed") + } +} + func (m *appModel) handleToggleHideToolResults() (tea.Model, tea.Cmd) { return m.forwardChat(messages.ToggleHideToolResultsMsg{}) } diff --git a/pkg/tui/messages/toggle.go b/pkg/tui/messages/toggle.go index 72875842c..386467b19 100644 --- a/pkg/tui/messages/toggle.go +++ b/pkg/tui/messages/toggle.go @@ -5,6 +5,11 @@ type ( // ToggleYoloMsg toggles YOLO mode (auto-approve tools). ToggleYoloMsg struct{} + // TogglePauseMsg toggles whether the runtime loop is paused at + // iteration boundaries. The pause takes effect as soon as the + // in-flight LLM request and its tool calls complete. + TogglePauseMsg struct{} + // ToggleHideToolResultsMsg toggles hiding of tool results. ToggleHideToolResultsMsg struct{} diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index 016195a6d..c571f5297 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -774,6 +774,9 @@ func (m *appModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case messages.ToggleYoloMsg: return m.handleToggleYolo() + case messages.TogglePauseMsg: + return m.handleTogglePause() + case messages.ToggleHideToolResultsMsg: return m.handleToggleHideToolResults()