forked from liuup/claude-code-analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsessionStorage.ts
More file actions
5105 lines (4706 loc) · 176 KB
/
sessionStorage.ts
File metadata and controls
5105 lines (4706 loc) · 176 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import { feature } from 'bun:bundle'
import type { UUID } from 'crypto'
import type { Dirent } from 'fs'
// Sync fs primitives for readFileTailSync — separate from fs/promises
// imports above. Named (not wildcard) per CLAUDE.md style; no collisions
// with the async-suffixed names.
import { closeSync, fstatSync, openSync, readSync } from 'fs'
import {
appendFile as fsAppendFile,
open as fsOpen,
mkdir,
readdir,
readFile,
stat,
unlink,
writeFile,
} from 'fs/promises'
import memoize from 'lodash-es/memoize.js'
import { basename, dirname, join } from 'path'
import {
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
logEvent,
} from 'src/services/analytics/index.js'
import {
getOriginalCwd,
getPlanSlugCache,
getPromptId,
getSessionId,
getSessionProjectDir,
isSessionPersistenceDisabled,
switchSession,
} from '../bootstrap/state.js'
import { builtInCommandNames } from '../commands.js'
import { COMMAND_NAME_TAG, TICK_TAG } from '../constants/xml.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../services/analytics/growthbook.js'
import * as sessionIngress from '../services/api/sessionIngress.js'
import { REPL_TOOL_NAME } from '../tools/REPLTool/constants.js'
import {
type AgentId,
asAgentId,
asSessionId,
type SessionId,
} from '../types/ids.js'
import type { AttributionSnapshotMessage } from '../types/logs.js'
import {
type ContentReplacementEntry,
type ContextCollapseCommitEntry,
type ContextCollapseSnapshotEntry,
type Entry,
type FileHistorySnapshotMessage,
type LogOption,
type PersistedWorktreeSession,
type SerializedMessage,
sortLogs,
type TranscriptMessage,
} from '../types/logs.js'
import type {
AssistantMessage,
AttachmentMessage,
Message,
SystemCompactBoundaryMessage,
SystemMessage,
UserMessage,
} from '../types/message.js'
import type { QueueOperationMessage } from '../types/messageQueueTypes.js'
import { uniq } from './array.js'
import { registerCleanup } from './cleanupRegistry.js'
import { updateSessionName } from './concurrentSessions.js'
import { getCwd } from './cwd.js'
import { logForDebugging } from './debug.js'
import { logForDiagnosticsNoPII } from './diagLogs.js'
import { getClaudeConfigHomeDir, isEnvTruthy } from './envUtils.js'
import { isFsInaccessible } from './errors.js'
import type { FileHistorySnapshot } from './fileHistory.js'
import { formatFileSize } from './format.js'
import { getFsImplementation } from './fsOperations.js'
import { getWorktreePaths } from './getWorktreePaths.js'
import { getBranch } from './git.js'
import { gracefulShutdownSync, isShuttingDown } from './gracefulShutdown.js'
import { parseJSONL } from './json.js'
import { logError } from './log.js'
import { extractTag, isCompactBoundaryMessage } from './messages.js'
import { sanitizePath } from './path.js'
import {
extractJsonStringField,
extractLastJsonStringField,
LITE_READ_BUF_SIZE,
readHeadAndTail,
readTranscriptForLoad,
SKIP_PRECOMPACT_THRESHOLD,
} from './sessionStoragePortable.js'
import { getSettings_DEPRECATED } from './settings/settings.js'
import { jsonParse, jsonStringify } from './slowOperations.js'
import type { ContentReplacementRecord } from './toolResultStorage.js'
import { validateUuid } from './uuid.js'
// Cache MACRO.VERSION at module level to work around bun --define bug in async contexts
// See: https://github.com/oven-sh/bun/issues/26168
const VERSION = typeof MACRO !== 'undefined' ? MACRO.VERSION : 'unknown'
type Transcript = (
| UserMessage
| AssistantMessage
| AttachmentMessage
| SystemMessage
)[]
// Use getOriginalCwd() at each call site instead of capturing at module load
// time. getCwd() at import time may run before bootstrap resolves symlinks via
// realpathSync, causing a different sanitized project directory than what
// getOriginalCwd() returns after bootstrap. This split-brain made sessions
// saved under one path invisible when loaded via the other.
/**
* Pre-compiled regex to skip non-meaningful messages when extracting first prompt.
* Matches anything starting with a lowercase XML-like tag (IDE context, hook
* output, task notifications, channel messages, etc.) or a synthetic interrupt
* marker. Kept in sync with sessionStoragePortable.ts — generic pattern avoids
* an ever-growing allowlist that falls behind as new notification types ship.
*/
// 50MB — prevents OOM in the tombstone slow path which reads + rewrites the
// entire session file. Session files can grow to multiple GB (inc-3930).
const MAX_TOMBSTONE_REWRITE_BYTES = 50 * 1024 * 1024
const SKIP_FIRST_PROMPT_PATTERN =
/^(?:\s*<[a-z][\w-]*[\s>]|\[Request interrupted by user[^\]]*\])/
/**
* Type guard to check if an entry is a transcript message.
* Transcript messages include user, assistant, attachment, and system messages.
* IMPORTANT: This is the single source of truth for what constitutes a transcript message.
* loadTranscriptFile() uses this to determine which messages to load into the chain.
*
* Progress messages are NOT transcript messages. They are ephemeral UI state
* and must not be persisted to the JSONL or participate in the parentUuid
* chain. Including them caused chain forks that orphaned real conversation
* messages on resume (see #14373, #23537).
*/
export function isTranscriptMessage(entry: Entry): entry is TranscriptMessage {
return (
entry.type === 'user' ||
entry.type === 'assistant' ||
entry.type === 'attachment' ||
entry.type === 'system'
)
}
/**
* Entries that participate in the parentUuid chain. Used on the write path
* (insertMessageChain, useLogMessages) to skip progress when assigning
* parentUuid. Old transcripts with progress already in the chain are handled
* by the progressBridge rewrite in loadTranscriptFile.
*/
export function isChainParticipant(m: Pick<Message, 'type'>): boolean {
return m.type !== 'progress'
}
type LegacyProgressEntry = {
type: 'progress'
uuid: UUID
parentUuid: UUID | null
}
/**
* Progress entries in transcripts written before PR #24099. They are not
* in the Entry type union anymore but still exist on disk with uuid and
* parentUuid fields. loadTranscriptFile bridges the chain across them.
*/
function isLegacyProgressEntry(entry: unknown): entry is LegacyProgressEntry {
return (
typeof entry === 'object' &&
entry !== null &&
'type' in entry &&
entry.type === 'progress' &&
'uuid' in entry &&
typeof entry.uuid === 'string'
)
}
/**
* High-frequency tool progress ticks (1/sec for Sleep, per-chunk for Bash).
* These are UI-only: not sent to the API, not rendered after the tool
* completes. Used by REPL.tsx to replace-in-place instead of appending, and
* by loadTranscriptFile to skip legacy entries from old transcripts.
*/
const EPHEMERAL_PROGRESS_TYPES = new Set([
'bash_progress',
'powershell_progress',
'mcp_progress',
...(feature('PROACTIVE') || feature('KAIROS')
? (['sleep_progress'] as const)
: []),
])
export function isEphemeralToolProgress(dataType: unknown): boolean {
return typeof dataType === 'string' && EPHEMERAL_PROGRESS_TYPES.has(dataType)
}
export function getProjectsDir(): string {
return join(getClaudeConfigHomeDir(), 'projects')
}
export function getTranscriptPath(): string {
const projectDir = getSessionProjectDir() ?? getProjectDir(getOriginalCwd())
return join(projectDir, `${getSessionId()}.jsonl`)
}
export function getTranscriptPathForSession(sessionId: string): string {
// When asking for the CURRENT session's transcript, honor sessionProjectDir
// the same way getTranscriptPath() does. Without this, hooks get a
// transcript_path computed from originalCwd while the actual file was
// written to sessionProjectDir (set by switchActiveSession on resume/branch)
// — different directories, so the hook sees MISSING (gh-30217). CC-34
// made sessionId + sessionProjectDir atomic precisely to prevent this
// kind of drift; this function just wasn't updated to read both.
//
// For OTHER session IDs we can only guess via originalCwd — we don't
// track a sessionId→projectDir map. Callers wanting a specific other
// session's path should pass fullPath explicitly (most save* functions
// already accept this).
if (sessionId === getSessionId()) {
return getTranscriptPath()
}
const projectDir = getProjectDir(getOriginalCwd())
return join(projectDir, `${sessionId}.jsonl`)
}
// 50 MB — session JSONL can grow to multiple GB (inc-3930). Callers that
// read the raw transcript must bail out above this threshold to avoid OOM.
export const MAX_TRANSCRIPT_READ_BYTES = 50 * 1024 * 1024
// In-memory map of agentId → subdirectory for grouping related subagent
// transcripts (e.g. workflow runs write to subagents/workflows/<runId>/).
// Populated before the agent runs; consulted by getAgentTranscriptPath.
const agentTranscriptSubdirs = new Map<string, string>()
export function setAgentTranscriptSubdir(
agentId: string,
subdir: string,
): void {
agentTranscriptSubdirs.set(agentId, subdir)
}
export function clearAgentTranscriptSubdir(agentId: string): void {
agentTranscriptSubdirs.delete(agentId)
}
export function getAgentTranscriptPath(agentId: AgentId): string {
// Same sessionProjectDir consistency as getTranscriptPathForSession —
// subagent transcripts live under the session dir, so if the session
// transcript is at sessionProjectDir, subagent transcripts are too.
const projectDir = getSessionProjectDir() ?? getProjectDir(getOriginalCwd())
const sessionId = getSessionId()
const subdir = agentTranscriptSubdirs.get(agentId)
const base = subdir
? join(projectDir, sessionId, 'subagents', subdir)
: join(projectDir, sessionId, 'subagents')
return join(base, `agent-${agentId}.jsonl`)
}
function getAgentMetadataPath(agentId: AgentId): string {
return getAgentTranscriptPath(agentId).replace(/\.jsonl$/, '.meta.json')
}
export type AgentMetadata = {
agentType: string
/** Worktree path if the agent was spawned with isolation: "worktree" */
worktreePath?: string
/** Original task description from the AgentTool input. Persisted so a
* resumed agent's notification can show the original description instead
* of a placeholder. Optional — older metadata files lack this field. */
description?: string
}
/**
* Persist the agentType used to launch a subagent. Read by resume to
* route correctly when subagent_type is omitted — without this, resuming
* a fork silently degrades to general-purpose (4KB system prompt, no
* inherited history). Sidecar file avoids JSONL schema changes.
*
* Also stores the worktreePath when the agent was spawned with worktree
* isolation, enabling resume to restore the correct cwd.
*/
export async function writeAgentMetadata(
agentId: AgentId,
metadata: AgentMetadata,
): Promise<void> {
const path = getAgentMetadataPath(agentId)
await mkdir(dirname(path), { recursive: true })
await writeFile(path, JSON.stringify(metadata))
}
export async function readAgentMetadata(
agentId: AgentId,
): Promise<AgentMetadata | null> {
const path = getAgentMetadataPath(agentId)
try {
const raw = await readFile(path, 'utf-8')
return JSON.parse(raw) as AgentMetadata
} catch (e) {
if (isFsInaccessible(e)) return null
throw e
}
}
export type RemoteAgentMetadata = {
taskId: string
remoteTaskType: string
/** CCR session ID — used to fetch live status from the Sessions API on resume. */
sessionId: string
title: string
command: string
spawnedAt: number
toolUseId?: string
isLongRunning?: boolean
isUltraplan?: boolean
isRemoteReview?: boolean
remoteTaskMetadata?: Record<string, unknown>
}
function getRemoteAgentsDir(): string {
// Same sessionProjectDir fallback as getAgentTranscriptPath — the project
// dir (containing the .jsonl), not the session dir, so sessionId is joined.
const projectDir = getSessionProjectDir() ?? getProjectDir(getOriginalCwd())
return join(projectDir, getSessionId(), 'remote-agents')
}
function getRemoteAgentMetadataPath(taskId: string): string {
return join(getRemoteAgentsDir(), `remote-agent-${taskId}.meta.json`)
}
/**
* Persist metadata for a remote-agent task so it can be restored on session
* resume. Per-task sidecar file (sibling dir to subagents/) survives
* hydrateSessionFromRemote's .jsonl wipe; status is always fetched fresh
* from CCR on restore — only identity is persisted locally.
*/
export async function writeRemoteAgentMetadata(
taskId: string,
metadata: RemoteAgentMetadata,
): Promise<void> {
const path = getRemoteAgentMetadataPath(taskId)
await mkdir(dirname(path), { recursive: true })
await writeFile(path, JSON.stringify(metadata))
}
export async function readRemoteAgentMetadata(
taskId: string,
): Promise<RemoteAgentMetadata | null> {
const path = getRemoteAgentMetadataPath(taskId)
try {
const raw = await readFile(path, 'utf-8')
return JSON.parse(raw) as RemoteAgentMetadata
} catch (e) {
if (isFsInaccessible(e)) return null
throw e
}
}
export async function deleteRemoteAgentMetadata(taskId: string): Promise<void> {
const path = getRemoteAgentMetadataPath(taskId)
try {
await unlink(path)
} catch (e) {
if (isFsInaccessible(e)) return
throw e
}
}
/**
* Scan the remote-agents/ directory for all persisted metadata files.
* Used by restoreRemoteAgentTasks to reconnect to still-running CCR sessions.
*/
export async function listRemoteAgentMetadata(): Promise<
RemoteAgentMetadata[]
> {
const dir = getRemoteAgentsDir()
let entries: Dirent[]
try {
entries = await readdir(dir, { withFileTypes: true })
} catch (e) {
if (isFsInaccessible(e)) return []
throw e
}
const results: RemoteAgentMetadata[] = []
for (const entry of entries) {
if (!entry.isFile() || !entry.name.endsWith('.meta.json')) continue
try {
const raw = await readFile(join(dir, entry.name), 'utf-8')
results.push(JSON.parse(raw) as RemoteAgentMetadata)
} catch (e) {
// Skip unreadable or corrupt files — a partial write from a crashed
// fire-and-forget persist shouldn't take down the whole restore.
logForDebugging(
`listRemoteAgentMetadata: skipping ${entry.name}: ${String(e)}`,
)
}
}
return results
}
export function sessionIdExists(sessionId: string): boolean {
const projectDir = getProjectDir(getOriginalCwd())
const sessionFile = join(projectDir, `${sessionId}.jsonl`)
const fs = getFsImplementation()
try {
fs.statSync(sessionFile)
return true
} catch {
return false
}
}
// exported for testing
export function getNodeEnv(): string {
return process.env.NODE_ENV || 'development'
}
// exported for testing
export function getUserType(): string {
return process.env.USER_TYPE || 'external'
}
function getEntrypoint(): string | undefined {
return process.env.CLAUDE_CODE_ENTRYPOINT
}
export function isCustomTitleEnabled(): boolean {
return true
}
// Memoized: called 12+ times per turn via hooks.ts createBaseHookInput
// (PostToolUse path, 5×/turn) + various save* functions. Input is a cwd
// string; homedir/env/regex are all session-invariant so the result is
// stable for a given input. Worktree switches just change the key — no
// cache clear needed.
export const getProjectDir = memoize((projectDir: string): string => {
return join(getProjectsDir(), sanitizePath(projectDir))
})
let project: Project | null = null
let cleanupRegistered = false
function getProject(): Project {
if (!project) {
project = new Project()
// Register flush as a cleanup handler (only once)
if (!cleanupRegistered) {
registerCleanup(async () => {
// Flush queued writes first, then re-append session metadata
// (customTitle, tag) so they always appear in the last 64KB tail
// window. readLiteMetadata only reads the tail to extract these
// fields — if enough messages are appended after a /rename, the
// custom-title entry gets pushed outside the window and --resume
// shows the auto-generated firstPrompt instead.
await project?.flush()
try {
project?.reAppendSessionMetadata()
} catch {
// Best-effort — don't let metadata re-append crash the cleanup
}
})
cleanupRegistered = true
}
}
return project
}
/**
* Reset the Project singleton's flush state for testing.
* This ensures tests don't interfere with each other via shared counter state.
*/
export function resetProjectFlushStateForTesting(): void {
project?._resetFlushState()
}
/**
* Reset the entire Project singleton for testing.
* This ensures tests with different CLAUDE_CONFIG_DIR values
* don't share stale sessionFile paths.
*/
export function resetProjectForTesting(): void {
project = null
}
export function setSessionFileForTesting(path: string): void {
getProject().sessionFile = path
}
type InternalEventWriter = (
eventType: string,
payload: Record<string, unknown>,
options?: { isCompaction?: boolean; agentId?: string },
) => Promise<void>
/**
* Register a CCR v2 internal event writer for transcript persistence.
* When set, transcript messages are written as internal worker events
* instead of going through v1 Session Ingress.
*/
export function setInternalEventWriter(writer: InternalEventWriter): void {
getProject().setInternalEventWriter(writer)
}
type InternalEventReader = () => Promise<
{ payload: Record<string, unknown>; agent_id?: string }[] | null
>
/**
* Register a CCR v2 internal event reader for session resume.
* When set, hydrateFromCCRv2InternalEvents() can fetch foreground and
* subagent internal events to reconstruct conversation state on reconnection.
*/
export function setInternalEventReader(
reader: InternalEventReader,
subagentReader: InternalEventReader,
): void {
getProject().setInternalEventReader(reader)
getProject().setInternalSubagentEventReader(subagentReader)
}
/**
* Set the remote ingress URL on the current Project for testing.
* This simulates what hydrateRemoteSession does in production.
*/
export function setRemoteIngressUrlForTesting(url: string): void {
getProject().setRemoteIngressUrl(url)
}
const REMOTE_FLUSH_INTERVAL_MS = 10
class Project {
// Minimal cache for current session only (not all sessions)
currentSessionTag: string | undefined
currentSessionTitle: string | undefined
currentSessionAgentName: string | undefined
currentSessionAgentColor: string | undefined
currentSessionLastPrompt: string | undefined
currentSessionAgentSetting: string | undefined
currentSessionMode: 'coordinator' | 'normal' | undefined
// Tri-state: undefined = never touched (don't write), null = exited worktree,
// object = currently in worktree. reAppendSessionMetadata writes null so
// --resume knows the session exited (vs. crashed while inside).
currentSessionWorktree: PersistedWorktreeSession | null | undefined
currentSessionPrNumber: number | undefined
currentSessionPrUrl: string | undefined
currentSessionPrRepository: string | undefined
sessionFile: string | null = null
// Entries buffered while sessionFile is null. Flushed by materializeSessionFile
// on the first user/assistant message — prevents metadata-only session files.
private pendingEntries: Entry[] = []
private remoteIngressUrl: string | null = null
private internalEventWriter: InternalEventWriter | null = null
private internalEventReader: InternalEventReader | null = null
private internalSubagentEventReader: InternalEventReader | null = null
private pendingWriteCount: number = 0
private flushResolvers: Array<() => void> = []
// Per-file write queues. Each entry carries a resolve callback so
// callers of enqueueWrite can optionally await their specific write.
private writeQueues = new Map<
string,
Array<{ entry: Entry; resolve: () => void }>
>()
private flushTimer: ReturnType<typeof setTimeout> | null = null
private activeDrain: Promise<void> | null = null
private FLUSH_INTERVAL_MS = 100
private readonly MAX_CHUNK_BYTES = 100 * 1024 * 1024
constructor() {}
/** @internal Reset flush/queue state for testing. */
_resetFlushState(): void {
this.pendingWriteCount = 0
this.flushResolvers = []
if (this.flushTimer) clearTimeout(this.flushTimer)
this.flushTimer = null
this.activeDrain = null
this.writeQueues = new Map()
}
private incrementPendingWrites(): void {
this.pendingWriteCount++
}
private decrementPendingWrites(): void {
this.pendingWriteCount--
if (this.pendingWriteCount === 0) {
// Resolve all waiting flush promises
for (const resolve of this.flushResolvers) {
resolve()
}
this.flushResolvers = []
}
}
private async trackWrite<T>(fn: () => Promise<T>): Promise<T> {
this.incrementPendingWrites()
try {
return await fn()
} finally {
this.decrementPendingWrites()
}
}
private enqueueWrite(filePath: string, entry: Entry): Promise<void> {
return new Promise<void>(resolve => {
let queue = this.writeQueues.get(filePath)
if (!queue) {
queue = []
this.writeQueues.set(filePath, queue)
}
queue.push({ entry, resolve })
this.scheduleDrain()
})
}
private scheduleDrain(): void {
if (this.flushTimer) {
return
}
this.flushTimer = setTimeout(async () => {
this.flushTimer = null
this.activeDrain = this.drainWriteQueue()
await this.activeDrain
this.activeDrain = null
// If more items arrived during drain, schedule again
if (this.writeQueues.size > 0) {
this.scheduleDrain()
}
}, this.FLUSH_INTERVAL_MS)
}
private async appendToFile(filePath: string, data: string): Promise<void> {
try {
await fsAppendFile(filePath, data, { mode: 0o600 })
} catch {
// Directory may not exist — some NFS-like filesystems return
// unexpected error codes, so don't discriminate on code.
await mkdir(dirname(filePath), { recursive: true, mode: 0o700 })
await fsAppendFile(filePath, data, { mode: 0o600 })
}
}
private async drainWriteQueue(): Promise<void> {
for (const [filePath, queue] of this.writeQueues) {
if (queue.length === 0) {
continue
}
const batch = queue.splice(0)
let content = ''
const resolvers: Array<() => void> = []
for (const { entry, resolve } of batch) {
const line = jsonStringify(entry) + '\n'
if (content.length + line.length >= this.MAX_CHUNK_BYTES) {
// Flush chunk and resolve its entries before starting a new one
await this.appendToFile(filePath, content)
for (const r of resolvers) {
r()
}
resolvers.length = 0
content = ''
}
content += line
resolvers.push(resolve)
}
if (content.length > 0) {
await this.appendToFile(filePath, content)
for (const r of resolvers) {
r()
}
}
}
// Clean up empty queues
for (const [filePath, queue] of this.writeQueues) {
if (queue.length === 0) {
this.writeQueues.delete(filePath)
}
}
}
resetSessionFile(): void {
this.sessionFile = null
this.pendingEntries = []
}
/**
* Re-append cached session metadata to the end of the transcript file.
* This ensures metadata stays within the tail window that readLiteMetadata
* reads during progressive loading.
*
* Called from two contexts with different file-ordering implications:
* - During compaction (compact.ts, reactiveCompact.ts): writes metadata
* just before the boundary marker is emitted - these entries end up
* before the boundary and are recovered by scanPreBoundaryMetadata.
* - On session exit (cleanup handler): writes metadata at EOF after all
* boundaries - this is what enables loadTranscriptFile's pre-compact
* skip to find metadata without a forward scan.
*
* External-writer safety for SDK-mutable fields (custom-title, tag):
* before re-appending, refresh the cache from the tail scan window. If an
* external process (SDK renameSession/tagSession) wrote a fresher value,
* our stale cache absorbs it and the re-append below persists it — not
* the stale CLI value. If no entry is in the tail (evicted, or never
* written by the SDK), the cache is the only source of truth and is
* re-appended as-is.
*
* Re-append is unconditional (even when the value is already in the
* tail): during compaction, a title 40KB from EOF is inside the current
* tail window but will fall out once the post-compaction session grows.
* Skipping the re-append would defeat the purpose of this call. Fields
* the SDK cannot touch (last-prompt, agent-*, mode, pr-link) have no
* external-writer concern — their caches are authoritative.
*/
reAppendSessionMetadata(skipTitleRefresh = false): void {
if (!this.sessionFile) return
const sessionId = getSessionId() as UUID
if (!sessionId) return
// One sync tail read to refresh SDK-mutable fields. Same
// LITE_READ_BUF_SIZE window readLiteMetadata uses. Empty string on
// failure → extract returns null → cache is the only source of truth.
const tail = readFileTailSync(this.sessionFile)
// Absorb any fresher SDK-written title/tag into our cache. If the SDK
// wrote while we had the session open, our cache is stale — the tail
// value is authoritative. If the tail has nothing (evicted or never
// written externally), the cache stands.
//
// Filter with startsWith to match only top-level JSONL entries (col 0)
// and not "type":"tag" appearing inside a nested tool_use input that
// happens to be JSON-serialized into a message.
const tailLines = tail.split('\n')
if (!skipTitleRefresh) {
const titleLine = tailLines.findLast(l =>
l.startsWith('{"type":"custom-title"'),
)
if (titleLine) {
const tailTitle = extractLastJsonStringField(titleLine, 'customTitle')
// `!== undefined` distinguishes no-match from empty-string match.
// renameSession rejects empty titles, but the CLI is defensive: an
// external writer with customTitle:"" should clear the cache so the
// re-append below skips it (instead of resurrecting a stale title).
if (tailTitle !== undefined) {
this.currentSessionTitle = tailTitle || undefined
}
}
}
const tagLine = tailLines.findLast(l => l.startsWith('{"type":"tag"'))
if (tagLine) {
const tailTag = extractLastJsonStringField(tagLine, 'tag')
// Same: tagSession(id, null) writes `tag:""` to clear.
if (tailTag !== undefined) {
this.currentSessionTag = tailTag || undefined
}
}
// lastPrompt is re-appended so readLiteMetadata can show what the
// user was most recently doing. Written first so customTitle/tag/etc
// land closer to EOF (they're the more critical fields for tail reads).
if (this.currentSessionLastPrompt) {
appendEntryToFile(this.sessionFile, {
type: 'last-prompt',
lastPrompt: this.currentSessionLastPrompt,
sessionId,
})
}
// Unconditional: cache was refreshed from tail above; re-append keeps
// the entry at EOF so compaction-pushed content doesn't evict it.
if (this.currentSessionTitle) {
appendEntryToFile(this.sessionFile, {
type: 'custom-title',
customTitle: this.currentSessionTitle,
sessionId,
})
}
if (this.currentSessionTag) {
appendEntryToFile(this.sessionFile, {
type: 'tag',
tag: this.currentSessionTag,
sessionId,
})
}
if (this.currentSessionAgentName) {
appendEntryToFile(this.sessionFile, {
type: 'agent-name',
agentName: this.currentSessionAgentName,
sessionId,
})
}
if (this.currentSessionAgentColor) {
appendEntryToFile(this.sessionFile, {
type: 'agent-color',
agentColor: this.currentSessionAgentColor,
sessionId,
})
}
if (this.currentSessionAgentSetting) {
appendEntryToFile(this.sessionFile, {
type: 'agent-setting',
agentSetting: this.currentSessionAgentSetting,
sessionId,
})
}
if (this.currentSessionMode) {
appendEntryToFile(this.sessionFile, {
type: 'mode',
mode: this.currentSessionMode,
sessionId,
})
}
if (this.currentSessionWorktree !== undefined) {
appendEntryToFile(this.sessionFile, {
type: 'worktree-state',
worktreeSession: this.currentSessionWorktree,
sessionId,
})
}
if (
this.currentSessionPrNumber !== undefined &&
this.currentSessionPrUrl &&
this.currentSessionPrRepository
) {
appendEntryToFile(this.sessionFile, {
type: 'pr-link',
sessionId,
prNumber: this.currentSessionPrNumber,
prUrl: this.currentSessionPrUrl,
prRepository: this.currentSessionPrRepository,
timestamp: new Date().toISOString(),
})
}
}
async flush(): Promise<void> {
// Cancel pending timer
if (this.flushTimer) {
clearTimeout(this.flushTimer)
this.flushTimer = null
}
// Wait for any in-flight drain to finish
if (this.activeDrain) {
await this.activeDrain
}
// Drain anything remaining in the queues
await this.drainWriteQueue()
// Wait for non-queue tracked operations (e.g. removeMessageByUuid)
if (this.pendingWriteCount === 0) {
return
}
return new Promise<void>(resolve => {
this.flushResolvers.push(resolve)
})
}
/**
* Remove a message from the transcript by UUID.
* Used for tombstoning orphaned messages from failed streaming attempts.
*
* The target is almost always the most recently appended entry, so we
* read only the tail, locate the line, and splice it out with a
* positional write + truncate instead of rewriting the whole file.
*/
async removeMessageByUuid(targetUuid: UUID): Promise<void> {
return this.trackWrite(async () => {
if (this.sessionFile === null) return
try {
let fileSize = 0
const fh = await fsOpen(this.sessionFile, 'r+')
try {
const { size } = await fh.stat()
fileSize = size
if (size === 0) return
const chunkLen = Math.min(size, LITE_READ_BUF_SIZE)
const tailStart = size - chunkLen
const buf = Buffer.allocUnsafe(chunkLen)
const { bytesRead } = await fh.read(buf, 0, chunkLen, tailStart)
const tail = buf.subarray(0, bytesRead)
// Entries are serialized via JSON.stringify (no key-value
// whitespace). Search for the full `"uuid":"..."` pattern, not
// just the bare UUID, so we do not match the same value sitting
// in `parentUuid` of a child entry. UUIDs are pure ASCII so a
// byte-level search is correct.
const needle = `"uuid":"${targetUuid}"`
const matchIdx = tail.lastIndexOf(needle)
if (matchIdx >= 0) {
// 0x0a never appears inside a UTF-8 multi-byte sequence, so
// byte-scanning for line boundaries is safe even if the chunk
// starts mid-character.
const prevNl = tail.lastIndexOf(0x0a, matchIdx)
// If the preceding newline is outside our chunk and we did not
// read from the start of the file, the line is longer than the
// window - fall through to the slow path.
if (prevNl >= 0 || tailStart === 0) {
const lineStart = prevNl + 1 // 0 when prevNl === -1
const nextNl = tail.indexOf(0x0a, matchIdx + needle.length)
const lineEnd = nextNl >= 0 ? nextNl + 1 : bytesRead
const absLineStart = tailStart + lineStart
const afterLen = bytesRead - lineEnd
// Truncate first, then re-append the trailing lines. In the
// common case (target is the last entry) afterLen is 0 and
// this is a single ftruncate.
await fh.truncate(absLineStart)
if (afterLen > 0) {
await fh.write(tail, lineEnd, afterLen, absLineStart)
}
return
}
}
} finally {
await fh.close()
}
// Slow path: target was not in the last 64KB. Rare - requires many
// large entries to have landed between the write and the tombstone.
if (fileSize > MAX_TOMBSTONE_REWRITE_BYTES) {
logForDebugging(
`Skipping tombstone removal: session file too large (${formatFileSize(fileSize)})`,
{ level: 'warn' },
)
return
}
const content = await readFile(this.sessionFile, { encoding: 'utf-8' })
const lines = content.split('\n').filter((line: string) => {
if (!line.trim()) return true
try {
const entry = jsonParse(line)
return entry.uuid !== targetUuid
} catch {
return true // Keep malformed lines
}
})
await writeFile(this.sessionFile, lines.join('\n'), {
encoding: 'utf8',
})
} catch {
// Silently ignore errors - the file might not exist yet
}
})
}
/**
* True when test env / cleanupPeriodDays=0 / --no-session-persistence /
* CLAUDE_CODE_SKIP_PROMPT_HISTORY should suppress all transcript writes.
* Shared guard for appendEntry and materializeSessionFile so both skip
* consistently. The env var is set by tmuxSocket.ts so Tungsten-spawned
* test sessions don't pollute the user's --resume list.
*/
private shouldSkipPersistence(): boolean {
const allowTestPersistence = isEnvTruthy(
process.env.TEST_ENABLE_SESSION_PERSISTENCE,
)
return (
(getNodeEnv() === 'test' && !allowTestPersistence) ||
getSettings_DEPRECATED()?.cleanupPeriodDays === 0 ||
isSessionPersistenceDisabled() ||
isEnvTruthy(process.env.CLAUDE_CODE_SKIP_PROMPT_HISTORY)
)
}
/**
* Create the session file, write cached startup metadata, and flush
* buffered entries. Called on the first user/assistant message.
*/
private async materializeSessionFile(): Promise<void> {
// Guard here too — reAppendSessionMetadata writes via appendEntryToFile
// (not appendEntry) so it would bypass the per-entry persistence check
// and create a metadata-only file despite --no-session-persistence.
if (this.shouldSkipPersistence()) return
this.ensureCurrentSessionFile()
// mode/agentSetting are cache-only pre-materialization; write them now.
this.reAppendSessionMetadata()
if (this.pendingEntries.length > 0) {
const buffered = this.pendingEntries
this.pendingEntries = []
for (const entry of buffered) {
await this.appendEntry(entry)
}
}
}
async insertMessageChain(
messages: Transcript,
isSidechain: boolean = false,
agentId?: string,
startingParentUuid?: UUID | null,
teamInfo?: { teamName?: string; agentName?: string },
) {
return this.trackWrite(async () => {