Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d51e4c2
refactor(screentracker): internalize initial prompt handling in PTYCo…
johnstcn Feb 4, 2026
241671a
refactor(server): use PTYConversation's internal snapshot loop and In…
johnstcn Feb 4, 2026
c80b288
Update pty_conversation tests for new NewPTY API and status behavior
johnstcn Feb 4, 2026
35fed24
fix: race condition in initial prompt sending
johnstcn Feb 4, 2026
2c907a5
httpapi: replace StartSnapshotLoop with Conversation accessor
johnstcn Feb 4, 2026
ccec233
refactor(screentracker): remove redundant exported fields from PTYCon…
johnstcn Feb 4, 2026
d16c552
refactor: use srv.Conversation().Start(ctx) instead of removed StartS…
johnstcn Feb 4, 2026
1fa447e
refactor(tests): remove direct field access to InitialPromptSent
johnstcn Feb 4, 2026
24ff106
refactor: move conversation.Start() into NewServer() and remove Conve…
johnstcn Feb 4, 2026
9a197fc
refactor(screentracker): move status check from sendLocked to Send()
johnstcn Feb 4, 2026
3dd8c56
docs: add clarifying comments addressing review feedback
johnstcn Feb 4, 2026
f5bd08d
refactor: use Conversation interface in Server struct
johnstcn Feb 4, 2026
86c9d91
refactor: remove unnecessary agentType local variable
johnstcn Feb 4, 2026
b5437f7
refactor: simplify nil channel handling in Start()
johnstcn Feb 4, 2026
593b65f
refactor: remove unused InitialPrompt field from PTYConversation
johnstcn Feb 4, 2026
6079777
refactor: replace initialPromptReady/Sent with outbound message queue
johnstcn Feb 5, 2026
002fd67
screentracker: use no-op defaults for optional callbacks
johnstcn Feb 5, 2026
a6b54a4
screentracker: remove duplicate agentReady nil check
johnstcn Feb 5, 2026
3259c1f
screentracker: remove unnecessary comment and replace goto with label…
johnstcn Feb 5, 2026
54b8ef4
refactor: use separate snapshot/send goroutines with stableSignal cha…
johnstcn Feb 5, 2026
b157ff7
e2e: always rebuild binary when AGENTAPI_BINARY_PATH is not set
johnstcn Feb 5, 2026
7d488f3
Fix stableSignal deadlock in pty_conversation.go
johnstcn Feb 5, 2026
f725683
Optimize ReadyForInitialPrompt check and fix stability comparison
johnstcn Feb 5, 2026
0b9e687
refactor: use channel for initialPromptReady one-time signaling
johnstcn Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
fmt.Println(srv.GetOpenAPI())
return nil
}
srv.StartSnapshotLoop(ctx)
logger.Info("Starting server on port", "port", port)
processExitCh := make(chan error, 1)
go func() {
Expand Down
13 changes: 5 additions & 8 deletions e2e/echo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,11 @@ func setup(ctx context.Context, t testing.TB, p *params) ([]ScriptEntry, *agenta
cwd, err := os.Getwd()
require.NoError(t, err, "Failed to get current working directory")
binaryPath = filepath.Join(cwd, "..", "out", "agentapi")
_, err = os.Stat(binaryPath)
if err != nil {
Comment on lines -136 to -137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: rebuilding binary here on every run as a stale binary caused me to miss a potential issue

t.Logf("Building binary at %s", binaryPath)
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
buildCmd.Dir = filepath.Join(cwd, "..")
t.Logf("run: %s", buildCmd.String())
require.NoError(t, buildCmd.Run(), "Failed to build binary")
}
t.Logf("Building binary at %s", binaryPath)
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
buildCmd.Dir = filepath.Join(cwd, "..")
t.Logf("run: %s", buildCmd.String())
require.NoError(t, buildCmd.Run(), "Failed to build binary")
}

serverPort, err := getFreePort()
Expand Down
66 changes: 30 additions & 36 deletions lib/httpapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Server struct {
srv *http.Server
mu sync.RWMutex
logger *slog.Logger
conversation *st.PTYConversation
conversation st.Conversation
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self-review: this is the whole point of this PR

agentio *termexec.Process
agentType mf.AgentType
emitter *EventEmitter
Expand Down Expand Up @@ -244,6 +244,14 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
return mf.FormatToolCall(config.AgentType, message)
}

emitter := NewEventEmitter(1024)

// Format initial prompt into message parts if provided
var initialPrompt []st.MessagePart
if config.InitialPrompt != "" {
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
}

conversation := st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: config.Process,
Expand All @@ -253,9 +261,17 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
Logger: logger,
}, config.InitialPrompt)
emitter := NewEventEmitter(1024)
InitialPrompt: initialPrompt,
// OnSnapshot uses a callback rather than passing the emitter directly
// to keep the screentracker package decoupled from httpapi concerns.
// This preserves clean package boundaries and avoids import cycles.
OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
emitter.UpdateMessagesAndEmitChanges(messages)
emitter.UpdateScreenAndEmitChanges(screen)
},
Comment on lines 265 to 272
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review: Could alternatively extract an Emitter interface and pass this in.

