Review notes. Text ahead of the first "diff --git" header is commentary that
patch tools skip.

This patch moves initial-prompt delivery and snapshot emission out of
httpapi.Server.StartSnapshotLoop and into screentracker.PTYConversation.Start,
which now runs a readiness loop, a snapshot loop and a send loop.

Changes made during review (all in PTYConversation.Start,
lib/screentracker/pty_conversation.go):
  * The send loop takes c.lock BEFORE dequeuing from outboundQueue. Dequeuing
    first left a window in which a snapshot tick could see an empty queue on a
    stable screen and report "stable" right before the send happened.
  * Start falls back to slog.Default() when cfg.Logger is nil, so a failed
    send cannot panic on a nil logger (the tests in this patch set no Logger).
  * The comment explaining why the screen is read under the lock is kept; the
    locking it describes is unchanged.

Whitespace caveat: indentation and blank context lines were rebuilt from a
whitespace-collapsed copy using gofmt conventions and the hunk line counts.
Whitespace-only differences (for example the re-added "lock" field line) could
not be recovered. Hunk headers outside the reviewed hunk are carried over
as-is, apart from shifted new-file start offsets in the same file.

diff --git a/cmd/server/server.go b/cmd/server/server.go
index 6a7fa7f0..6d5cdec3 100644
--- a/cmd/server/server.go
+++ b/cmd/server/server.go
@@ -136,7 +136,6 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
 		fmt.Println(srv.GetOpenAPI())
 		return nil
 	}
-	srv.StartSnapshotLoop(ctx)
 	logger.Info("Starting server on port", "port", port)
 	processExitCh := make(chan error, 1)
 	go func() {
diff --git a/e2e/echo_test.go b/e2e/echo_test.go
index fbc1efc0..861fb421 100644
--- a/e2e/echo_test.go
+++ b/e2e/echo_test.go
@@ -133,14 +133,11 @@ func setup(ctx context.Context, t testing.TB, p *params) ([]ScriptEntry, *agenta
 		cwd, err := os.Getwd()
 		require.NoError(t, err, "Failed to get current working directory")
 		binaryPath = filepath.Join(cwd, "..", "out", "agentapi")
-		_, err = os.Stat(binaryPath)
-		if err != nil {
-			t.Logf("Building binary at %s", binaryPath)
-			buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
-			buildCmd.Dir = filepath.Join(cwd, "..")
-			t.Logf("run: %s", buildCmd.String())
-			require.NoError(t, buildCmd.Run(), "Failed to build binary")
-		}
+		t.Logf("Building binary at %s", binaryPath)
+		buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
+		buildCmd.Dir = filepath.Join(cwd, "..")
+		t.Logf("run: %s", buildCmd.String())
+		require.NoError(t, buildCmd.Run(), "Failed to build binary")
 	}
 
 	serverPort, err := getFreePort()
diff --git a/lib/httpapi/server.go b/lib/httpapi/server.go
index cb13a29e..e43315bf 100644
--- a/lib/httpapi/server.go
+++ b/lib/httpapi/server.go
@@ -41,7 +41,7 @@ type Server struct {
 	srv          *http.Server
 	mu           sync.RWMutex
 	logger       *slog.Logger
-	conversation *st.PTYConversation
+	conversation st.Conversation
 	agentio      *termexec.Process
 	agentType    mf.AgentType
 	emitter      *EventEmitter
@@ -244,6 +244,14 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
 		return mf.FormatToolCall(config.AgentType, message)
 	}
 
+	emitter := NewEventEmitter(1024)
+
+	// Format initial prompt into message parts if provided
+	var initialPrompt []st.MessagePart
+	if config.InitialPrompt != "" {
+		initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
+	}
+
 	conversation := st.NewPTY(ctx, st.PTYConversationConfig{
 		AgentType:             config.AgentType,
 		AgentIO:               config.Process,
@@ -253,9 +261,17 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
 		FormatMessage:         formatMessage,
 		ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
 		FormatToolCall:        formatToolCall,
-		Logger:                logger,
-	}, config.InitialPrompt)
-	emitter := NewEventEmitter(1024)
+		InitialPrompt:         initialPrompt,
+		// OnSnapshot uses a callback rather than passing the emitter directly
+		// to keep the screentracker package decoupled from httpapi concerns.
+		// This preserves clean package boundaries and avoids import cycles.
+		OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
+			emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
+			emitter.UpdateMessagesAndEmitChanges(messages)
+			emitter.UpdateScreenAndEmitChanges(screen)
+		},
+		Logger: logger,
+	})
 
 	// Create temporary directory for uploads
 	tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
