From 3d4c962c287bdbed905b2094dda51cdfd3fd65c3 Mon Sep 17 00:00:00 2001 From: Igor Barakaiev Date: Tue, 23 Dec 2025 13:59:22 -0800 Subject: [PATCH] fix(electric-db-collection): prevent orphan transactions after must-refetch in progressive mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a `must-refetch` message is received in progressive mode, it starts a transaction with `begin()` and calls `truncate()`. This resets `hasReceivedUpToDate` to `false`, causing `isBufferingInitialSync()` to return `true`. The bug: subsequent messages after must-refetch were being buffered instead of written to the existing transaction. When `up-to-date` was received, the atomic swap code would create a NEW transaction, leaving the first transaction (from must-refetch) uncommitted forever. This "orphan transaction" caused the collection to become corrupted with undefined values. The fix: Add `&& !transactionStarted` checks to 5 places so that when a transaction is already started (from must-refetch), messages are written directly to it instead of being buffered for atomic swap. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .changeset/fix-progressive-must-refetch.md | 9 +++++++ .../electric-db-collection/src/electric.ts | 27 +++++++++++++++---- 2 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 .changeset/fix-progressive-must-refetch.md diff --git a/.changeset/fix-progressive-must-refetch.md b/.changeset/fix-progressive-must-refetch.md new file mode 100644 index 000000000..1c279ff69 --- /dev/null +++ b/.changeset/fix-progressive-must-refetch.md @@ -0,0 +1,9 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix orphan transactions after `must-refetch` in progressive sync mode + +When a `must-refetch` message was received in progressive mode, it started a transaction with `truncate()` but reset `hasReceivedUpToDate`, causing subsequent messages to be buffered instead of written to the existing transaction. On `up-to-date`, the atomic swap code would create a new transaction, leaving the first one uncommitted forever. This caused collections to become corrupted with undefined values. + +The fix ensures that when a transaction is already started (e.g., from must-refetch), messages are written directly to it instead of being buffered for atomic swap. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 65d3b7cc0..0755c520a 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1303,7 +1303,12 @@ function createElectricSync>( // Check for txids in the message and add them to our store // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap) - if (hasTxids(message) && !isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), track txids + // to avoid losing them when messages are written to the existing transaction. + if ( + hasTxids(message) && + (!isBufferingInitialSync() || transactionStarted) + ) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } @@ -1338,7 +1343,9 @@ function createElectricSync>( } // In buffered initial sync of progressive mode, buffer messages instead of writing - if (isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), write + // directly to it instead of buffering. This prevents orphan transactions. + if (isBufferingInitialSync() && !transactionStarted) { bufferedMessages.push(message) } else { // Normal processing: write changes immediately @@ -1352,7 +1359,9 @@ function createElectricSync>( } else if (isSnapshotEndMessage(message)) { // Track postgres snapshot metadata for resolving awaiting mutations // Skip during buffered initial sync (will be extracted during atomic swap) - if (!isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), track snapshots + // to avoid losing them when messages are written to the existing transaction. + if (!isBufferingInitialSync() || transactionStarted) { newSnapshots.push(parseSnapshotMessage(message)) } } else if (isUpToDateMessage(message)) { @@ -1365,7 +1374,9 @@ function createElectricSync>( } } else if (isMoveOutMessage(message)) { // Handle move-out event: buffer if buffering, otherwise process immediately - if (isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), process + // immediately to avoid orphan transactions. + if (isBufferingInitialSync() && !transactionStarted) { bufferedMessages.push(message) } else { // Normal processing: process move-out immediately @@ -1405,7 +1416,13 @@ function createElectricSync>( if (commitPoint !== null) { // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end) - if (isBufferingInitialSync() && commitPoint === `up-to-date`) { + // EXCEPTION: Skip atomic swap if a transaction is already started (e.g., from must-refetch). + // In that case, do a normal commit to properly close the existing transaction. + if ( + isBufferingInitialSync() && + commitPoint === `up-to-date` && + !transactionStarted + ) { debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`, )