Logger: logger,
})

// Create temporary directory for uploads
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
Expand All @@ -281,6 +297,16 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
// Register API routes
s.registerRoutes()

// Start the conversation polling loop if we have a process.
// Process is nil only when --print-openapi is used (no agent runs).
// The process is already running at this point - termexec.StartProcess()
// blocks until the PTY is created and the process is active. Agent
// readiness (waiting for the prompt) is handled asynchronously inside
// conversation.Start() via ReadyForInitialPrompt.
if config.Process != nil {
s.conversation.Start(ctx)
}

return s, nil
}

Expand Down Expand Up @@ -336,38 +362,6 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) {
next(ctx)
}

func (s *Server) StartSnapshotLoop(ctx context.Context) {
s.conversation.Start(ctx)
go func() {
ticker := s.clock.NewTicker(snapshotInterval)
defer ticker.Stop()
for {
currentStatus := s.conversation.Status()

// Send initial prompt when agent becomes stable for the first time
if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable {
if err := s.conversation.Send(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil {
s.logger.Error("Failed to send initial prompt", "error", err)
} else {
s.conversation.InitialPromptSent = true
s.conversation.ReadyForInitialPrompt = false
currentStatus = st.ConversationStatusChanging
s.logger.Info("Initial prompt sent successfully")
}
}
s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType)
s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages())
s.emitter.UpdateScreenAndEmitChanges(s.conversation.Text())

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()
}

// registerRoutes sets up all API endpoints
func (s *Server) registerRoutes() {
// GET /status endpoint
Expand Down
152 changes: 124 additions & 28 deletions lib/screentracker/pty_conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ type PTYConversationConfig struct {
ReadyForInitialPrompt func(message string) bool
// FormatToolCall removes the coder report_task tool call from the agent message and also returns the array of removed tool calls
FormatToolCall func(message string) (string, []string)
Logger *slog.Logger
// InitialPrompt is the initial prompt to send to the agent once ready
InitialPrompt []MessagePart
// OnSnapshot is called after each snapshot with current status, messages, and screen content
OnSnapshot func(status ConversationStatus, messages []ConversationMessage, screen string)
Logger *slog.Logger
}

func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
Expand All @@ -88,21 +92,23 @@ type PTYConversation struct {
snapshotBuffer *RingBuffer[screenSnapshot]
messages []ConversationMessage
screenBeforeLastUserMessage string
lock sync.Mutex

// InitialPrompt is the initial prompt passed to the agent
InitialPrompt string
// InitialPromptSent keeps track if the InitialPrompt has been successfully sent to the agents
InitialPromptSent bool
// ReadyForInitialPrompt keeps track if the agent is ready to accept the initial prompt
ReadyForInitialPrompt bool
lock sync.Mutex

// outboundQueue holds messages waiting to be sent to the agent
outboundQueue chan []MessagePart
// stableSignal is used by the snapshot loop to signal the send loop
// when the agent is stable and there are items in the outbound queue.
stableSignal chan struct{}
// toolCallMessageSet keeps track of the tool calls that have been detected & logged in the current agent message
toolCallMessageSet map[string]bool
// initialPromptReady is closed when ReadyForInitialPrompt returns true.
// This is checked by a separate goroutine to avoid calling ReadyForInitialPrompt on every tick.
initialPromptReady chan struct{}
}

var _ Conversation = &PTYConversation{}

func NewPTY(ctx context.Context, cfg PTYConversationConfig, initialPrompt string) *PTYConversation {
func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
if cfg.Clock == nil {
cfg.Clock = quartz.NewReal()
}
Expand All @@ -118,33 +124,102 @@ func NewPTY(ctx context.Context, cfg PTYConversationConfig, initialPrompt string
Time: cfg.Clock.Now(),
},
},
InitialPrompt: initialPrompt,
InitialPromptSent: len(initialPrompt) == 0,
outboundQueue: make(chan []MessagePart, 1),
stableSignal: make(chan struct{}, 1),
toolCallMessageSet: make(map[string]bool),
initialPromptReady: make(chan struct{}),
}
// If we have an initial prompt, enqueue it
if len(cfg.InitialPrompt) > 0 {
c.outboundQueue <- cfg.InitialPrompt
}
if c.cfg.OnSnapshot == nil {
c.cfg.OnSnapshot = func(ConversationStatus, []ConversationMessage, string) {}
}
if c.cfg.ReadyForInitialPrompt == nil {
c.cfg.ReadyForInitialPrompt = func(string) bool { return true }
}
return c
}