@@ -281,6 +297,16 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
 	// Register API routes
 	s.registerRoutes()
 
+	// Start the conversation polling loop if we have a process.
+	// Process is nil only when --print-openapi is used (no agent runs).
+	// The process is already running at this point - termexec.StartProcess()
+	// blocks until the PTY is created and the process is active. Agent
+	// readiness (waiting for the prompt) is handled asynchronously inside
+	// conversation.Start() via ReadyForInitialPrompt.
+	if config.Process != nil {
+		s.conversation.Start(ctx)
+	}
+
 	return s, nil
 }
 
@@ -336,38 +362,6 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) {
 	next(ctx)
 }
 
-func (s *Server) StartSnapshotLoop(ctx context.Context) {
-	s.conversation.Start(ctx)
-	go func() {
-		ticker := s.clock.NewTicker(snapshotInterval)
-		defer ticker.Stop()
-		for {
-			currentStatus := s.conversation.Status()
-
-			// Send initial prompt when agent becomes stable for the first time
-			if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable {
-				if err := s.conversation.Send(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil {
-					s.logger.Error("Failed to send initial prompt", "error", err)
-				} else {
-					s.conversation.InitialPromptSent = true
-					s.conversation.ReadyForInitialPrompt = false
-					currentStatus = st.ConversationStatusChanging
-					s.logger.Info("Initial prompt sent successfully")
-				}
-			}
-			s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType)
-			s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages())
-			s.emitter.UpdateScreenAndEmitChanges(s.conversation.Text())
-
-			select {
-			case <-ctx.Done():
-				return
-			case <-ticker.C:
-			}
-		}
-	}()
-}
-
 // registerRoutes sets up all API endpoints
 func (s *Server) registerRoutes() {
 	// GET /status endpoint
diff --git a/lib/screentracker/pty_conversation.go b/lib/screentracker/pty_conversation.go
index 38d8e409..5ae29b2c 100644
--- a/lib/screentracker/pty_conversation.go
+++ b/lib/screentracker/pty_conversation.go
@@ -66,7 +66,11 @@ type PTYConversationConfig struct {
 	ReadyForInitialPrompt func(message string) bool
 	// FormatToolCall removes the coder report_task tool call from the agent message and also returns the array of removed tool calls
 	FormatToolCall func(message string) (string, []string)
-	Logger         *slog.Logger
+	// InitialPrompt is the initial prompt to send to the agent once ready
+	InitialPrompt []MessagePart
+	// OnSnapshot is called after each snapshot with current status, messages, and screen content
+	OnSnapshot func(status ConversationStatus, messages []ConversationMessage, screen string)
+	Logger     *slog.Logger
 }
 
 func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
@@ -88,21 +92,23 @@ type PTYConversation struct {
 	snapshotBuffer              *RingBuffer[screenSnapshot]
 	messages                    []ConversationMessage
 	screenBeforeLastUserMessage string
-	lock                        sync.Mutex
-
-	// InitialPrompt is the initial prompt passed to the agent
-	InitialPrompt string
-	// InitialPromptSent keeps track if the InitialPrompt has been successfully sent to the agents
-	InitialPromptSent bool
-	// ReadyForInitialPrompt keeps track if the agent is ready to accept the initial prompt
-	ReadyForInitialPrompt bool
+	lock                        sync.Mutex
+
+	// outboundQueue holds messages waiting to be sent to the agent
+	outboundQueue chan []MessagePart
+	// stableSignal is used by the snapshot loop to signal the send loop
+	// when the agent is stable and there are items in the outbound queue.
+	stableSignal chan struct{}
 	// toolCallMessageSet keeps track of the tool calls that have been detected & logged in the current agent message
 	toolCallMessageSet map[string]bool
+	// initialPromptReady is closed when ReadyForInitialPrompt returns true.
+	// This is checked by a separate goroutine to avoid calling ReadyForInitialPrompt on every tick.
+	initialPromptReady chan struct{}
 }
 
 var _ Conversation = &PTYConversation{}
 
-func NewPTY(ctx context.Context, cfg PTYConversationConfig, initialPrompt string) *PTYConversation {
+func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
 	if cfg.Clock == nil {
 		cfg.Clock = quartz.NewReal()
 	}
@@ -118,33 +124,120 @@ func NewPTY(ctx context.Context, cfg PTYConversationConfig, initialPrompt string
 				Time:    cfg.Clock.Now(),
 			},
 		},
-		InitialPrompt:      initialPrompt,
-		InitialPromptSent:  len(initialPrompt) == 0,
+		outboundQueue:      make(chan []MessagePart, 1),
+		stableSignal:       make(chan struct{}, 1),
 		toolCallMessageSet: make(map[string]bool),
+		initialPromptReady: make(chan struct{}),
+	}
+	// If we have an initial prompt, enqueue it
+	if len(cfg.InitialPrompt) > 0 {
+		c.outboundQueue <- cfg.InitialPrompt
+	}
+	if c.cfg.OnSnapshot == nil {
+		c.cfg.OnSnapshot = func(ConversationStatus, []ConversationMessage, string) {}
+	}
+	if c.cfg.ReadyForInitialPrompt == nil {
+		c.cfg.ReadyForInitialPrompt = func(string) bool { return true }
 	}
 	return c
 }
 
 func (c *PTYConversation) Start(ctx context.Context) {
+	// Fall back to the default logger so the send loop below cannot
+	// dereference a nil *slog.Logger when the config leaves Logger unset.
+	logger := c.cfg.Logger
+	if logger == nil {
+		logger = slog.Default()
+	}
+
+	// Initial prompt readiness loop - polls ReadyForInitialPrompt until it returns true,
+	// then closes initialPromptReady and exits. This avoids calling ReadyForInitialPrompt
+	// on every snapshot tick.
+	go func() {
+		ticker := c.cfg.Clock.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				screen := c.cfg.AgentIO.ReadScreen()
+				if c.cfg.ReadyForInitialPrompt(screen) {
+					close(c.initialPromptReady)
+					return
+				}
+			}
+		}
+	}()
+
+	// Snapshot loop
 	go func() {
 		ticker := c.cfg.Clock.NewTicker(c.cfg.SnapshotInterval)
 		defer ticker.Stop()
+
 		for {
 			select {
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
 				// It's important that we hold the lock while reading the screen.
 				// There's a race condition that occurs without it:
 				// 1. The screen is read
 				// 2. Independently, Send is called and takes the lock.
 				// 3. snapshotLocked is called and waits on the lock.
 				// 4. Send modifies the terminal state, releases the lock
 				// 5. snapshotLocked adds a snapshot from a stale screen
 				c.lock.Lock()
 				screen := c.cfg.AgentIO.ReadScreen()
 				c.snapshotLocked(screen)
+				status := c.statusLocked()
+				messages := c.messagesLocked()
+
+				// Signal send loop if agent is ready and queue has items.
+				// We check readiness independently of statusLocked() because
+				// statusLocked() returns "changing" when queue has items.
+				isReady := false
+				select {
+				case <-c.initialPromptReady:
+					isReady = true
+				default:
+				}
+				if len(c.outboundQueue) > 0 && c.isScreenStableLocked() && isReady {
+					select {
+					case c.stableSignal <- struct{}{}:
+					default:
+						// Signal already pending
+					}
+				}
 				c.lock.Unlock()
+
+				c.cfg.OnSnapshot(status, messages, screen)
+			}
+		}
+	}()
+
+	// Send loop
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-c.stableSignal:
+				// Take the lock before dequeuing. If the message were removed
+				// first, a snapshot tick could run in the gap, see an empty
+				// queue on a stable screen, and report "stable" just before
+				// the send flips the status back to "changing".
+				c.lock.Lock()
+				select {
+				case parts := <-c.outboundQueue:
+					if err := c.sendLocked(parts...); err != nil {
+						logger.Error("failed to send queued message", "error", err)
+					}
+				default:
+					logger.Error("received stable signal but outbound queue is empty")
+				}
+				c.lock.Unlock()
 			}
 		}
 	}()
