This repository was archived by the owner on May 5, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinteractive.go
More file actions
320 lines (286 loc) · 8.29 KB
/
interactive.go
File metadata and controls
320 lines (286 loc) · 8.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package agentbridge
import (
"context"
"errors"
"strings"
"sync"
)
// SteerMode describes how a provider handles input submitted while a turn is
// already running.
type SteerMode string
const (
// SteerModeNative means the provider can inject input into the active turn.
SteerModeNative SteerMode = "native"
// SteerModeNativeEnqueue means the provider can append input to the active
// session without agentbridge waiting for the current turn to finish. The
// provider may consume it at the next model/tool boundary.
SteerModeNativeEnqueue SteerMode = "native_enqueue"
// SteerModeQueueWhenBusy means agentbridge queues input until the turn is idle.
SteerModeQueueWhenBusy SteerMode = "queue_when_busy"
// SteerModeInterruptOnly means the provider has no safe enqueue semantics.
SteerModeInterruptOnly SteerMode = "interrupt_only"
)
// TurnEventKind is a normalized event emitted by interactive backends.
type TurnEventKind string
const (
TurnEventStarted TurnEventKind = "turn_started"
// TurnEventAssistantText contains a complete assistant message. Providers
// that stream chunks coalesce them before emitting this normalized event.
TurnEventAssistantText TurnEventKind = "assistant_text"
// TurnEventUserText contains user-authored text observed on a provider
// event stream.
TurnEventUserText TurnEventKind = "user_text"
TurnEventReasoning TurnEventKind = "reasoning"
TurnEventToolUse TurnEventKind = "tool_use"
TurnEventFileChange TurnEventKind = "file_change"
TurnEventSteerConsumed TurnEventKind = "steer_consumed"
TurnEventCompleted TurnEventKind = "turn_completed"
TurnEventInterrupted TurnEventKind = "turn_interrupted"
TurnEventError TurnEventKind = "error"
)
// TurnEvent is the provider-neutral event stream used by interactive sessions.
type TurnEvent struct {
Provider string
ThreadID string
TurnID string
Kind TurnEventKind
Text string
Usage Usage
Err error
Raw string
}
// TurnRef identifies an in-flight provider turn.
type TurnRef struct {
ThreadID string
TurnID string
}
// SubmitMode describes how a submitted user message was accepted.
type SubmitMode string
const (
SubmitStarted SubmitMode = "started"
SubmitSteered SubmitMode = "steered"
SubmitQueued SubmitMode = "queued"
)
// SubmitResult is returned immediately after input is accepted.
type SubmitResult struct {
ThreadID string
TurnID string
Mode SubmitMode
QueueDepth int
}
// InteractiveDriver is implemented by provider-specific long-lived transports.
type InteractiveDriver interface {
SteerMode() SteerMode
StartTurn(ctx context.Context, req RunRequest) (TurnRef, error)
SteerTurn(ctx context.Context, turn TurnRef, req RunRequest) error
InterruptTurn(ctx context.Context, turn TurnRef) error
Events() <-chan TurnEvent
Close() error
}
var ErrInteractiveClosed = errors.New("interactive session is closed")
var ErrNoActiveTurn = errors.New("no active turn to steer")
// InteractiveSession serializes user input for one logical conversation.
//
// If the driver supports native steer/enqueue, Submit injects or appends new
// input into the active provider session. Otherwise Submit queues input and
// starts the next turn after completion.
type InteractiveSession struct {
driver InteractiveDriver
events chan TurnEvent
done chan struct{}
mu sync.Mutex
closed bool
active *TurnRef
queue []RunRequest
}
// NewInteractiveSession wraps a provider driver with common steer/queue state.
func NewInteractiveSession(driver InteractiveDriver) *InteractiveSession {
s := &InteractiveSession{
driver: driver,
events: make(chan TurnEvent, 128),
done: make(chan struct{}),
}
go s.forwardEvents()
return s
}
// Events returns the normalized event stream for this conversation.
func (s *InteractiveSession) Events() <-chan TurnEvent {
if s == nil {
ch := make(chan TurnEvent)
close(ch)
return ch
}
return s.events
}
// SteerMode returns the active provider's busy-turn behavior.
func (s *InteractiveSession) SteerMode() SteerMode {
if s == nil || s.driver == nil {
return SteerModeInterruptOnly
}
return s.driver.SteerMode()
}
// Submit starts a turn, steers the active turn, or queues the input depending
// on the provider capability and current state.
func (s *InteractiveSession) Submit(ctx context.Context, req RunRequest) (SubmitResult, error) {
if s == nil || s.driver == nil {
return SubmitResult{}, ErrInteractiveClosed
}
req.UserText = strings.TrimSpace(req.UserText)
if req.UserText == "" {
return SubmitResult{}, errors.New("empty prompt")
}
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return SubmitResult{}, ErrInteractiveClosed
}
if s.active == nil {
turn, err := s.driver.StartTurn(ctx, req)
if err != nil {
return SubmitResult{}, err
}
s.active = &turn
return SubmitResult{ThreadID: turn.ThreadID, TurnID: turn.TurnID, Mode: SubmitStarted}, nil
}
if isProviderNativeSteerMode(s.driver.SteerMode()) {
turn := *s.active
if err := s.driver.SteerTurn(ctx, turn, req); err != nil {
return SubmitResult{}, err
}
return SubmitResult{ThreadID: turn.ThreadID, TurnID: turn.TurnID, Mode: SubmitSteered}, nil
}
s.queue = append(s.queue, req)
return SubmitResult{
ThreadID: s.active.ThreadID,
TurnID: s.active.TurnID,
Mode: SubmitQueued,
QueueDepth: len(s.queue),
}, nil
}
// Steer injects/appends input into the active turn and fails if there is no
// active provider-native turn. It never starts a new turn.
func (s *InteractiveSession) Steer(ctx context.Context, req RunRequest) (SubmitResult, error) {
if s == nil || s.driver == nil {
return SubmitResult{}, ErrInteractiveClosed
}
req.UserText = strings.TrimSpace(req.UserText)
if req.UserText == "" {
return SubmitResult{}, errors.New("empty prompt")
}
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return SubmitResult{}, ErrInteractiveClosed
}
if s.active == nil {
return SubmitResult{}, ErrNoActiveTurn
}
if !isProviderNativeSteerMode(s.driver.SteerMode()) {
return SubmitResult{}, ErrSteerUnsupported
}
turn := *s.active
if err := s.driver.SteerTurn(ctx, turn, req); err != nil {
return SubmitResult{}, err
}
return SubmitResult{ThreadID: turn.ThreadID, TurnID: turn.TurnID, Mode: SubmitSteered}, nil
}
func isProviderNativeSteerMode(mode SteerMode) bool {
return mode == SteerModeNative || mode == SteerModeNativeEnqueue
}
// Interrupt requests cancellation of the active turn.
func (s *InteractiveSession) Interrupt(ctx context.Context) error {
if s == nil || s.driver == nil {
return ErrInteractiveClosed
}
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return ErrInteractiveClosed
}
if s.active == nil {
return nil
}
return s.driver.InterruptTurn(ctx, *s.active)
}
// Close stops the underlying provider process.
func (s *InteractiveSession) Close() error {
if s == nil || s.driver == nil {
return nil
}
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return nil
}
s.closed = true
close(s.done)
s.mu.Unlock()
return s.driver.Close()
}
func (s *InteractiveSession) forwardEvents() {
defer close(s.events)
for {
select {
case <-s.done:
return
case event, ok := <-s.driver.Events():
if !ok {
s.mu.Lock()
s.closed = true
s.active = nil
s.queue = nil
s.mu.Unlock()
return
}
s.handleEvent(event)
select {
case s.events <- event:
case <-s.done:
return
}
}
}
}
func (s *InteractiveSession) handleEvent(event TurnEvent) {
switch event.Kind {
case TurnEventCompleted, TurnEventInterrupted, TurnEventError:
default:
return
}
var next *RunRequest
s.mu.Lock()
if s.active != nil && (event.TurnID == "" || event.TurnID == s.active.TurnID) {
s.active = nil
}
if !s.closed && s.active == nil && len(s.queue) > 0 {
req := s.queue[0]
if strings.TrimSpace(req.ThreadID) == "" && strings.TrimSpace(event.ThreadID) != "" {
req.ThreadID = strings.TrimSpace(event.ThreadID)
}
copy(s.queue, s.queue[1:])
s.queue = s.queue[:len(s.queue)-1]
next = &req
}
s.mu.Unlock()
if next != nil {
go s.startQueued(*next)
}
}
func (s *InteractiveSession) startQueued(req RunRequest) {
turn, err := s.driver.StartTurn(context.Background(), req)
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return
}
if err == nil {
s.active = &turn
}
s.mu.Unlock()
if err != nil {
select {
case s.events <- TurnEvent{Kind: TurnEventError, Err: err}:
case <-s.done:
}
}
}