From 3ff8c4260e2f931254bf0dec958ad6339f1e2313 Mon Sep 17 00:00:00 2001 From: Michael Brown Date: Sat, 21 Mar 2026 21:47:36 -0400 Subject: [PATCH 1/3] Support model change notices and provider handoffs - Emit and project thread.model-set events - Add model change activity notices in orchestration - Allow provider switches with handoff history --- .../OrchestrationEngineHarness.integration.ts | 5 + .../Layers/ModelChangeReactor.test.ts | 192 ++++++++++++++++++ .../Layers/ModelChangeReactor.ts | 68 +++++++ .../Layers/OrchestrationEngine.test.ts | 64 ++++++ .../Layers/OrchestrationEngine.ts | 22 +- .../Layers/OrchestrationReactor.test.ts | 12 +- .../Layers/OrchestrationReactor.ts | 3 + .../Layers/ProjectionPipeline.test.ts | 93 +++++++++ .../Layers/ProjectionPipeline.ts | 15 ++ .../Layers/ProviderCommandReactor.test.ts | 162 ++++++++++----- .../Layers/ProviderCommandReactor.ts | 116 ++++++++--- .../Layers/ProviderRuntimeIngestion.test.ts | 47 ++++- .../Layers/ProviderRuntimeIngestion.ts | 11 + apps/server/src/orchestration/Schemas.ts | 2 + .../Services/ModelChangeReactor.ts | 31 +++ apps/server/src/orchestration/decider.ts | 36 ++++ .../src/orchestration/projector.test.ts | 55 +++++ apps/server/src/orchestration/projector.ts | 12 ++ .../src/orchestration/providerHandoff.ts | 114 +++++++++++ apps/server/src/serverLayers.ts | 5 + apps/web/public/mockServiceWorker.js | 2 +- apps/web/src/components/ChatView.tsx | 47 +++-- .../components/chat/MessagesTimeline.test.tsx | 51 +++++ .../src/components/chat/MessagesTimeline.tsx | 50 +++++ apps/web/src/session-logic.test.ts | 104 +++++++++- apps/web/src/session-logic.ts | 63 +++++- packages/contracts/src/orchestration.test.ts | 58 ++++++ packages/contracts/src/orchestration.ts | 43 ++++ packages/shared/src/model.test.ts | 18 ++ packages/shared/src/model.ts | 43 ++++ packages/shared/src/shell.test.ts | 72 +++++++ packages/shared/src/shell.ts | 37 +++- 32 files changed, 1548 insertions(+), 105 deletions(-) create mode 100644 
apps/server/src/orchestration/Layers/ModelChangeReactor.test.ts create mode 100644 apps/server/src/orchestration/Layers/ModelChangeReactor.ts create mode 100644 apps/server/src/orchestration/Services/ModelChangeReactor.ts create mode 100644 apps/server/src/orchestration/providerHandoff.ts diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 19f34b49a0..e48bcd1371 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -43,6 +43,7 @@ import { CodexAdapter } from "../src/provider/Services/CodexAdapter.ts"; import { ProviderService } from "../src/provider/Services/ProviderService.ts"; import { AnalyticsService } from "../src/telemetry/Services/AnalyticsService.ts"; import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointReactor.ts"; +import { ModelChangeReactorLive } from "../src/orchestration/Layers/ModelChangeReactor.ts"; import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts"; @@ -311,10 +312,14 @@ export const makeOrchestrationIntegrationHarness = ( const checkpointReactorLayer = CheckpointReactorLive.pipe( Layer.provideMerge(runtimeServicesLayer), ); + const modelChangeReactorLayer = ModelChangeReactorLive.pipe( + Layer.provideMerge(runtimeServicesLayer), + ); const orchestrationReactorLayer = OrchestrationReactorLive.pipe( Layer.provideMerge(runtimeIngestionLayer), Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), + Layer.provideMerge(modelChangeReactorLayer), ); const layer = orchestrationReactorLayer.pipe( Layer.provide(persistenceLayer), diff --git 
a/apps/server/src/orchestration/Layers/ModelChangeReactor.test.ts b/apps/server/src/orchestration/Layers/ModelChangeReactor.test.ts new file mode 100644 index 0000000000..2117b7e650 --- /dev/null +++ b/apps/server/src/orchestration/Layers/ModelChangeReactor.test.ts @@ -0,0 +1,192 @@ +import type { OrchestrationThreadActivity } from "@t3tools/contracts"; +import { + CommandId, + DEFAULT_PROVIDER_INTERACTION_MODE, + ProjectId, + ThreadId, +} from "@t3tools/contracts"; +import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import { afterEach, describe, expect, it } from "vitest"; + +import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; +import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; +import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; +import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; +import { ModelChangeReactorLive } from "./ModelChangeReactor.ts"; +import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { + OrchestrationEngineService, + type OrchestrationEngineShape, +} from "../Services/OrchestrationEngine.ts"; +import { ModelChangeReactor } from "../Services/ModelChangeReactor.ts"; +import { ServerConfig } from "../../config.ts"; +import * as NodeServices from "@effect/platform-node/NodeServices"; + +const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value); +const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value); + +async function waitForActivity( + engine: OrchestrationEngineShape, + predicate: (activity: OrchestrationThreadActivity) => boolean, + timeoutMs = 2000, +): Promise<OrchestrationThreadActivity> { + const deadline = Date.now() + timeoutMs; + const poll = async (): Promise<OrchestrationThreadActivity> => { + const readModel = await Effect.runPromise(engine.getReadModel()); + const activities = + readModel.threads.find((thread) => thread.id === "thread-1")?.activities 
?? []; + const match = activities.find(predicate); + if (match) { + return match; + } + if (Date.now() >= deadline) { + throw new Error("Timed out waiting for model change activity"); + } + await new Promise((resolve) => setTimeout(resolve, 10)); + return poll(); + }; + return poll(); +} + +describe("ModelChangeReactor", () => { + let runtime: ManagedRuntime.ManagedRuntime< + OrchestrationEngineService | ModelChangeReactor, + unknown + > | null = null; + let scope: Scope.Closeable | null = null; + + afterEach(async () => { + if (scope) { + await Effect.runPromise(Scope.close(scope, Exit.void)); + } + scope = null; + if (runtime) { + await runtime.dispose(); + } + runtime = null; + }); + + async function createHarness() { + const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionPipelineLive), + Layer.provide(OrchestrationEventStoreLive), + Layer.provide(OrchestrationCommandReceiptRepositoryLive), + Layer.provide(SqlitePersistenceMemory), + ); + const layer = ModelChangeReactorLive.pipe( + Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())), + Layer.provideMerge(NodeServices.layer), + ); + runtime = ManagedRuntime.make(layer); + + const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const reactor = await runtime.runPromise(Effect.service(ModelChangeReactor)); + scope = await Effect.runPromise(Scope.make("sequential")); + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + + const createdAt = new Date().toISOString(); + await Effect.runPromise( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-create"), + projectId: asProjectId("project-1"), + title: "Project", + workspaceRoot: "/tmp/project-1", + defaultModel: "gpt-5-codex", + createdAt, + }), + ); + await Effect.runPromise( + engine.dispatch({ + type: "thread.create", + commandId: 
CommandId.makeUnsafe("cmd-thread-create"), + threadId: asThreadId("thread-1"), + projectId: asProjectId("project-1"), + title: "Thread", + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + }), + ); + + return { engine, reactor }; + } + + it("appends a user-triggered model change notice activity", async () => { + const { engine } = await createHarness(); + + await Effect.runPromise( + engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-set-user"), + threadId: asThreadId("thread-1"), + model: "gpt-5.4", + source: "client", + }), + ); + + const activity = await waitForActivity( + engine, + (entry) => entry.kind === "thread.model.changed" && entry.summary === "Model changed", + ); + + expect(activity.payload).toMatchObject({ + fromModel: "gpt-5-codex", + toModel: "gpt-5.4", + source: "user", + }); + }); + + it("appends a provider-reroute notice with reason", async () => { + const { engine } = await createHarness(); + + await Effect.runPromise( + engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-set-reroute"), + threadId: asThreadId("thread-1"), + model: "gpt-5.4-mini", + source: "provider-reroute", + reason: "capacity", + }), + ); + + const activity = await waitForActivity( + engine, + (entry) => + entry.kind === "thread.model.changed" && + typeof (entry.payload as { reason?: unknown }).reason === "string", + ); + + expect(activity.payload).toMatchObject({ + fromModel: "gpt-5-codex", + toModel: "gpt-5.4-mini", + source: "provider-reroute", + reason: "capacity", + }); + }); + + it("does not append a notice when the model does not change", async () => { + const { engine, reactor } = await createHarness(); + + await Effect.runPromise( + engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-set-noop"), + threadId: asThreadId("thread-1"), + model: 
"gpt-5-codex", + source: "client", + }), + ); + await Effect.runPromise(reactor.drain); + + const readModel = await Effect.runPromise(engine.getReadModel()); + expect(readModel.threads.find((thread) => thread.id === "thread-1")?.activities ?? []).toEqual( + [], + ); + }); +}); diff --git a/apps/server/src/orchestration/Layers/ModelChangeReactor.ts b/apps/server/src/orchestration/Layers/ModelChangeReactor.ts new file mode 100644 index 0000000000..6d93dc5370 --- /dev/null +++ b/apps/server/src/orchestration/Layers/ModelChangeReactor.ts @@ -0,0 +1,68 @@ +import { + CommandId, + EventId, + type OrchestrationEvent, + type ThreadModelChangedActivityPayload, +} from "@t3tools/contracts"; +import { Effect, Layer, Stream } from "effect"; +import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; + +import { + ModelChangeReactor, + type ModelChangeReactorShape, +} from "../Services/ModelChangeReactor.ts"; +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; + +type ModelSetDomainEvent = Extract<OrchestrationEvent, { type: "thread.model-set" }>; + +const serverCommandId = (tag: string): CommandId => + CommandId.makeUnsafe(`server:${tag}:${crypto.randomUUID()}`); + +const make = Effect.gen(function* () { + const orchestrationEngine = yield* OrchestrationEngineService; + + const appendModelChangeActivity = (event: ModelSetDomainEvent) => { + const payload: ThreadModelChangedActivityPayload = { + fromModel: event.payload.previousModel, + toModel: event.payload.model, + source: event.payload.source === "client" ? "user" : "provider-reroute", + ...(event.payload.reason !== undefined ? 
{ reason: event.payload.reason } : {}), + }; + + return orchestrationEngine.dispatch({ + type: "thread.activity.append", + commandId: serverCommandId("thread-model-change-notice"), + threadId: event.payload.threadId, + activity: { + id: EventId.makeUnsafe(crypto.randomUUID()), + tone: "info", + kind: "thread.model.changed", + summary: "Model changed", + payload, + turnId: null, + createdAt: event.payload.updatedAt, + }, + createdAt: event.payload.updatedAt, + }); + }; + + const worker = yield* makeDrainableWorker((event: ModelSetDomainEvent) => + appendModelChangeActivity(event), + ); + + const start: ModelChangeReactorShape["start"] = Effect.forkScoped( + Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => { + if (event.type !== "thread.model-set") { + return Effect.void; + } + return worker.enqueue(event); + }), + ); + + return { + start, + drain: worker.drain, + } satisfies ModelChangeReactorShape; +}); + +export const ModelChangeReactorLive = Layer.effect(ModelChangeReactor, make); diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 0aa204d829..c435851192 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -221,6 +221,70 @@ describe("OrchestrationEngine", () => { await system.dispose(); }); + it("emits thread.model-set when a thread model changes", async () => { + const system = await createOrchestrationSystem(); + const { engine } = system; + const createdAt = now(); + + await system.run( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-stream-model-create"), + projectId: asProjectId("project-stream-model"), + title: "Stream Model Project", + workspaceRoot: "/tmp/project-stream-model", + defaultModel: "gpt-5-codex", + createdAt, + }), + ); + + await system.run( + engine.dispatch({ + type: "thread.create", 
+ commandId: CommandId.makeUnsafe("cmd-stream-thread-model-create"), + threadId: ThreadId.makeUnsafe("thread-stream-model"), + projectId: asProjectId("project-stream-model"), + title: "domain-stream-model", + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt, + }), + ); + + const event = await system.run( + Effect.gen(function* () { + const eventQueue = yield* Queue.unbounded(); + yield* Effect.forkScoped( + Stream.take(engine.streamDomainEvents, 1).pipe( + Stream.runForEach((nextEvent) => + Queue.offer(eventQueue, nextEvent).pipe(Effect.asVoid), + ), + ), + ); + yield* Effect.sleep("10 millis"); + yield* engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-stream-thread-model-set"), + threadId: ThreadId.makeUnsafe("thread-stream-model"), + model: "gpt-5.4", + source: "client", + }); + return yield* Queue.take(eventQueue); + }).pipe(Effect.scoped), + ); + + expect(event.type).toBe("thread.model-set"); + if (event.type === "thread.model-set") { + expect(event.payload.previousModel).toBe("gpt-5-codex"); + expect(event.payload.model).toBe("gpt-5.4"); + } + + await system.dispose(); + }); + it("stores completed checkpoint summaries even when no files changed", async () => { const system = await createOrchestrationSystem(); const { engine } = system; diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 69b28b9d3c..5bac49ad56 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -107,6 +107,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { readModel, }); const eventBases = Array.isArray(eventBase) ? 
eventBase : [eventBase]; + const aggregateRef = commandToAggregateRef(envelope.command); const committedCommand = yield* sql .withTransaction( Effect.gen(function* () { @@ -122,10 +123,25 @@ const makeOrchestrationEngine = Effect.gen(function* () { const lastSavedEvent = committedEvents.at(-1) ?? null; if (lastSavedEvent === null) { - return yield* new OrchestrationCommandInvariantError({ - commandType: envelope.command.type, - detail: "Command produced no events.", + // No-op: the decider intentionally produced zero events + // (e.g. idempotent model set where the model is already current). + // Record an "accepted" receipt with resultSequence 0 so the command is not retried, and + // return the read-model unchanged. + yield* commandReceiptRepository.upsert({ + commandId: envelope.command.commandId, + aggregateKind: aggregateRef.aggregateKind, + aggregateId: aggregateRef.aggregateId, + acceptedAt: new Date().toISOString(), + resultSequence: 0, + status: "accepted", + error: null, }); + + return { + committedEvents: [], + lastSequence: 0, + nextReadModel, + } as const; } yield* commandReceiptRepository.upsert({ diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts index 1514bef595..ba129e71e2 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts @@ -2,6 +2,7 @@ import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; import { afterEach, describe, expect, it } from "vitest"; import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; +import { ModelChangeReactor } from "../Services/ModelChangeReactor.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts"; @@ -17,7 +18,7 @@ 
describe("OrchestrationReactor", () => { runtime = null; }); - it("starts provider ingestion, provider command, and checkpoint reactors", async () => { + it("starts provider ingestion, provider command, checkpoint, and model change reactors", async () => { const started: string[] = []; runtime = ManagedRuntime.make( @@ -46,6 +47,14 @@ describe("OrchestrationReactor", () => { drain: Effect.void, }), ), + Layer.provideMerge( + Layer.succeed(ModelChangeReactor, { + start: Effect.sync(() => { + started.push("model-change-reactor"); + }), + drain: Effect.void, + }), + ), ), ); @@ -57,6 +66,7 @@ describe("OrchestrationReactor", () => { "provider-runtime-ingestion", "provider-command-reactor", "checkpoint-reactor", + "model-change-reactor", ]); await Effect.runPromise(Scope.close(scope, Exit.void)); diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts index 1e498885a0..b6b7233e1e 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts @@ -5,6 +5,7 @@ import { type OrchestrationReactorShape, } from "../Services/OrchestrationReactor.ts"; import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; +import { ModelChangeReactor } from "../Services/ModelChangeReactor.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; @@ -12,11 +13,13 @@ export const makeOrchestrationReactor = Effect.gen(function* () { const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService; const providerCommandReactor = yield* ProviderCommandReactor; const checkpointReactor = yield* CheckpointReactor; + const modelChangeReactor = yield* ModelChangeReactor; const start: OrchestrationReactorShape["start"] = Effect.gen(function* () { yield* providerRuntimeIngestion.start; yield* 
providerCommandReactor.start; yield* checkpointReactor.start; + yield* modelChangeReactor.start; }); return { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 83ee080fbe..433a9324b9 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -166,6 +166,99 @@ it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { ); }); +it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-model-set-")))( + "OrchestrationProjectionPipeline", + (it) => { + it.effect("updates projection thread model rows from thread.model-set", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const createdAt = new Date().toISOString(); + const updatedAt = new Date(Date.parse(createdAt) + 1_000).toISOString(); + + yield* eventStore.append({ + type: "project.created", + eventId: EventId.makeUnsafe("evt-model-project"), + aggregateKind: "project", + aggregateId: ProjectId.makeUnsafe("project-model"), + occurredAt: createdAt, + commandId: CommandId.makeUnsafe("cmd-model-project"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-model-project"), + metadata: {}, + payload: { + projectId: ProjectId.makeUnsafe("project-model"), + title: "Project Model", + workspaceRoot: "/tmp/project-model", + defaultModel: null, + scripts: [], + createdAt, + updatedAt: createdAt, + }, + }); + + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-model-thread-create"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-model"), + occurredAt: createdAt, + commandId: CommandId.makeUnsafe("cmd-model-thread-create"), + causationEventId: null, + correlationId: 
CommandId.makeUnsafe("cmd-model-thread-create"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-model"), + projectId: ProjectId.makeUnsafe("project-model"), + title: "Thread Model", + model: "gpt-5-codex", + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }); + + yield* eventStore.append({ + type: "thread.model-set", + eventId: EventId.makeUnsafe("evt-model-thread-set"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-model"), + occurredAt: updatedAt, + commandId: CommandId.makeUnsafe("cmd-model-thread-set"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-model-thread-set"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-model"), + model: "gpt-5.4", + previousModel: "gpt-5-codex", + source: "client", + updatedAt, + }, + }); + + yield* projectionPipeline.bootstrap; + + const rows = yield* sql<{ + readonly model: string; + readonly updatedAt: string; + }>` + SELECT + model, + updated_at AS "updatedAt" + FROM projection_threads + WHERE thread_id = 'thread-model' + `; + + assert.deepEqual(rows, [{ model: "gpt-5.4", updatedAt }]); + }), + ); + }, +); + it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-base-")))( "OrchestrationProjectionPipeline", (it) => { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 0651dab646..c9cc8f37f5 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -452,6 +452,21 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { return; } + case "thread.model-set": { + const existingRow = yield* projectionThreadRepository.getById({ + threadId: event.payload.threadId, + }); + if (Option.isNone(existingRow)) { + return; + } + yield* projectionThreadRepository.upsert({ + ...existingRow.value, + model: 
event.payload.model, + updatedAt: event.payload.updatedAt, + }); + return; + } + case "thread.runtime-mode-set": { const existingRow = yield* projectionThreadRepository.getById({ threadId: event.payload.threadId, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 5a7084a61b..68b58df928 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -503,7 +503,7 @@ describe("ProviderCommandReactor", () => { }); }); - it("rejects a first turn when requested provider conflicts with the thread model", async () => { + it("allows a first turn to switch providers when the request specifies a different provider", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -519,40 +519,27 @@ describe("ProviderCommandReactor", () => { attachments: [], }, provider: "claudeAgent", + model: "claude-sonnet-4-6", interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, runtimeMode: "approval-required", createdAt: now, }), ); - await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); - const thread = readModel.threads.find( - (entry) => entry.id === ThreadId.makeUnsafe("thread-1"), - ); - return ( - thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? 
- false - ); - }); - - expect(harness.startSession).not.toHaveBeenCalled(); - expect(harness.sendTurn).not.toHaveBeenCalled(); + await waitFor(() => harness.startSession.mock.calls.length === 1); + await waitFor(() => harness.sendTurn.mock.calls.length === 1); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === ThreadId.makeUnsafe("thread-1")); - expect(thread?.session).toBeNull(); - expect( - thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), - ).toMatchObject({ - summary: "Provider turn start failed", - payload: { - detail: expect.stringContaining("cannot switch to 'claudeAgent'"), - }, + expect(harness.startSession.mock.calls[0]?.[1]).toMatchObject({ + provider: "claudeAgent", + model: "claude-sonnet-4-6", + }); + expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({ + threadId: ThreadId.makeUnsafe("thread-1"), + model: "claude-sonnet-4-6", }); }); - it("rejects a turn when the requested model belongs to a different provider", async () => { + it("rejects a turn when the requested model conflicts with an explicit provider", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -567,6 +554,7 @@ describe("ProviderCommandReactor", () => { text: "hello", attachments: [], }, + provider: "codex", model: "claude-sonnet-4-6", interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, runtimeMode: "approval-required", @@ -800,7 +788,7 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.runtimeMode).toBe("approval-required"); }); - it("rejects provider changes after a thread is already bound to a session provider", async () => { + it("restarts the session when switching providers on an existing thread", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -836,38 +824,120 @@ describe("ProviderCommandReactor", () => { attachments: [], }, provider: 
"claudeAgent", + model: "claude-sonnet-4-6", interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, runtimeMode: "approval-required", createdAt: now, }), ); - await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); - const thread = readModel.threads.find( - (entry) => entry.id === ThreadId.makeUnsafe("thread-1"), - ); - return ( - thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? - false - ); - }); - - expect(harness.startSession.mock.calls.length).toBe(1); - expect(harness.sendTurn.mock.calls.length).toBe(1); + await waitFor(() => harness.startSession.mock.calls.length === 2); + await waitFor(() => harness.sendTurn.mock.calls.length === 2); expect(harness.stopSession.mock.calls.length).toBe(0); const readModel = await Effect.runPromise(harness.engine.getReadModel()); const thread = readModel.threads.find((entry) => entry.id === ThreadId.makeUnsafe("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); - expect(thread?.session?.providerName).toBe("codex"); + expect(thread?.session?.providerName).toBe("claudeAgent"); expect(thread?.session?.runtimeMode).toBe("approval-required"); - expect( - thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), - ).toMatchObject({ - payload: { - detail: expect.stringContaining("cannot switch to 'claudeAgent'"), - }, + expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({ + provider: "claudeAgent", + model: "claude-sonnet-4-6", + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + model: "claude-sonnet-4-6", + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("You are continuing an existing conversation"), + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("User:\nfirst"), + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("Latest user 
message:\nsecond"), + }); + }); + + it("preserves handoff history when switching providers after the previous session stopped", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-provider-stopped-1"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("user-message-provider-stopped-1"), + role: "user", + text: "first", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length === 1); + await waitFor(() => harness.sendTurn.mock.calls.length === 1); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.stop", + commandId: CommandId.makeUnsafe("cmd-session-stop-before-provider-switch"), + threadId: ThreadId.makeUnsafe("thread-1"), + createdAt: now, + }), + ); + + await waitFor(() => harness.stopSession.mock.calls.length === 1); + + const stoppedReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const stoppedThread = stoppedReadModel.threads.find( + (entry) => entry.id === ThreadId.makeUnsafe("thread-1"), + ); + expect(stoppedThread?.session?.status).toBe("stopped"); + expect(stoppedThread?.session?.providerName).toBe("codex"); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-provider-stopped-2"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("user-message-provider-stopped-2"), + role: "user", + text: "second", + attachments: [], + }, + provider: "claudeAgent", + model: "claude-sonnet-4-6", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => 
harness.startSession.mock.calls.length === 2); + await waitFor(() => harness.sendTurn.mock.calls.length === 2); + + expect(harness.stopSession.mock.calls.length).toBe(1); + expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({ + provider: "claudeAgent", + model: "claude-sonnet-4-6", + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + model: "claude-sonnet-4-6", + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("You are continuing an existing conversation"), + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("User:\nfirst"), + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("Latest user message:\nsecond"), }); }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 57405ca515..1fb7284f2a 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -3,6 +3,7 @@ import { CommandId, DEFAULT_GIT_TEXT_GENERATION_MODEL, EventId, + MessageId, type OrchestrationEvent, type ProviderModelOptions, ProviderKind, @@ -27,6 +28,7 @@ import { type ProviderCommandReactorShape, } from "../Services/ProviderCommandReactor.ts"; import { inferProviderForModel } from "@t3tools/shared/model"; +import { buildProviderHandoffPrompt, getMessagesBeforeMessage } from "../providerHandoff.ts"; type ProviderIntentEvent = Extract< OrchestrationEvent, @@ -81,6 +83,15 @@ const sameModelOptions = ( right: ProviderModelOptions | undefined, ): boolean => JSON.stringify(left ?? null) === JSON.stringify(right ?? 
null); +interface EnsuredSessionResult { + readonly sessionThreadId: ThreadId; + readonly providerSwitched: boolean; + readonly previousProvider?: ProviderKind; + readonly previousModel?: string; + readonly desiredProvider: ProviderKind; + readonly desiredModel?: string; +} + function isUnknownPendingApprovalRequestError(cause: Cause.Cause): boolean { const error = Cause.squash(cause); if (Schema.is(ProviderAdapterRequestError)(error)) { @@ -233,26 +244,28 @@ const make = Effect.gen(function* () { ) ? thread.session.providerName : undefined; - const threadProvider: ProviderKind = currentProvider ?? inferProviderForModel(thread.model); - if (options?.provider !== undefined && options.provider !== threadProvider) { - return yield* new ProviderAdapterRequestError({ - provider: threadProvider, - method: "thread.turn.start", - detail: `Thread '${threadId}' is bound to provider '${threadProvider}' and cannot switch to '${options.provider}'.`, - }); - } + const persistedThreadProvider: ProviderKind = inferProviderForModel(thread.model); + const desiredProvider: ProviderKind = + options?.provider ?? + (options?.model !== undefined + ? inferProviderForModel(options.model, currentProvider ?? persistedThreadProvider) + : (currentProvider ?? persistedThreadProvider)); if ( options?.model !== undefined && - inferProviderForModel(options.model, threadProvider) !== threadProvider + inferProviderForModel(options.model, desiredProvider) !== desiredProvider ) { return yield* new ProviderAdapterRequestError({ - provider: threadProvider, + provider: desiredProvider, method: "thread.turn.start", - detail: `Model '${options.model}' does not belong to provider '${threadProvider}' for thread '${threadId}'.`, + detail: `Model '${options.model}' does not belong to provider '${desiredProvider}' for thread '${threadId}'.`, }); } - const preferredProvider: ProviderKind = currentProvider ?? threadProvider; - const desiredModel = options?.model ?? 
thread.model; + const preferredProvider: ProviderKind = desiredProvider; + const desiredModel = + options?.model ?? + (desiredProvider === persistedThreadProvider || currentProvider === desiredProvider + ? thread.model + : undefined); const effectiveCwd = resolveThreadWorkspaceCwd({ thread, projects: readModel.projects, @@ -302,8 +315,7 @@ const make = Effect.gen(function* () { thread.session && thread.session.status !== "stopped" ? thread.id : null; if (existingSessionThreadId) { const runtimeModeChanged = thread.runtimeMode !== thread.session?.runtimeMode; - const providerChanged = - options?.provider !== undefined && options.provider !== currentProvider; + const providerChanged = currentProvider !== undefined && desiredProvider !== currentProvider; const activeSession = yield* resolveActiveSession(existingSessionThreadId); const sessionModelSwitch = currentProvider === undefined @@ -323,7 +335,13 @@ const make = Effect.gen(function* () { !shouldRestartForModelChange && !shouldRestartForModelOptionsChange ) { - return existingSessionThreadId; + const result: EnsuredSessionResult = { + sessionThreadId: existingSessionThreadId, + providerSwitched: false, + desiredProvider, + ...(desiredModel !== undefined ? { desiredModel } : {}), + }; + return result; } const resumeCursor = @@ -334,7 +352,7 @@ const make = Effect.gen(function* () { threadId, existingSessionThreadId, currentProvider, - desiredProvider: options?.provider ?? currentProvider, + desiredProvider, currentRuntimeMode: thread.session?.runtimeMode, desiredRuntimeMode: thread.runtimeMode, runtimeModeChanged, @@ -346,7 +364,7 @@ const make = Effect.gen(function* () { }); const restartedSession = yield* startProviderSession({ ...(resumeCursor !== undefined ? { resumeCursor } : {}), - ...(options?.provider !== undefined ? 
{ provider: options.provider } : {}), + provider: desiredProvider, }); yield* Effect.logInfo("provider command reactor restarted provider session", { threadId, @@ -356,18 +374,47 @@ const make = Effect.gen(function* () { runtimeMode: restartedSession.runtimeMode, }); yield* bindSessionToThread(restartedSession); - return restartedSession.threadId; + const result: EnsuredSessionResult = { + sessionThreadId: restartedSession.threadId, + providerSwitched: providerChanged, + ...(currentProvider !== undefined ? { previousProvider: currentProvider } : {}), + ...(activeSession?.model !== undefined ? { previousModel: activeSession.model } : {}), + desiredProvider, + ...(desiredModel !== undefined ? { desiredModel } : {}), + }; + return result; } - const startedSession = yield* startProviderSession( - options?.provider !== undefined ? { provider: options.provider } : undefined, - ); + const startedSession = yield* startProviderSession({ provider: desiredProvider }); yield* bindSessionToThread(startedSession); - return startedSession.threadId; + + // Detect provider switch even when the previous session is stopped. + // The last-known provider comes from the stopped session metadata or, + // failing that, from the model persisted on the thread. + const lastKnownProvider: ProviderKind | undefined = Schema.is(ProviderKind)( + thread.session?.providerName, + ) + ? thread.session.providerName + : undefined; + const providerChangedFromStopped = + lastKnownProvider !== undefined && lastKnownProvider !== desiredProvider; + + const result: EnsuredSessionResult = { + sessionThreadId: startedSession.threadId, + providerSwitched: providerChangedFromStopped, + ...(providerChangedFromStopped && lastKnownProvider !== undefined + ? { previousProvider: lastKnownProvider } + : {}), + ...(providerChangedFromStopped ? { previousModel: thread.model } : {}), + desiredProvider, + ...(desiredModel !== undefined ? 
{ desiredModel } : {}), + }; + return result; }); const sendTurnForThread = Effect.fnUntraced(function* (input: { readonly threadId: ThreadId; + readonly messageId: MessageId; readonly messageText: string; readonly attachments?: ReadonlyArray; readonly provider?: ProviderKind; @@ -381,7 +428,7 @@ const make = Effect.gen(function* () { if (!thread) { return; } - yield* ensureSessionForThread(input.threadId, input.createdAt, { + const ensuredSession = yield* ensureSessionForThread(input.threadId, input.createdAt, { ...(input.provider !== undefined ? { provider: input.provider } : {}), ...(input.model !== undefined ? { model: input.model } : {}), ...(input.modelOptions !== undefined ? { modelOptions: input.modelOptions } : {}), @@ -393,7 +440,6 @@ const make = Effect.gen(function* () { if (input.modelOptions !== undefined) { threadModelOptions.set(input.threadId, input.modelOptions); } - const normalizedInput = toNonEmptyProviderInput(input.messageText); const normalizedAttachments = input.attachments ?? []; const activeSession = yield* providerService .listSessions() @@ -405,10 +451,27 @@ const make = Effect.gen(function* () { ? "in-session" : (yield* providerService.getCapabilities(activeSession.provider)).sessionModelSwitch; const modelForTurn = sessionModelSwitch === "unsupported" ? activeSession?.model : input.model; + const handoffPrompt = + ensuredSession.providerSwitched && + ensuredSession.previousProvider !== undefined && + getMessagesBeforeMessage(thread.messages, input.messageId).length > 0 + ? buildProviderHandoffPrompt({ + messagesBeforeCurrent: getMessagesBeforeMessage(thread.messages, input.messageId), + previousProvider: ensuredSession.previousProvider, + nextProvider: ensuredSession.desiredProvider, + previousModel: ensuredSession.previousModel, + nextModel: modelForTurn ?? ensuredSession.desiredModel, + latestUserText: input.messageText, + ...(input.attachments !== undefined ? 
{ latestAttachments: input.attachments } : {}), + }) + : undefined; + const providerInput = toNonEmptyProviderInput( + handoffPrompt ? `${handoffPrompt}` : input.messageText, + ); yield* providerService.sendTurn({ threadId: input.threadId, - ...(normalizedInput ? { input: normalizedInput } : {}), + ...(providerInput ? { input: providerInput } : {}), ...(normalizedAttachments.length > 0 ? { attachments: normalizedAttachments } : {}), ...(modelForTurn !== undefined ? { model: modelForTurn } : {}), ...(input.modelOptions !== undefined ? { modelOptions: input.modelOptions } : {}), @@ -522,6 +585,7 @@ const make = Effect.gen(function* () { yield* sendTurnForThread({ threadId: event.payload.threadId, + messageId: message.id, messageText: message.text, ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), ...(event.payload.provider !== undefined ? { provider: event.payload.provider } : {}), diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index c1ba48108f..05043278df 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -30,12 +30,14 @@ import { type ProviderServiceShape, } from "../../provider/Services/ProviderService.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; +import { ModelChangeReactorLive } from "./ModelChangeReactor.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts"; import { OrchestrationEngineService, type OrchestrationEngineShape, } from "../Services/OrchestrationEngine.ts"; +import { ModelChangeReactor } from "../Services/ModelChangeReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; import { ServerConfig } from "../../config.ts"; 
import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -129,7 +131,7 @@ type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][nu describe("ProviderRuntimeIngestion", () => { let runtime: ManagedRuntime.ManagedRuntime< - OrchestrationEngineService | ProviderRuntimeIngestionService, + OrchestrationEngineService | ProviderRuntimeIngestionService | ModelChangeReactor, unknown > | null = null; let scope: Scope.Closeable | null = null; @@ -165,7 +167,7 @@ describe("ProviderRuntimeIngestion", () => { Layer.provide(OrchestrationCommandReceiptRepositoryLive), Layer.provide(SqlitePersistenceMemory), ); - const layer = ProviderRuntimeIngestionLive.pipe( + const layer = Layer.mergeAll(ProviderRuntimeIngestionLive, ModelChangeReactorLive).pipe( Layer.provideMerge(orchestrationLayer), Layer.provideMerge(SqlitePersistenceMemory), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), @@ -175,8 +177,10 @@ describe("ProviderRuntimeIngestion", () => { runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService)); + const modelChangeReactor = await runtime.runPromise(Effect.service(ModelChangeReactor)); scope = await Effect.runPromise(Scope.make("sequential")); await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope))); + await Effect.runPromise(modelChangeReactor.start.pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(ingestion.drain); const createdAt = new Date().toISOString(); @@ -282,6 +286,45 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBe("turn failed"); }); + it("routes provider model reroutes through thread.model.set and appends a notice", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "model.rerouted", + eventId: 
asEventId("evt-model-rerouted"), + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: now, + payload: { + fromModel: "gpt-5-codex", + toModel: "gpt-5.4", + reason: "capacity", + }, + }); + + const thread = await waitForThread( + harness.engine, + (entry) => + entry.model === "gpt-5.4" && + entry.activities.some( + (activity) => + activity.kind === "thread.model.changed" && + typeof (activity.payload as { reason?: unknown }).reason === "string", + ), + ); + + expect(thread.model).toBe("gpt-5.4"); + expect( + thread.activities.find((activity) => activity.kind === "thread.model.changed")?.payload, + ).toMatchObject({ + fromModel: "gpt-5-codex", + toModel: "gpt-5.4", + source: "provider-reroute", + reason: "capacity", + }); + }); + it("applies provider session.state.changed transitions directly", async () => { const harness = await createHarness(); const waitingAt = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3df47941af..486257b042 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1158,6 +1158,17 @@ const make = Effect.gen(function* () { }); } + if (event.type === "model.rerouted" && event.payload.toModel !== thread.model) { + yield* orchestrationEngine.dispatch({ + type: "thread.model.set", + commandId: providerCommandId(event, "thread-model-set"), + threadId: thread.id, + model: event.payload.toModel, + source: "provider-reroute", + reason: event.payload.reason, + }); + } + if (event.type === "turn.diff.updated") { const turnId = toTurnId(event.turnId); if (turnId && (yield* isGitRepoForThread(thread.id))) { diff --git a/apps/server/src/orchestration/Schemas.ts b/apps/server/src/orchestration/Schemas.ts index c96385cad1..e07cd7a494 100644 --- a/apps/server/src/orchestration/Schemas.ts +++ 
b/apps/server/src/orchestration/Schemas.ts @@ -4,6 +4,7 @@ import { ProjectDeletedPayload as ContractsProjectDeletedPayloadSchema, ThreadCreatedPayload as ContractsThreadCreatedPayloadSchema, ThreadMetaUpdatedPayload as ContractsThreadMetaUpdatedPayloadSchema, + ThreadModelSetPayload as ContractsThreadModelSetPayloadSchema, ThreadRuntimeModeSetPayload as ContractsThreadRuntimeModeSetPayloadSchema, ThreadInteractionModeSetPayload as ContractsThreadInteractionModeSetPayloadSchema, ThreadDeletedPayload as ContractsThreadDeletedPayloadSchema, @@ -27,6 +28,7 @@ export const ProjectDeletedPayload = ContractsProjectDeletedPayloadSchema; export const ThreadCreatedPayload = ContractsThreadCreatedPayloadSchema; export const ThreadMetaUpdatedPayload = ContractsThreadMetaUpdatedPayloadSchema; +export const ThreadModelSetPayload = ContractsThreadModelSetPayloadSchema; export const ThreadRuntimeModeSetPayload = ContractsThreadRuntimeModeSetPayloadSchema; export const ThreadInteractionModeSetPayload = ContractsThreadInteractionModeSetPayloadSchema; export const ThreadDeletedPayload = ContractsThreadDeletedPayloadSchema; diff --git a/apps/server/src/orchestration/Services/ModelChangeReactor.ts b/apps/server/src/orchestration/Services/ModelChangeReactor.ts new file mode 100644 index 0000000000..76a8578fd6 --- /dev/null +++ b/apps/server/src/orchestration/Services/ModelChangeReactor.ts @@ -0,0 +1,31 @@ +/** + * ModelChangeReactor - Thread model change notice reaction service interface. + * + * Owns background workers that react to durable thread model changes and + * append inline timeline notices as orchestration activities. + * + * @module ModelChangeReactor + */ +import { ServiceMap } from "effect"; +import type { Effect, Scope } from "effect"; + +export interface ModelChangeReactorShape { + /** + * Start reacting to thread model-set domain events. + * + * The returned effect must be run in a scope so all worker fibers can be + * finalized on shutdown. 
+ */ + readonly start: Effect.Effect; + + /** + * Resolves when the internal processing queue is empty and idle. + * Intended for test use to replace timing-sensitive sleeps. + */ + readonly drain: Effect.Effect; +} + +export class ModelChangeReactor extends ServiceMap.Service< + ModelChangeReactor, + ModelChangeReactorShape +>()("t3/orchestration/Services/ModelChangeReactor") {} diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 6ea4c51759..89091fedfb 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -215,6 +215,42 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.model.set": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const thread = readModel.threads.find((entry) => entry.id === command.threadId); + if (!thread) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Thread '${command.threadId}' was not found in read model.`, + }); + } + if (thread.model === command.model) { + return []; + } + const occurredAt = nowIso(); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt, + commandId: command.commandId, + }), + type: "thread.model-set", + payload: { + threadId: command.threadId, + model: command.model, + previousModel: thread.model, + source: command.source, + ...(command.reason !== undefined ? 
{ reason: command.reason } : {}), + updatedAt: occurredAt, + }, + }; + } + case "thread.runtime-mode.set": { yield* requireThread({ readModel, diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 71f5b6bd4b..8fe009a715 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -267,6 +267,61 @@ describe("orchestration projector", () => { expect(afterUpdate.threads[0]?.updatedAt).toBe(updatedAt); }); + it("updates canonical thread model from thread.model-set", async () => { + const createdAt = "2026-02-23T08:00:00.000Z"; + const updatedAt = "2026-02-23T08:00:05.000Z"; + const model = createEmptyReadModel(createdAt); + + const afterCreate = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: createdAt, + commandId: "cmd-create", + payload: { + threadId: "thread-1", + projectId: "project-1", + title: "demo", + model: "gpt-5.3-codex", + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }), + ), + ); + + const next = await Effect.runPromise( + projectEvent( + afterCreate, + makeEvent({ + sequence: 2, + type: "thread.model-set", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: updatedAt, + commandId: "cmd-model-set", + payload: { + threadId: "thread-1", + model: "gpt-5.4", + previousModel: "gpt-5.3-codex", + source: "client", + updatedAt, + }, + }), + ), + ); + + expect(next.threads[0]?.model).toBe("gpt-5.4"); + expect(next.threads[0]?.updatedAt).toBe(updatedAt); + }); + it("marks assistant messages completed with non-streaming updates", async () => { const createdAt = "2026-02-23T09:00:00.000Z"; const deltaAt = "2026-02-23T09:00:01.000Z"; diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 
015f82a677..a5bee150c3 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -17,6 +17,7 @@ import { ThreadCreatedPayload, ThreadDeletedPayload, ThreadInteractionModeSetPayload, + ThreadModelSetPayload, ThreadMetaUpdatedPayload, ThreadProposedPlanUpsertedPayload, ThreadRuntimeModeSetPayload, @@ -303,6 +304,17 @@ export function projectEvent( })), ); + case "thread.model-set": + return decodeForEvent(ThreadModelSetPayload, event.payload, event.type, "payload").pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + model: payload.model, + updatedAt: payload.updatedAt, + }), + })), + ); + case "thread.runtime-mode-set": return decodeForEvent(ThreadRuntimeModeSetPayload, event.payload, event.type, "payload").pipe( Effect.map((payload) => ({ diff --git a/apps/server/src/orchestration/providerHandoff.ts b/apps/server/src/orchestration/providerHandoff.ts new file mode 100644 index 0000000000..a93accb7d8 --- /dev/null +++ b/apps/server/src/orchestration/providerHandoff.ts @@ -0,0 +1,114 @@ +import type { ChatAttachment, MessageId, ProviderKind } from "@t3tools/contracts"; +import { resolveModelDisplayName } from "@t3tools/shared/model"; + +type HandoffMessage = { + readonly id: MessageId; + readonly role: "user" | "assistant" | "system"; + readonly text: string; + readonly attachments?: ReadonlyArray | undefined; +}; + +const MAX_HANDOFF_TRANSCRIPT_CHARS = 12_000; +const MAX_HANDOFF_MESSAGES = 24; + +function formatAttachmentSummary( + attachments: ReadonlyArray | undefined, +): string | null { + if (!attachments || attachments.length === 0) { + return null; + } + + if (attachments.length === 1) { + const attachment = attachments[0]; + return attachment ? 
`Attachment: ${attachment.name}` : "Attachment included"; + } + + return `Attachments: ${attachments.map((attachment) => attachment.name).join(", ")}`; +} + +function formatTranscriptMessage(message: HandoffMessage): string { + const roleLabel = + message.role === "assistant" ? "Assistant" : message.role === "system" ? "System" : "User"; + const sections = [`${roleLabel}:`, message.text.trim() || "[No text content]"]; + const attachmentSummary = formatAttachmentSummary(message.attachments); + if (attachmentSummary) { + sections.push(`[${attachmentSummary}]`); + } + return sections.join("\n"); +} + +function buildTranscript(messages: ReadonlyArray): string { + const slices: string[] = []; + let usedChars = 0; + let usedCount = 0; + + for (let index = messages.length - 1; index >= 0; index -= 1) { + const message = messages[index]; + if (!message) { + continue; + } + const nextSlice = formatTranscriptMessage(message); + const nextCost = nextSlice.length + 2; + if (usedCount > 0 && usedChars + nextCost > MAX_HANDOFF_TRANSCRIPT_CHARS) { + break; + } + slices.unshift(nextSlice); + usedChars += nextCost; + usedCount += 1; + if (usedCount >= MAX_HANDOFF_MESSAGES || usedChars >= MAX_HANDOFF_TRANSCRIPT_CHARS) { + break; + } + } + + const truncated = slices.length < messages.length; + return [truncated ? 
"[Earlier conversation omitted for brevity]\n" : null, slices.join("\n\n")] + .filter((value): value is string => typeof value === "string" && value.length > 0) + .join(""); +} + +export function buildProviderHandoffPrompt(input: { + readonly messagesBeforeCurrent: ReadonlyArray; + readonly previousProvider: ProviderKind; + readonly nextProvider: ProviderKind; + readonly previousModel: string | undefined; + readonly nextModel: string | undefined; + readonly latestUserText: string; + readonly latestAttachments?: ReadonlyArray; +}): string | undefined { + if (input.messagesBeforeCurrent.length === 0) { + return undefined; + } + + const transcript = buildTranscript(input.messagesBeforeCurrent); + if (!transcript) { + return undefined; + } + + const previousModelLabel = resolveModelDisplayName(input.previousModel, input.previousProvider); + const nextModelLabel = resolveModelDisplayName(input.nextModel, input.nextProvider); + const latestAttachmentSummary = formatAttachmentSummary(input.latestAttachments); + const latestUserText = + input.latestUserText.trim().length > 0 ? input.latestUserText.trim() : "[No text content]"; + + return [ + `You are continuing an existing conversation after switching from ${previousModelLabel || input.previousProvider} to ${nextModelLabel || input.nextProvider}.`, + "Use the transcript below as prior context and continue naturally. Do not mention the model handoff unless the user asks about it.", + "", + "Conversation transcript:", + transcript, + "", + "Latest user message:", + latestUserText, + latestAttachmentSummary ? `[${latestAttachmentSummary}]` : null, + ] + .filter((value): value is string => typeof value === "string") + .join("\n"); +} + +export function getMessagesBeforeMessage( + messages: ReadonlyArray, + messageId: MessageId, +): ReadonlyArray { + const targetIndex = messages.findIndex((message) => message.id === messageId); + return targetIndex <= 0 ? 
[] : messages.slice(0, targetIndex); +} diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index 7250f8566c..f787e38c03 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -10,6 +10,7 @@ import { OrchestrationEventStoreLive } from "./persistence/Layers/OrchestrationE import { ProviderSessionRuntimeRepositoryLive } from "./persistence/Layers/ProviderSessionRuntime"; import { OrchestrationEngineLive } from "./orchestration/Layers/OrchestrationEngine"; import { CheckpointReactorLive } from "./orchestration/Layers/CheckpointReactor"; +import { ModelChangeReactorLive } from "./orchestration/Layers/ModelChangeReactor"; import { OrchestrationReactorLive } from "./orchestration/Layers/OrchestrationReactor"; import { ProviderCommandReactorLive } from "./orchestration/Layers/ProviderCommandReactor"; import { OrchestrationProjectionPipelineLive } from "./orchestration/Layers/ProjectionPipeline"; @@ -102,10 +103,14 @@ export function makeServerRuntimeServicesLayer() { const checkpointReactorLayer = CheckpointReactorLive.pipe( Layer.provideMerge(runtimeServicesLayer), ); + const modelChangeReactorLayer = ModelChangeReactorLive.pipe( + Layer.provideMerge(runtimeServicesLayer), + ); const orchestrationReactorLayer = OrchestrationReactorLive.pipe( Layer.provideMerge(runtimeIngestionLayer), Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), + Layer.provideMerge(modelChangeReactorLayer), ); const terminalLayer = TerminalManagerLive.pipe( diff --git a/apps/web/public/mockServiceWorker.js b/apps/web/public/mockServiceWorker.js index daa58d0f12..8fa9dca80e 100644 --- a/apps/web/public/mockServiceWorker.js +++ b/apps/web/public/mockServiceWorker.js @@ -7,7 +7,7 @@ * - Please do NOT modify this file. 
*/ -const PACKAGE_VERSION = '2.12.10' +const PACKAGE_VERSION = '2.12.11' const INTEGRITY_CHECKSUM = '4db4a41e972cec1b64cc569c66952d82' const IS_MOCKED_RESPONSE = Symbol('isMockedResponse') const activeClientIds = new Set() diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index e628f6ea6a..ede1ecd744 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -24,6 +24,7 @@ import { import { applyClaudePromptEffortPrefix, getDefaultModel, + inferProviderForModel, normalizeModelSlug, resolveModelSlugForProvider, } from "@t3tools/shared/model"; @@ -49,6 +50,7 @@ import { derivePendingApprovals, derivePendingUserInputs, derivePhase, + deriveModelChangeNotices, deriveTimelineEntries, deriveActiveWorkStartedAt, deriveActivePlanState, @@ -583,16 +585,14 @@ export default function ChatView({ threadId }: ChatViewProps) { const sessionProvider = activeThread?.session?.provider ?? null; const selectedProviderByThreadId = composerDraft.provider; - const hasThreadStarted = Boolean( - activeThread && - (activeThread.latestTurn !== null || - activeThread.messages.length > 0 || - activeThread.session !== null), - ); - const lockedProvider: ProviderKind | null = hasThreadStarted - ? (sessionProvider ?? selectedProviderByThreadId ?? null) - : null; - const selectedProvider: ProviderKind = lockedProvider ?? selectedProviderByThreadId ?? "codex"; + const inferredThreadProvider = activeThread?.model + ? inferProviderForModel(activeThread.model) + : activeProject?.model + ? inferProviderForModel(activeProject.model) + : null; + const lockedProvider: ProviderKind | null = null; + const selectedProvider: ProviderKind = + selectedProviderByThreadId ?? sessionProvider ?? inferredThreadProvider ?? "codex"; const baseThreadModel = resolveModelSlugForProvider( selectedProvider, activeThread?.model ?? activeProject?.model ?? 
getDefaultModel(selectedProvider), @@ -642,9 +642,7 @@ export default function ChatView({ threadId }: ChatViewProps) { }, [modelOptionsByProvider, selectedModelForPicker, selectedProvider]); const searchableModelOptions = useMemo( () => - AVAILABLE_PROVIDER_OPTIONS.filter( - (option) => lockedProvider === null || option.value === lockedProvider, - ).flatMap((option) => + AVAILABLE_PROVIDER_OPTIONS.flatMap((option) => modelOptionsByProvider[option.value].map(({ slug, name }) => ({ provider: option.value, providerLabel: option.label, @@ -655,7 +653,7 @@ export default function ChatView({ threadId }: ChatViewProps) { searchProvider: option.label.toLowerCase(), })), ), - [lockedProvider, modelOptionsByProvider], + [modelOptionsByProvider], ); const phase = derivePhase(activeThread?.session ?? null); const isSendBusy = sendPhase !== "idle"; @@ -672,6 +670,10 @@ export default function ChatView({ threadId }: ChatViewProps) { () => deriveWorkLogEntries(threadActivities, activeLatestTurn?.turnId ?? undefined), [activeLatestTurn?.turnId, threadActivities], ); + const modelChangeNotices = useMemo( + () => deriveModelChangeNotices(threadActivities), + [threadActivities], + ); const latestTurnHasToolActivity = useMemo( () => hasToolActivityForTurn(threadActivities, activeLatestTurn?.turnId), [activeLatestTurn?.turnId, threadActivities], @@ -912,8 +914,13 @@ export default function ChatView({ threadId }: ChatViewProps) { }, [serverMessages, attachmentPreviewHandoffByMessageId, optimisticUserMessages]); const timelineEntries = useMemo( () => - deriveTimelineEntries(timelineMessages, activeThread?.proposedPlans ?? [], workLogEntries), - [activeThread?.proposedPlans, timelineMessages, workLogEntries], + deriveTimelineEntries( + timelineMessages, + activeThread?.proposedPlans ?? 
[], + modelChangeNotices, + workLogEntries, + ), + [activeThread?.proposedPlans, modelChangeNotices, timelineMessages, workLogEntries], ); const { turnDiffSummaries, inferredCheckpointTurnCountByTurnId } = useTurnDiffSummaries(activeThread); @@ -1604,10 +1611,11 @@ export default function ChatView({ threadId }: ChatViewProps) { if (input.model !== undefined && input.model !== serverThread.model) { await api.orchestration.dispatchCommand({ - type: "thread.meta.update", + type: "thread.model.set", commandId: newCommandId(), threadId: input.threadId, model: input.model, + source: "client", }); } @@ -3092,10 +3100,6 @@ export default function ChatView({ threadId }: ChatViewProps) { const onProviderModelSelect = useCallback( (provider: ProviderKind, model: ModelSlug) => { if (!activeThread) return; - if (lockedProvider !== null && provider !== lockedProvider) { - scheduleComposerFocus(); - return; - } const resolvedModel = resolveAppModelSelection(provider, customModelsByProvider, model); setComposerDraftProvider(activeThread.id, provider); setComposerDraftModel(activeThread.id, resolvedModel); @@ -3104,7 +3108,6 @@ export default function ChatView({ threadId }: ChatViewProps) { }, [ activeThread, - lockedProvider, scheduleComposerFocus, setComposerDraftModel, setComposerDraftProvider, diff --git a/apps/web/src/components/chat/MessagesTimeline.test.tsx b/apps/web/src/components/chat/MessagesTimeline.test.tsx index e694faa0f2..716d6922ae 100644 --- a/apps/web/src/components/chat/MessagesTimeline.test.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.test.tsx @@ -96,4 +96,55 @@ describe("MessagesTimeline", () => { expect(markup).toContain("lucide-terminal"); expect(markup).toContain("yoo what's "); }); + + it("renders model change notices inline outside the work log", async () => { + const { MessagesTimeline } = await import("./MessagesTimeline"); + const markup = renderToStaticMarkup( + {}} + onOpenTurnDiff={() => {}} + revertTurnCountByUserMessageId={new Map()} + 
onRevertUserMessage={() => {}} + isRevertingCheckpoint={false} + onImageExpand={() => {}} + markdownCwd={undefined} + resolvedTheme="light" + timestampFormat="locale" + workspaceRoot={undefined} + />, + ); + + expect(markup).toContain("Rerouted to GPT-5.4"); + expect(markup).toContain("from GPT-5.3 Codex"); + expect(markup).toContain("capacity"); + expect(markup).not.toContain("Work log"); + }); }); diff --git a/apps/web/src/components/chat/MessagesTimeline.tsx b/apps/web/src/components/chat/MessagesTimeline.tsx index f3e462f7fe..595912d337 100644 --- a/apps/web/src/components/chat/MessagesTimeline.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.tsx @@ -175,6 +175,16 @@ export const MessagesTimeline = memo(function MessagesTimeline({ continue; } + if (timelineEntry.kind === "notice") { + nextRows.push({ + kind: "notice", + id: timelineEntry.id, + createdAt: timelineEntry.createdAt, + notice: timelineEntry.notice, + }); + continue; + } + nextRows.push({ kind: "message", id: timelineEntry.id, @@ -252,6 +262,7 @@ export const MessagesTimeline = memo(function MessagesTimeline({ if (!row) return 96; if (row.kind === "work") return 112; if (row.kind === "proposed-plan") return estimateTimelineProposedPlanHeight(row.proposedPlan); + if (row.kind === "notice") return 52; if (row.kind === "working") return 40; return estimateTimelineMessageHeight(row.message, { timelineWidthPx }); }, @@ -534,6 +545,38 @@ export const MessagesTimeline = memo(function MessagesTimeline({ )} + {row.kind === "notice" && ( +
+
+ +
+ + + + + + {row.notice.source === "provider-reroute" + ? `Rerouted to ${row.notice.toModelLabel}` + : `Switched to ${row.notice.toModelLabel}`} + + {(row.notice.fromModel !== row.notice.toModel || row.notice.reason) && ( + + {row.notice.fromModel !== row.notice.toModel + ? `from ${row.notice.fromModelLabel}` + : null} + {row.notice.fromModel !== row.notice.toModel && row.notice.reason + ? " • " + : null} + {row.notice.reason ? row.notice.reason : null} + + )} + +
+ +
+
+ )} + {row.kind === "working" && (
@@ -600,6 +643,7 @@ export const MessagesTimeline = memo(function MessagesTimeline({ type TimelineEntry = ReturnType[number]; type TimelineMessage = Extract["message"]; type TimelineProposedPlan = Extract["proposedPlan"]; +type TimelineNotice = Extract["notice"]; type TimelineWorkEntry = Extract["entry"]; type TimelineRow = | { @@ -622,6 +666,12 @@ type TimelineRow = createdAt: string; proposedPlan: TimelineProposedPlan; } + | { + kind: "notice"; + id: string; + createdAt: string; + notice: TimelineNotice; + } | { kind: "working"; id: string; createdAt: string | null }; function estimateTimelineProposedPlanHeight(proposedPlan: TimelineProposedPlan): number { diff --git a/apps/web/src/session-logic.test.ts b/apps/web/src/session-logic.test.ts index 4a113adebe..e88c44575f 100644 --- a/apps/web/src/session-logic.test.ts +++ b/apps/web/src/session-logic.test.ts @@ -10,6 +10,7 @@ import { describe, expect, it } from "vitest"; import { deriveActiveWorkStartedAt, deriveActivePlanState, + deriveModelChangeNotices, PROVIDER_OPTIONS, derivePendingApprovals, derivePendingUserInputs, @@ -922,7 +923,7 @@ describe("deriveWorkLogEntries", () => { }); describe("deriveTimelineEntries", () => { - it("includes proposed plans alongside messages and work entries in chronological order", () => { + it("includes proposed plans, notices, and work entries in chronological order", () => { const entries = deriveTimelineEntries( [ { @@ -944,6 +945,18 @@ describe("deriveTimelineEntries", () => { updatedAt: "2026-02-23T00:00:02.000Z", }, ], + [ + { + id: "notice-1", + createdAt: "2026-02-23T00:00:02.500Z", + noticeType: "model-change", + fromModel: "gpt-5.3-codex", + toModel: "gpt-5.4", + fromModelLabel: "GPT-5.3 Codex", + toModelLabel: "GPT-5.4", + source: "user", + }, + ], [ { id: "work-1", @@ -954,7 +967,12 @@ describe("deriveTimelineEntries", () => { ], ); - expect(entries.map((entry) => entry.kind)).toEqual(["message", "proposed-plan", "work"]); + expect(entries.map((entry) => 
entry.kind)).toEqual([ + "message", + "proposed-plan", + "notice", + "work", + ]); expect(entries[1]).toMatchObject({ kind: "proposed-plan", proposedPlan: { @@ -966,6 +984,88 @@ describe("deriveTimelineEntries", () => { }); }); +describe("deriveModelChangeNotices", () => { + it("returns historical model-change notices in activity order", () => { + const notices = deriveModelChangeNotices([ + makeActivity({ + id: "notice-2", + createdAt: "2026-02-23T00:00:02.000Z", + kind: "thread.model.changed", + summary: "Model changed", + tone: "info", + payload: { + fromModel: "gpt-5.3-codex", + toModel: "gpt-5.4", + source: "user", + }, + }), + makeActivity({ + id: "tool-1", + createdAt: "2026-02-23T00:00:01.000Z", + kind: "tool.completed", + summary: "Ran command", + }), + makeActivity({ + id: "notice-3", + createdAt: "2026-02-23T00:00:03.000Z", + kind: "thread.model.changed", + summary: "Model changed", + tone: "info", + payload: { + fromModel: "gpt-5.4", + toModel: "gpt-5.4-mini", + source: "provider-reroute", + reason: "capacity", + }, + }), + ]); + + expect(notices).toEqual([ + { + id: "notice-2", + createdAt: "2026-02-23T00:00:02.000Z", + noticeType: "model-change", + fromModel: "gpt-5.3-codex", + toModel: "gpt-5.4", + fromModelLabel: "GPT-5.3 Codex", + toModelLabel: "GPT-5.4", + source: "user", + }, + { + id: "notice-3", + createdAt: "2026-02-23T00:00:03.000Z", + noticeType: "model-change", + fromModel: "gpt-5.4", + toModel: "gpt-5.4-mini", + fromModelLabel: "GPT-5.4", + toModelLabel: "GPT-5.4 Mini", + source: "provider-reroute", + reason: "capacity", + }, + ]); + }); + + it("keeps model-change notices out of the work log", () => { + const activities = [ + makeActivity({ + id: "notice-4", + createdAt: "2026-02-23T00:00:02.000Z", + kind: "thread.model.changed", + summary: "Model changed", + tone: "info", + payload: { + fromModel: "gpt-5.4", + toModel: "gpt-5.4-mini", + source: "provider-reroute", + }, + }), + ]; + + 
expect(deriveModelChangeNotices(activities)).toHaveLength(1); + expect(deriveWorkLogEntries(activities, undefined)).toEqual([]); + }); +}); + describe("hasToolActivityForTurn", () => { it("returns false when turn id is missing", () => { const activities: OrchestrationThreadActivity[] = [ diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index 7c3ea96e65..8dc2477af3 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -10,6 +10,7 @@ import { type ThreadId, type TurnId, } from "@t3tools/contracts"; +import { inferProviderForModel, resolveModelDisplayName } from "@t3tools/shared/model"; import type { ChatMessage, @@ -45,6 +46,18 @@ export interface WorkLogEntry { requestKind?: PendingApproval["requestKind"]; } +export interface ModelChangeNotice { + id: string; + createdAt: string; + noticeType: "model-change"; + fromModel: string; + toModel: string; + fromModelLabel: string; + toModelLabel: string; + source: "user" | "provider-reroute"; + reason?: string; +} + interface DerivedWorkLogEntry extends WorkLogEntry { activityKind: OrchestrationThreadActivity["kind"]; collapseKey?: string; @@ -96,6 +109,12 @@ export type TimelineEntry = createdAt: string; proposedPlan: ProposedPlan; } + | { + id: string; + kind: "notice"; + createdAt: string; + notice: ModelChangeNotice; + } | { id: string; kind: "work"; @@ -464,6 +483,7 @@ export function deriveWorkLogEntries( .filter((activity) => activity.kind !== "tool.started") .filter((activity) => activity.kind !== "task.started" && activity.kind !== "task.completed") .filter((activity) => activity.summary !== "Checkpoint captured") + .filter((activity) => activity.kind !== "thread.model.changed") .filter((activity) => !isPlanBoundaryToolActivity(activity)) .map(toDerivedWorkLogEntry); return collapseDerivedWorkLogEntries(entries).map( @@ -471,6 +491,40 @@ export function deriveWorkLogEntries( ); } +export function deriveModelChangeNotices( + activities: ReadonlyArray, +): 
ModelChangeNotice[] { + return [...activities].toSorted(compareActivitiesByOrder).flatMap((activity) => { + if (activity.kind !== "thread.model.changed") { + return []; + } + const payload = + activity.payload && typeof activity.payload === "object" + ? (activity.payload as Record) + : null; + const fromModel = typeof payload?.fromModel === "string" ? payload.fromModel : null; + const toModel = typeof payload?.toModel === "string" ? payload.toModel : null; + const source = + payload?.source === "user" || payload?.source === "provider-reroute" ? payload.source : null; + if (!fromModel || !toModel || !source) { + return []; + } + return [ + { + id: activity.id, + createdAt: activity.createdAt, + noticeType: "model-change" as const, + fromModel, + toModel, + fromModelLabel: resolveModelDisplayName(fromModel, inferProviderForModel(fromModel)), + toModelLabel: resolveModelDisplayName(toModel, inferProviderForModel(toModel)), + source, + ...(typeof payload?.reason === "string" ? { reason: payload.reason } : {}), + }, + ]; + }); +} + function isPlanBoundaryToolActivity(activity: OrchestrationThreadActivity): boolean { if (activity.kind !== "tool.updated" && activity.kind !== "tool.completed") { return false; @@ -826,6 +880,7 @@ export function hasToolActivityForTurn( export function deriveTimelineEntries( messages: ChatMessage[], proposedPlans: ProposedPlan[], + notices: ModelChangeNotice[], workEntries: WorkLogEntry[], ): TimelineEntry[] { const messageRows: TimelineEntry[] = messages.map((message) => ({ @@ -840,13 +895,19 @@ export function deriveTimelineEntries( createdAt: proposedPlan.createdAt, proposedPlan, })); + const noticeRows: TimelineEntry[] = notices.map((notice) => ({ + id: notice.id, + kind: "notice", + createdAt: notice.createdAt, + notice, + })); const workRows: TimelineEntry[] = workEntries.map((entry) => ({ id: entry.id, kind: "work", createdAt: entry.createdAt, entry, })); - return [...messageRows, ...proposedPlanRows, ...workRows].toSorted((a, b) => + 
return [...messageRows, ...proposedPlanRows, ...noticeRows, ...workRows].toSorted((a, b) => a.createdAt.localeCompare(b.createdAt), ); } diff --git a/packages/contracts/src/orchestration.test.ts b/packages/contracts/src/orchestration.test.ts index 6fdff4e886..711129f7e5 100644 --- a/packages/contracts/src/orchestration.test.ts +++ b/packages/contracts/src/orchestration.test.ts @@ -10,6 +10,9 @@ import { OrchestrationProposedPlan, OrchestrationSession, ProjectCreateCommand, + ThreadModelChangedActivityPayload, + ThreadModelSetCommand, + ThreadModelSetPayload, ThreadTurnStartCommand, ThreadCreatedPayload, ThreadTurnDiff, @@ -23,6 +26,11 @@ const decodeThreadTurnStartCommand = Schema.decodeUnknownEffect(ThreadTurnStartC const decodeThreadTurnStartRequestedPayload = Schema.decodeUnknownEffect( ThreadTurnStartRequestedPayload, ); +const decodeThreadModelSetCommand = Schema.decodeUnknownEffect(ThreadModelSetCommand); +const decodeThreadModelSetPayload = Schema.decodeUnknownEffect(ThreadModelSetPayload); +const decodeThreadModelChangedActivityPayload = Schema.decodeUnknownEffect( + ThreadModelChangedActivityPayload, +); const decodeOrchestrationLatestTurn = Schema.decodeUnknownEffect(OrchestrationLatestTurn); const decodeOrchestrationProposedPlan = Schema.decodeUnknownEffect(OrchestrationProposedPlan); const decodeOrchestrationSession = Schema.decodeUnknownEffect(OrchestrationSession); @@ -215,6 +223,56 @@ it.effect("accepts a source proposed plan reference in thread.turn.start", () => }), ); +it.effect("decodes thread.model.set with source and optional reason", () => + Effect.gen(function* () { + const parsed = yield* decodeThreadModelSetCommand({ + type: "thread.model.set", + commandId: "cmd-model-set", + threadId: "thread-1", + model: "gpt-5.4", + source: "client", + reason: "manual switch", + }); + assert.strictEqual(parsed.model, "gpt-5.4"); + assert.strictEqual(parsed.source, "client"); + assert.strictEqual(parsed.reason, "manual switch"); + }), +); + 
+it.effect("decodes thread.model-set payload with previous and next model", () => + Effect.gen(function* () { + const parsed = yield* decodeThreadModelSetPayload({ + threadId: "thread-1", + model: "gpt-5.4", + previousModel: "gpt-5.3-codex", + source: "provider-reroute", + reason: "capacity", + updatedAt: "2026-01-01T00:00:00.000Z", + }); + assert.strictEqual(parsed.previousModel, "gpt-5.3-codex"); + assert.strictEqual(parsed.model, "gpt-5.4"); + assert.strictEqual(parsed.source, "provider-reroute"); + }), +); + +it.effect("decodes thread model changed activity payload for user and reroute notices", () => + Effect.gen(function* () { + const user = yield* decodeThreadModelChangedActivityPayload({ + fromModel: "gpt-5.3-codex", + toModel: "gpt-5.4", + source: "user", + }); + const reroute = yield* decodeThreadModelChangedActivityPayload({ + fromModel: "gpt-5.4", + toModel: "gpt-5.4-mini", + source: "provider-reroute", + reason: "capacity", + }); + assert.strictEqual(user.source, "user"); + assert.strictEqual(reroute.reason, "capacity"); + }), +); + it.effect( "decodes thread.turn-start-requested defaults for provider, runtime mode, and interaction mode", () => diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 3208adc8bb..2305d33e01 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -250,6 +250,21 @@ export const OrchestrationThreadActivity = Schema.Struct({ }); export type OrchestrationThreadActivity = typeof OrchestrationThreadActivity.Type; +export const ThreadModelSetSource = Schema.Literals(["client", "provider-reroute"]); +export type ThreadModelSetSource = typeof ThreadModelSetSource.Type; + +export const ThreadModelChangedNoticeSource = Schema.Literals(["user", "provider-reroute"]); +export type ThreadModelChangedNoticeSource = typeof ThreadModelChangedNoticeSource.Type; + +export const ThreadModelChangedActivityPayload = Schema.Struct({ + fromModel: 
TrimmedNonEmptyString, + toModel: TrimmedNonEmptyString, + source: ThreadModelChangedNoticeSource, + provider: Schema.optional(ProviderKind), + reason: Schema.optional(TrimmedNonEmptyString), +}); +export type ThreadModelChangedActivityPayload = typeof ThreadModelChangedActivityPayload.Type; + const OrchestrationLatestTurnState = Schema.Literals([ "running", "interrupted", @@ -358,6 +373,16 @@ const ThreadMetaUpdateCommand = Schema.Struct({ worktreePath: Schema.optional(Schema.NullOr(TrimmedNonEmptyString)), }); +export const ThreadModelSetCommand = Schema.Struct({ + type: Schema.Literal("thread.model.set"), + commandId: CommandId, + threadId: ThreadId, + model: TrimmedNonEmptyString, + source: ThreadModelSetSource, + reason: Schema.optional(TrimmedNonEmptyString), +}); +export type ThreadModelSetCommand = typeof ThreadModelSetCommand.Type; + const ThreadRuntimeModeSetCommand = Schema.Struct({ type: Schema.Literal("thread.runtime-mode.set"), commandId: CommandId, @@ -466,6 +491,7 @@ const DispatchableClientOrchestrationCommand = Schema.Union([ ThreadCreateCommand, ThreadDeleteCommand, ThreadMetaUpdateCommand, + ThreadModelSetCommand, ThreadRuntimeModeSetCommand, ThreadInteractionModeSetCommand, ThreadTurnStartCommand, @@ -485,6 +511,7 @@ export const ClientOrchestrationCommand = Schema.Union([ ThreadCreateCommand, ThreadDeleteCommand, ThreadMetaUpdateCommand, + ThreadModelSetCommand, ThreadRuntimeModeSetCommand, ThreadInteractionModeSetCommand, ClientThreadTurnStartCommand, @@ -585,6 +612,7 @@ export const OrchestrationEventType = Schema.Literals([ "thread.created", "thread.deleted", "thread.meta-updated", + "thread.model-set", "thread.runtime-mode-set", "thread.interaction-mode-set", "thread.message-sent", @@ -659,6 +687,16 @@ export const ThreadMetaUpdatedPayload = Schema.Struct({ updatedAt: IsoDateTime, }); +export const ThreadModelSetPayload = Schema.Struct({ + threadId: ThreadId, + model: TrimmedNonEmptyString, + previousModel: TrimmedNonEmptyString, + source: 
ThreadModelSetSource, + reason: Schema.optional(TrimmedNonEmptyString), + updatedAt: IsoDateTime, +}); +export type ThreadModelSetPayload = typeof ThreadModelSetPayload.Type; + export const ThreadRuntimeModeSetPayload = Schema.Struct({ threadId: ThreadId, runtimeMode: RuntimeMode, @@ -815,6 +853,11 @@ export const OrchestrationEvent = Schema.Union([ type: Schema.Literal("thread.meta-updated"), payload: ThreadMetaUpdatedPayload, }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.model-set"), + payload: ThreadModelSetPayload, + }), Schema.Struct({ ...EventBaseFields, type: Schema.Literal("thread.runtime-mode-set"), diff --git a/packages/shared/src/model.test.ts b/packages/shared/src/model.test.ts index 2c8aaf1986..fbacd6496b 100644 --- a/packages/shared/src/model.test.ts +++ b/packages/shared/src/model.test.ts @@ -21,6 +21,7 @@ import { normalizeCodexModelOptions, normalizeModelSlug, resolveReasoningEffortForProvider, + resolveModelDisplayName, resolveSelectableModel, resolveModelSlug, resolveModelSlugForProvider, @@ -205,6 +206,23 @@ describe("inferProviderForModel", () => { }); }); +describe("resolveModelDisplayName", () => { + it("returns built-in model names for known slugs and aliases", () => { + expect(resolveModelDisplayName("gpt-5.3-codex")).toBe("GPT-5.3 Codex"); + expect(resolveModelDisplayName("sonnet", "claudeAgent")).toBe("Claude Sonnet 4.6"); + }); + + it("humanizes unknown model slugs", () => { + expect(resolveModelDisplayName("claude/custom-opus")).toBe("Claude Custom Opus"); + expect(resolveModelDisplayName("custom/internal-model")).toBe("Custom Internal Model"); + }); + + it("returns an empty string for blank input", () => { + expect(resolveModelDisplayName("")).toBe(""); + expect(resolveModelDisplayName(undefined)).toBe(""); + }); +}); + describe("getDefaultReasoningEffort", () => { it("returns provider-scoped defaults", () => { expect(getDefaultReasoningEffort("codex")).toBe(DEFAULT_REASONING_EFFORT_BY_PROVIDER.codex); diff 
--git a/packages/shared/src/model.ts b/packages/shared/src/model.ts index 2d46320753..034ab6c265 100644 --- a/packages/shared/src/model.ts +++ b/packages/shared/src/model.ts @@ -153,6 +153,49 @@ export function inferProviderForModel( return typeof model === "string" && model.trim().startsWith("claude-") ? "claudeAgent" : fallback; } +function humanizeModelSlug(slug: string): string { + return slug + .trim() + .split(/[-_/]+/g) + .filter((part) => part.length > 0) + .map((part) => { + const upper = part.toUpperCase(); + if (upper === "GPT") { + return upper; + } + if (/^\d+(\.\d+)?$/.test(part)) { + return part; + } + return part.charAt(0).toUpperCase() + part.slice(1); + }) + .join(" "); +} + +export function resolveModelDisplayName( + model: string | null | undefined, + provider?: ProviderKind, +): string { + if (typeof model !== "string") { + return ""; + } + + const trimmed = model.trim(); + if (!trimmed) { + return ""; + } + + const effectiveProvider = provider ?? inferProviderForModel(trimmed); + const normalized = normalizeModelSlug(trimmed, effectiveProvider) ?? 
trimmed; + const builtIn = MODEL_OPTIONS_BY_PROVIDER[effectiveProvider].find( + (option) => option.slug === normalized, + ); + if (builtIn) { + return builtIn.name; + } + + return humanizeModelSlug(trimmed); +} + export function getReasoningEffortOptions(provider: "codex"): ReadonlyArray; export function getReasoningEffortOptions( provider: "claudeAgent", diff --git a/packages/shared/src/shell.test.ts b/packages/shared/src/shell.test.ts index e2393eefff..95ec769882 100644 --- a/packages/shared/src/shell.test.ts +++ b/packages/shared/src/shell.test.ts @@ -126,3 +126,75 @@ describe("readEnvironmentFromLoginShell", () => { }); }); }); + +describe("nushell support", () => { + it("uses nushell-native flags and command for nu shell", () => { + const execFile = vi.fn< + ( + file: string, + args: ReadonlyArray, + options: { encoding: "utf8"; timeout: number }, + ) => string + >(() => "__T3CODE_ENV_PATH_START__\n/a:/b\n__T3CODE_ENV_PATH_END__\n"); + + expect(readPathFromLoginShell("/opt/homebrew/bin/nu", execFile)).toBe("/a:/b"); + + const [shell, args] = execFile.mock.calls[0]!; + expect(shell).toBe("/opt/homebrew/bin/nu"); + // Nushell must get separate -l and -c flags (not bundled -ilc) + expect(args).toHaveLength(3); + expect(args?.[0]).toBe("-l"); + expect(args?.[1]).toBe("-c"); + // Command should use nushell syntax, not POSIX + const command = args?.[2] as string; + expect(command).toContain("$env.PATH | str join"); + expect(command).not.toContain("printenv"); + expect(command).not.toContain("||"); + }); + + it("uses nushell syntax for multiple env vars", () => { + const execFile = vi.fn< + ( + file: string, + args: ReadonlyArray, + options: { encoding: "utf8"; timeout: number }, + ) => string + >(() => + [ + "__T3CODE_ENV_PATH_START__", + "/a:/b", + "__T3CODE_ENV_PATH_END__", + "__T3CODE_ENV_SSH_AUTH_SOCK_START__", + "/tmp/agent.sock", + "__T3CODE_ENV_SSH_AUTH_SOCK_END__", + ].join("\n"), + ); + + expect( + readEnvironmentFromLoginShell("/opt/homebrew/bin/nu", 
["PATH", "SSH_AUTH_SOCK"], execFile), + ).toEqual({ + PATH: "/a:/b", + SSH_AUTH_SOCK: "/tmp/agent.sock", + }); + + const command = execFile.mock.calls[0]![1]?.[2] as string; + // Non-PATH vars should use try/catch + expect(command).toContain("try { print $env.SSH_AUTH_SOCK } catch { }"); + }); + + it("uses POSIX flags for non-nushell shells", () => { + const execFile = vi.fn< + ( + file: string, + args: ReadonlyArray, + options: { encoding: "utf8"; timeout: number }, + ) => string + >(() => "__T3CODE_ENV_PATH_START__\n/a:/b\n__T3CODE_ENV_PATH_END__\n"); + + readPathFromLoginShell("/bin/bash", execFile); + + const args = execFile.mock.calls[0]![1]; + expect(args).toHaveLength(2); + expect(args?.[0]).toBe("-ilc"); + }); +}); diff --git a/packages/shared/src/shell.ts b/packages/shared/src/shell.ts index f1d60bf334..f4c5490ddf 100644 --- a/packages/shared/src/shell.ts +++ b/packages/shared/src/shell.ts @@ -37,6 +37,11 @@ function envCaptureEnd(name: string): string { return `__T3CODE_ENV_${name}_END__`; } +function isNushell(shell: string): boolean { + const base = shell.split("/").pop() ?? ""; + return base === "nu" || base.startsWith("nu."); +} + function buildEnvironmentCaptureCommand(names: ReadonlyArray): string { return names .map((name) => { @@ -53,6 +58,28 @@ function buildEnvironmentCaptureCommand(names: ReadonlyArray): string { .join("; "); } +/** + * Build a nushell-native command that captures environment variables using + * the same marker protocol as the POSIX variant. Nushell stores PATH as a + * list, so we join it with ":" to match the expected format. For other + * variables we use `try { print $env.X } catch { }` because nushell + * throws on missing env vars. 
+ */ +function buildNushellEnvironmentCaptureCommand(names: ReadonlyArray): string { + return names + .map((name) => { + const start = envCaptureStart(name); + const end = envCaptureEnd(name); + + if (name === "PATH") { + return `print "${start}"; print ($env.PATH | str join ":"); print "${end}"`; + } + + return `print "${start}"; try { print $env.${name} } catch { }; print "${end}"`; + }) + .join("; "); +} + function extractEnvironmentValue(output: string, name: string): string | undefined { const startMarker = envCaptureStart(name); const endMarker = envCaptureEnd(name); @@ -89,7 +116,15 @@ export const readEnvironmentFromLoginShell: ShellEnvironmentReader = ( return {}; } - const output = execFile(shell, ["-ilc", buildEnvironmentCaptureCommand(names)], { + // Nushell doesn't support bundled flags (-ilc) or POSIX syntax (||, printf). + // Use separate flags and a nu-native capture command instead. + const nu = isNushell(shell); + const command = nu + ? buildNushellEnvironmentCaptureCommand(names) + : buildEnvironmentCaptureCommand(names); + const args = nu ? 
["-l", "-c", command] : ["-ilc", command]; + + const output = execFile(shell, args, { encoding: "utf8", timeout: 5000, }); From ec94dd781ec43f3c84432af6fd50c52bf985e8a1 Mon Sep 17 00:00:00 2001 From: Michael Brown Date: Sat, 21 Mar 2026 22:09:57 -0400 Subject: [PATCH 2/3] Preserve source model on provider handoff - Thread the handoff source model through orchestration and contracts - Use it when building reroute prompts so model-switch messages stay accurate - Add coverage for handoff prompt truncation and model switch cases --- .../Layers/ProviderCommandReactor.test.ts | 86 +++++++++++++++++++ .../Layers/ProviderCommandReactor.ts | 8 +- apps/server/src/orchestration/decider.ts | 3 + .../src/orchestration/providerHandoff.test.ts | 40 +++++++++ .../src/orchestration/providerHandoff.ts | 30 ++++++- apps/web/src/components/ChatView.tsx | 2 + packages/contracts/src/orchestration.test.ts | 25 ++++++ packages/contracts/src/orchestration.ts | 3 + 8 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 apps/server/src/orchestration/providerHandoff.test.ts diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 68b58df928..f49e8a35ff 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -858,6 +858,78 @@ describe("ProviderCommandReactor", () => { }); }); + it("uses the handoff source model after an in-session reroute", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-reroute-source-1"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("user-message-reroute-source-1"), + role: "user", + text: "first", + attachments: [], + }, + interactionMode: 
DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length === 1); + await waitFor(() => harness.sendTurn.mock.calls.length === 1); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-rerouted-before-switch"), + threadId: ThreadId.makeUnsafe("thread-1"), + model: "gpt-5.4", + source: "provider-reroute", + reason: "capacity", + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-client-before-switch"), + threadId: ThreadId.makeUnsafe("thread-1"), + model: "claude-sonnet-4-6", + source: "client", + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-reroute-source-2"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("user-message-reroute-source-2"), + role: "user", + text: "second", + attachments: [], + }, + provider: "claudeAgent", + model: "claude-sonnet-4-6", + handoffSourceModel: "gpt-5.4", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length === 2); + await waitFor(() => harness.sendTurn.mock.calls.length === 2); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("switching from GPT-5.4 to Claude Sonnet 4.6"), + }); + }); + it("preserves handoff history when switching providers after the previous session stopped", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -900,6 +972,16 @@ describe("ProviderCommandReactor", () => { expect(stoppedThread?.session?.status).toBe("stopped"); expect(stoppedThread?.session?.providerName).toBe("codex"); + await 
Effect.runPromise( + harness.engine.dispatch({ + type: "thread.model.set", + commandId: CommandId.makeUnsafe("cmd-model-set-before-stopped-provider-switch"), + threadId: ThreadId.makeUnsafe("thread-1"), + model: "claude-sonnet-4-6", + source: "client", + }), + ); + await Effect.runPromise( harness.engine.dispatch({ type: "thread.turn.start", @@ -913,6 +995,7 @@ describe("ProviderCommandReactor", () => { }, provider: "claudeAgent", model: "claude-sonnet-4-6", + handoffSourceModel: "gpt-5-codex", interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, runtimeMode: "approval-required", createdAt: now, @@ -930,6 +1013,9 @@ describe("ProviderCommandReactor", () => { expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ model: "claude-sonnet-4-6", }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + input: expect.stringContaining("switching from GPT-5 Codex to Claude Sonnet 4.6"), + }); expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ input: expect.stringContaining("You are continuing an existing conversation"), }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 1fb7284f2a..a5f0bf12c8 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -419,6 +419,7 @@ const make = Effect.gen(function* () { readonly attachments?: ReadonlyArray; readonly provider?: ProviderKind; readonly model?: string; + readonly handoffSourceModel?: string; readonly modelOptions?: ProviderModelOptions; readonly providerOptions?: ProviderStartOptions; readonly interactionMode?: "default" | "plan"; @@ -451,6 +452,8 @@ const make = Effect.gen(function* () { ? "in-session" : (yield* providerService.getCapabilities(activeSession.provider)).sessionModelSwitch; const modelForTurn = sessionModelSwitch === "unsupported" ? 
activeSession?.model : input.model; + const previousModelForHandoff = + input.handoffSourceModel ?? thread.model ?? ensuredSession.previousModel; const handoffPrompt = ensuredSession.providerSwitched && ensuredSession.previousProvider !== undefined && @@ -459,7 +462,7 @@ const make = Effect.gen(function* () { messagesBeforeCurrent: getMessagesBeforeMessage(thread.messages, input.messageId), previousProvider: ensuredSession.previousProvider, nextProvider: ensuredSession.desiredProvider, - previousModel: ensuredSession.previousModel, + previousModel: previousModelForHandoff, nextModel: modelForTurn ?? ensuredSession.desiredModel, latestUserText: input.messageText, ...(input.attachments !== undefined ? { latestAttachments: input.attachments } : {}), @@ -590,6 +593,9 @@ const make = Effect.gen(function* () { ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), ...(event.payload.provider !== undefined ? { provider: event.payload.provider } : {}), ...(event.payload.model !== undefined ? { model: event.payload.model } : {}), + ...(event.payload.handoffSourceModel !== undefined + ? { handoffSourceModel: event.payload.handoffSourceModel } + : {}), ...(event.payload.modelOptions !== undefined ? { modelOptions: event.payload.modelOptions } : {}), diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 89091fedfb..9b78a06b50 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -361,6 +361,9 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" messageId: command.message.messageId, ...(command.provider !== undefined ? { provider: command.provider } : {}), ...(command.model !== undefined ? { model: command.model } : {}), + ...(command.handoffSourceModel !== undefined + ? { handoffSourceModel: command.handoffSourceModel } + : {}), ...(command.modelOptions !== undefined ? 
{ modelOptions: command.modelOptions } : {}), ...(command.providerOptions !== undefined ? { providerOptions: command.providerOptions } diff --git a/apps/server/src/orchestration/providerHandoff.test.ts b/apps/server/src/orchestration/providerHandoff.test.ts new file mode 100644 index 0000000000..fd61a457d3 --- /dev/null +++ b/apps/server/src/orchestration/providerHandoff.test.ts @@ -0,0 +1,40 @@ +import { MessageId } from "@t3tools/contracts"; +import { describe, expect, it } from "vitest"; + +import { buildProviderHandoffPrompt, getMessagesBeforeMessage } from "./providerHandoff.ts"; + +const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value); + +describe("providerHandoff", () => { + it("caps a single oversized transcript message", () => { + const prompt = buildProviderHandoffPrompt({ + messagesBeforeCurrent: [ + { + id: asMessageId("msg-1"), + role: "assistant", + text: "A".repeat(20_000), + }, + ], + previousProvider: "codex", + nextProvider: "claudeAgent", + previousModel: "gpt-5.4", + nextModel: "claude-sonnet-4-6", + latestUserText: "continue", + }); + + expect(prompt).toBeDefined(); + expect(prompt).toContain("[Message truncated for length]"); + expect(prompt?.length).toBeLessThanOrEqual(12_500); + }); + + it("returns messages before the current message only", () => { + const messages = [ + { id: asMessageId("msg-1") }, + { id: asMessageId("msg-2") }, + { id: asMessageId("msg-3") }, + ]; + + expect(getMessagesBeforeMessage(messages, asMessageId("msg-2"))).toEqual([messages[0]]); + expect(getMessagesBeforeMessage(messages, asMessageId("msg-1"))).toEqual([]); + }); +}); diff --git a/apps/server/src/orchestration/providerHandoff.ts b/apps/server/src/orchestration/providerHandoff.ts index a93accb7d8..8252b44882 100644 --- a/apps/server/src/orchestration/providerHandoff.ts +++ b/apps/server/src/orchestration/providerHandoff.ts @@ -10,6 +10,7 @@ type HandoffMessage = { const MAX_HANDOFF_TRANSCRIPT_CHARS = 12_000; const MAX_HANDOFF_MESSAGES = 
24; +const TRUNCATED_TRANSCRIPT_SLICE_MARKER = "\n[Message truncated for length]"; function formatAttachmentSummary( attachments: ReadonlyArray | undefined, @@ -37,10 +38,22 @@ function formatTranscriptMessage(message: HandoffMessage): string { return sections.join("\n"); } +function truncateTranscriptSlice(slice: string, maxChars: number): string { + if (slice.length <= maxChars) { + return slice; + } + if (maxChars <= TRUNCATED_TRANSCRIPT_SLICE_MARKER.length) { + return slice.slice(0, maxChars); + } + const availableChars = maxChars - TRUNCATED_TRANSCRIPT_SLICE_MARKER.length; + return `${slice.slice(0, availableChars)}${TRUNCATED_TRANSCRIPT_SLICE_MARKER}`; +} + function buildTranscript(messages: ReadonlyArray): string { const slices: string[] = []; let usedChars = 0; let usedCount = 0; + let truncatedForLength = false; for (let index = messages.length - 1; index >= 0; index -= 1) { const message = messages[index]; @@ -48,19 +61,28 @@ function buildTranscript(messages: ReadonlyArray): string { continue; } const nextSlice = formatTranscriptMessage(message); - const nextCost = nextSlice.length + 2; - if (usedCount > 0 && usedChars + nextCost > MAX_HANDOFF_TRANSCRIPT_CHARS) { + const separatorCost = usedCount > 0 ? 2 : 0; + const remainingChars = MAX_HANDOFF_TRANSCRIPT_CHARS - usedChars - separatorCost; + if (remainingChars <= 0) { + truncatedForLength = true; + break; + } + if (nextSlice.length > remainingChars) { + slices.unshift(truncateTranscriptSlice(nextSlice, remainingChars)); + usedChars += remainingChars + separatorCost; + usedCount += 1; + truncatedForLength = true; break; } slices.unshift(nextSlice); - usedChars += nextCost; + usedChars += nextSlice.length + separatorCost; usedCount += 1; if (usedCount >= MAX_HANDOFF_MESSAGES || usedChars >= MAX_HANDOFF_TRANSCRIPT_CHARS) { break; } } - const truncated = slices.length < messages.length; + const truncated = truncatedForLength || slices.length < messages.length; return [truncated ? 
"[Earlier conversation omitted for brevity]\n" : null, slices.join("\n\n")] .filter((value): value is string => typeof value === "string" && value.length > 0) .join(""); diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index ede1ecd744..0ee84cf21b 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -2625,6 +2625,7 @@ export default function ChatView({ threadId }: ChatViewProps) { attachments: turnAttachments, }, model: selectedModel || undefined, + ...(serverThread?.model ? { handoffSourceModel: serverThread.model } : {}), ...(selectedModelOptionsForDispatch ? { modelOptions: selectedModelOptionsForDispatch } : {}), @@ -2911,6 +2912,7 @@ export default function ChatView({ threadId }: ChatViewProps) { }, provider: selectedProvider, model: selectedModel || undefined, + ...(activeThread.model ? { handoffSourceModel: activeThread.model } : {}), ...(selectedModelOptionsForDispatch ? { modelOptions: selectedModelOptionsForDispatch } : {}), diff --git a/packages/contracts/src/orchestration.test.ts b/packages/contracts/src/orchestration.test.ts index 711129f7e5..a7dfdb5cfa 100644 --- a/packages/contracts/src/orchestration.test.ts +++ b/packages/contracts/src/orchestration.test.ts @@ -273,6 +273,29 @@ it.effect("decodes thread model changed activity payload for user and reroute no }), ); +it.effect("decodes thread.turn.start handoff source model when present", () => + Effect.gen(function* () { + const parsed = yield* decodeThreadTurnStartCommand({ + type: "thread.turn.start", + commandId: "cmd-turn-start-handoff-model", + threadId: "thread-1", + message: { + messageId: "msg-1", + role: "user", + text: "continue", + attachments: [], + }, + provider: "claudeAgent", + model: "claude-sonnet-4-6", + handoffSourceModel: "gpt-5.4", + runtimeMode: "approval-required", + interactionMode: "default", + createdAt: "2026-01-01T00:00:00.000Z", + }); + assert.strictEqual(parsed.handoffSourceModel, 
"gpt-5.4"); + }), +); + it.effect( "decodes thread.turn-start-requested defaults for provider, runtime mode, and interaction mode", () => @@ -294,6 +317,7 @@ it.effect("decodes thread.turn-start-requested source proposed plan metadata whe const parsed = yield* decodeThreadTurnStartRequestedPayload({ threadId: "thread-2", messageId: "msg-2", + handoffSourceModel: "gpt-5.4", sourceProposedPlan: { threadId: "thread-1", planId: "plan-1", @@ -304,6 +328,7 @@ it.effect("decodes thread.turn-start-requested source proposed plan metadata whe threadId: "thread-1", planId: "plan-1", }); + assert.strictEqual(parsed.handoffSourceModel, "gpt-5.4"); }), ); diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 2305d33e01..28bfd2b7b1 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -411,6 +411,7 @@ export const ThreadTurnStartCommand = Schema.Struct({ }), provider: Schema.optional(ProviderKind), model: Schema.optional(TrimmedNonEmptyString), + handoffSourceModel: Schema.optional(TrimmedNonEmptyString), modelOptions: Schema.optional(ProviderModelOptions), providerOptions: Schema.optional(ProviderStartOptions), assistantDeliveryMode: Schema.optional(AssistantDeliveryMode), @@ -434,6 +435,7 @@ const ClientThreadTurnStartCommand = Schema.Struct({ }), provider: Schema.optional(ProviderKind), model: Schema.optional(TrimmedNonEmptyString), + handoffSourceModel: Schema.optional(TrimmedNonEmptyString), modelOptions: Schema.optional(ProviderModelOptions), providerOptions: Schema.optional(ProviderStartOptions), assistantDeliveryMode: Schema.optional(AssistantDeliveryMode), @@ -728,6 +730,7 @@ export const ThreadTurnStartRequestedPayload = Schema.Struct({ messageId: MessageId, provider: Schema.optional(ProviderKind), model: Schema.optional(TrimmedNonEmptyString), + handoffSourceModel: Schema.optional(TrimmedNonEmptyString), modelOptions: Schema.optional(ProviderModelOptions), providerOptions: 
Schema.optional(ProviderStartOptions), assistantDeliveryMode: Schema.optional(AssistantDeliveryMode), From 972758f8e929d6e98699a4056e58cbc33acef1cf Mon Sep 17 00:00:00 2001 From: Michael Brown Date: Sat, 21 Mar 2026 22:42:38 -0400 Subject: [PATCH 3/3] Fix model change reactor start effect type --- apps/server/src/orchestration/Layers/ModelChangeReactor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/server/src/orchestration/Layers/ModelChangeReactor.ts b/apps/server/src/orchestration/Layers/ModelChangeReactor.ts index 6d93dc5370..20e9935c40 100644 --- a/apps/server/src/orchestration/Layers/ModelChangeReactor.ts +++ b/apps/server/src/orchestration/Layers/ModelChangeReactor.ts @@ -57,7 +57,7 @@ const make = Effect.gen(function* () { } return worker.enqueue(event); }), - ); + ).pipe(Effect.asVoid); return { start,