@@ -226,6 +319,11 @@ func (c *PTYConversation) Send(messageParts ...MessagePart) error {
 		return ErrMessageValidationChanging
 	}
 
+	return c.sendLocked(messageParts...)
+}
+
+// sendLocked sends a message to the agent. Caller MUST hold c.lock.
+func (c *PTYConversation) sendLocked(messageParts ...MessagePart) error {
 	var sb strings.Builder
 	for _, part := range messageParts {
 		sb.WriteString(part.String())
@@ -328,6 +426,21 @@ func (c *PTYConversation) Status() ConversationStatus {
 	return c.statusLocked()
 }
 
+// isScreenStableLocked returns true if the screen content has been stable
+// for the required number of snapshots. Caller MUST hold c.lock.
+func (c *PTYConversation) isScreenStableLocked() bool {
+	snapshots := c.snapshotBuffer.GetAll()
+	if len(snapshots) < c.stableSnapshotsThreshold {
+		return false
+	}
+	for i := 1; i < len(snapshots); i++ {
+		if snapshots[0].screen != snapshots[i].screen {
+			return false
+		}
+	}
+	return true
+}
+
 // caller MUST hold c.lock
 func (c *PTYConversation) statusLocked() ConversationStatus {
 	// sanity checks
@@ -350,17 +463,13 @@ func (c *PTYConversation) statusLocked() ConversationStatus {
 		return ConversationStatusInitializing
 	}
 
-	for i := 1; i < len(snapshots); i++ {
-		if snapshots[0].screen != snapshots[i].screen {
-			return ConversationStatusChanging
-		}
+	if !c.isScreenStableLocked() {
+		return ConversationStatusChanging
 	}
 
-	if !c.InitialPromptSent && !c.ReadyForInitialPrompt {
-		if len(snapshots) > 0 && c.cfg.ReadyForInitialPrompt(snapshots[len(snapshots)-1].screen) {
-			c.ReadyForInitialPrompt = true
-			return ConversationStatusStable
-		}
+	// Handle initial prompt readiness: report "changing" until the queue is drained
+	// to avoid the status flipping "changing" -> "stable" -> "changing"
+	if len(c.outboundQueue) > 0 {
 		return ConversationStatusChanging
 	}
 
@@ -371,6 +480,11 @@ func (c *PTYConversation) Messages() []ConversationMessage {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
+	return c.messagesLocked()
+}
+
+// messagesLocked returns a copy of messages. Caller MUST hold c.lock.
+func (c *PTYConversation) messagesLocked() []ConversationMessage {
 	result := make([]ConversationMessage, len(c.messages))
 	copy(result, c.messages)
 	return result
diff --git a/lib/screentracker/pty_conversation_test.go b/lib/screentracker/pty_conversation_test.go
index e2903227..7fe061ee 100644
--- a/lib/screentracker/pty_conversation_test.go
+++ b/lib/screentracker/pty_conversation_test.go
@@ -40,7 +40,7 @@ func statusTest(t *testing.T, params statusTestParams) {
 		if params.cfg.Clock == nil {
 			params.cfg.Clock = quartz.NewReal()
 		}
-		c := st.NewPTY(ctx, params.cfg, "")
+		c := st.NewPTY(ctx, params.cfg)
 		assert.Equal(t, st.ConversationStatusInitializing, c.Status())
 
 		for i, step := range params.steps {
@@ -147,7 +147,7 @@ func TestMessages(t *testing.T) {
 		for _, opt := range opts {
 			opt(&cfg)
 		}
-		return st.NewPTY(context.Background(), cfg, "")
+		return st.NewPTY(context.Background(), cfg)
 	}
 
 	t.Run("messages are copied", func(t *testing.T) {
@@ -361,19 +361,18 @@ func TestInitialPromptReadiness(t *testing.T) {
 			ReadyForInitialPrompt: func(message string) bool {
 				return message == "ready"
 			},
+			InitialPrompt: []st.MessagePart{st.MessagePartText{Content: "initial prompt here"}},
 		}
-		c := st.NewPTY(context.Background(), cfg, "initial prompt here")
+		c := st.NewPTY(context.Background(), cfg)
 
 		// Fill buffer with stable snapshots, but agent is not ready
 		c.Snapshot("loading...")
 
 		// Even though screen is stable, status should be changing because agent is not ready
 		assert.Equal(t, st.ConversationStatusChanging, c.Status())
-		assert.False(t, c.ReadyForInitialPrompt)
-		assert.False(t, c.InitialPromptSent)
 	})
 
-	t.Run("agent becomes ready - status changes to stable", func(t *testing.T) {
+	t.Run("agent becomes ready - status stays changing until initial prompt sent", func(t *testing.T) {
 		mClock := quartz.NewMock(t)
 		mClock.Set(now)
 		cfg := st.PTYConversationConfig{
@@ -384,21 +383,21 @@ func TestInitialPromptReadiness(t *testing.T) {
 			ReadyForInitialPrompt: func(message string) bool {
 				return message == "ready"
 			},
+			InitialPrompt: []st.MessagePart{st.MessagePartText{Content: "initial prompt here"}},
 		}
-		c := st.NewPTY(context.Background(), cfg, "initial prompt here")
+		c := st.NewPTY(context.Background(), cfg)
 
 		// Agent not ready initially
 		c.Snapshot("loading...")
 		assert.Equal(t, st.ConversationStatusChanging, c.Status())
 
-		// Agent becomes ready
+		// Agent becomes ready, but status stays "changing" until initial prompt is sent
+		// This is the new behavior - we don't flip to "stable" then back to "changing"
 		c.Snapshot("ready")
-		assert.Equal(t, st.ConversationStatusStable, c.Status())
-		assert.True(t, c.ReadyForInitialPrompt)
-		assert.False(t, c.InitialPromptSent)
+		assert.Equal(t, st.ConversationStatusChanging, c.Status())
 	})
 
-	t.Run("ready for initial prompt lifecycle: false -> true -> false", func(t *testing.T) {
+	t.Run("initial prompt lifecycle - status stays changing until sent", func(t *testing.T) {
 		mClock := quartz.NewMock(t)
 		mClock.Set(now)
 		agent := &testAgent{screen: "loading..."}
@@ -410,35 +409,21 @@ func TestInitialPromptReadiness(t *testing.T) {
 			ReadyForInitialPrompt: func(message string) bool {
 				return message == "ready"
 			},
+			InitialPrompt:              []st.MessagePart{st.MessagePartText{Content: "initial prompt here"}},
 			SkipWritingMessage:         true,
 			SkipSendMessageStatusCheck: true,
 		}
-		c := st.NewPTY(context.Background(), cfg, "initial prompt here")
+		c := st.NewPTY(context.Background(), cfg)
 
-		// Initial state: ReadyForInitialPrompt should be false
+		// Initial state: status should be changing while waiting for readiness
 		c.Snapshot("loading...")
-		assert.False(t, c.ReadyForInitialPrompt, "should start as false")
-		assert.False(t, c.InitialPromptSent)
 		assert.Equal(t, st.ConversationStatusChanging, c.Status())
 
-		// Agent becomes ready: ReadyForInitialPrompt should become true
+		// Agent becomes ready: status still "changing" until initial prompt is actually sent
+		// This prevents the status from flipping "changing" → "stable" → "changing"
 		agent.screen = "ready"
 		c.Snapshot("ready")
-		assert.Equal(t, st.ConversationStatusStable, c.Status())
-		assert.True(t, c.ReadyForInitialPrompt, "should become true when ready")
-		assert.False(t, c.InitialPromptSent)
-
-		// Send the initial prompt
-		assert.NoError(t, c.Send(st.MessagePartText{Content: "initial prompt here"}))
-
-		// After sending initial prompt: ReadyForInitialPrompt should be set back to false
-		// (simulating what happens in the actual server code)
-		c.InitialPromptSent = true
-		c.ReadyForInitialPrompt = false
-
-		// Verify final state
-		assert.False(t, c.ReadyForInitialPrompt, "should be false after initial prompt sent")
-		assert.True(t, c.InitialPromptSent)
+		assert.Equal(t, st.ConversationStatusChanging, c.Status())
 	})
 
 	t.Run("no initial prompt - normal status logic applies", func(t *testing.T) {
@@ -452,56 +437,48 @@ func TestInitialPromptReadiness(t *testing.T) {
 			ReadyForInitialPrompt: func(message string) bool {
 				return false // Agent never ready
 			},
+			// No InitialPrompt set - means no need to wait for readiness
 		}
-		// Empty initial prompt means no need to wait for readiness
-		c := st.NewPTY(context.Background(), cfg, "")
+		c := st.NewPTY(context.Background(), cfg)
 		c.Snapshot("loading...")
 
 		// Status should be stable because no initial prompt to wait for
 		assert.Equal(t, st.ConversationStatusStable, c.Status())
-		assert.False(t, c.ReadyForInitialPrompt)
-		assert.True(t, c.InitialPromptSent) // Set to true when initial prompt is empty
 	})
 
-	t.Run("initial prompt sent - normal status logic applies", func(t *testing.T) {
+	t.Run("no initial prompt configured - normal status logic applies", func(t *testing.T) {
+		// When no InitialPrompt is configured, the conversation behaves as if
+		// the initial prompt has already been sent, so normal status logic applies.
+
 		mClock := quartz.NewMock(t)
 		mClock.Set(now)
 		agent := &testAgent{screen: "ready"}
 		cfg := st.PTYConversationConfig{
 			Clock:                 mClock,
 			SnapshotInterval:      1 * time.Second,
-			ScreenStabilityLength: 0,
+			ScreenStabilityLength: 2 * time.Second, // threshold = 3
 			AgentIO:               agent,
-			ReadyForInitialPrompt: func(message string) bool {
-				return message == "ready"
-			},
+			// No InitialPrompt configured - normal status logic applies immediately
 			SkipWritingMessage:         true,
 			SkipSendMessageStatusCheck: true,
 		}
-		c := st.NewPTY(context.Background(), cfg, "initial prompt here")
+		c := st.NewPTY(context.Background(), cfg)
 
-		// First, agent becomes ready
+		// Fill buffer to reach stability with "ready" screen
+		c.Snapshot("ready")
 		c.Snapshot("ready")
+		c.Snapshot("ready")
+		// Since no initial prompt is configured, screen stability determines status
 		assert.Equal(t, st.ConversationStatusStable, c.Status())
-		assert.True(t, c.ReadyForInitialPrompt)
-		assert.False(t, c.InitialPromptSent)
 
-		// Send the initial prompt
+		// After screen changes, status becomes changing
 		agent.screen = "processing..."
-		assert.NoError(t, c.Send(st.MessagePartText{Content: "initial prompt here"}))
-
-		// Mark initial prompt as sent (simulating what the server does)
-		c.InitialPromptSent = true
-		c.ReadyForInitialPrompt = false
-
-		// Now test that status logic works normally after initial prompt is sent
 		c.Snapshot("processing...")
+		assert.Equal(t, st.ConversationStatusChanging, c.Status())
 
-		// Status should be stable because initial prompt was already sent
-		// and the readiness check is bypassed
+		// After screen is stable again (3 identical snapshots), status becomes stable
+		c.Snapshot("processing...")
+		c.Snapshot("processing...")
 		assert.Equal(t, st.ConversationStatusStable, c.Status())
-		assert.False(t, c.ReadyForInitialPrompt)
-		assert.True(t, c.InitialPromptSent)
 	})
 }