func (c *PTYConversation) Start(ctx context.Context) {
// Initial prompt readiness loop - polls ReadyForInitialPrompt until it returns true,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start now runs three goroutines:

  • Wait for initial readiness
  • Poll for status and signal readiness to send messages
  • Pull from internal queue and send messages

Wait goroutine blocks poll until initial ready, and then exits.
Poll blocks pull until agent is stable and there is a message to send.
Pull pulls from outbound queue and sends.

// then sets initialPromptReady and exits. This avoids calling ReadyForInitialPrompt
// on every snapshot tick.
go func() {
ticker := c.cfg.Clock.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
screen := c.cfg.AgentIO.ReadScreen()
if c.cfg.ReadyForInitialPrompt(screen) {
close(c.initialPromptReady)
return
}
}
}
}()

// Snapshot loop
go func() {
ticker := c.cfg.Clock.NewTicker(c.cfg.SnapshotInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// It's important that we hold the lock while reading the screen.
// There's a race condition that occurs without it:
// 1. The screen is read
// 2. Independently, Send is called and takes the lock.
// 3. snapshotLocked is called and waits on the lock.
// 4. Send modifies the terminal state, releases the lock
// 5. snapshotLocked adds a snapshot from a stale screen
c.lock.Lock()
screen := c.cfg.AgentIO.ReadScreen()
c.snapshotLocked(screen)
status := c.statusLocked()
messages := c.messagesLocked()

// Signal send loop if agent is ready and queue has items.
// We check readiness independently of statusLocked() because
// statusLocked() returns "changing" when queue has items.
isReady := false
select {
case <-c.initialPromptReady:
isReady = true
default:
}
if len(c.outboundQueue) > 0 && c.isScreenStableLocked() && isReady {
select {
case c.stableSignal <- struct{}{}:
default:
// Signal already pending
}
}
c.lock.Unlock()

c.cfg.OnSnapshot(status, messages, screen)
}
}
}()

// Send loop
go func() {
for {
select {
case <-ctx.Done():
return
case <-c.stableSignal:
select {
case parts := <-c.outboundQueue:
c.lock.Lock()
if err := c.sendLocked(parts...); err != nil {
c.cfg.Logger.Error("failed to send queued message", "error", err)
}
c.lock.Unlock()
default:
c.cfg.Logger.Error("received stable signal but outbound queue is empty")
}
}
}
}()
Expand Down Expand Up @@ -226,6 +301,11 @@ func (c *PTYConversation) Send(messageParts ...MessagePart) error {
return ErrMessageValidationChanging
}

return c.sendLocked(messageParts...)
}

// sendLocked sends a message to the agent. Caller MUST hold c.lock.
func (c *PTYConversation) sendLocked(messageParts ...MessagePart) error {
var sb strings.Builder
for _, part := range messageParts {
sb.WriteString(part.String())
Expand Down Expand Up @@ -328,6 +408,21 @@ func (c *PTYConversation) Status() ConversationStatus {
return c.statusLocked()
}

// isScreenStableLocked returns true if the screen content has been stable
// for the required number of snapshots. Caller MUST hold c.lock.
func (c *PTYConversation) isScreenStableLocked() bool {
snapshots := c.snapshotBuffer.GetAll()
if len(snapshots) < c.stableSnapshotsThreshold {
return false
}
for i := 1; i < len(snapshots); i++ {
if snapshots[0].screen != snapshots[i].screen {
return false
}
}
return true
}

// caller MUST hold c.lock
func (c *PTYConversation) statusLocked() ConversationStatus {
// sanity checks
Expand All @@ -350,17 +445,13 @@ func (c *PTYConversation) statusLocked() ConversationStatus {
return ConversationStatusInitializing
}

for i := 1; i < len(snapshots); i++ {
if snapshots[0].screen != snapshots[i].screen {
return ConversationStatusChanging
}
if !c.isScreenStableLocked() {
return ConversationStatusChanging
}

if !c.InitialPromptSent && !c.ReadyForInitialPrompt {
if len(snapshots) > 0 && c.cfg.ReadyForInitialPrompt(snapshots[len(snapshots)-1].screen) {
c.ReadyForInitialPrompt = true
return ConversationStatusStable
}
// Handle initial prompt readiness: report "changing" until the queue is drained
// to avoid the status flipping "changing" -> "stable" -> "changing"
if len(c.outboundQueue) > 0 {
return ConversationStatusChanging
}

Expand All @@ -371,6 +462,11 @@ func (c *PTYConversation) Messages() []ConversationMessage {
c.lock.Lock()
defer c.lock.Unlock()

return c.messagesLocked()
}

// messagesLocked returns a copy of messages. Caller MUST hold c.lock.
func (c *PTYConversation) messagesLocked() []ConversationMessage {
result := make([]ConversationMessage, len(c.messages))
copy(result, c.messages)
return result
Expand Down
Loading