Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fix-progressive-must-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/electric-db-collection': patch
---

Fix orphan transactions after `must-refetch` in progressive sync mode

When a `must-refetch` message was received in progressive mode, it started a transaction with `truncate()` but reset `hasReceivedUpToDate`, causing subsequent messages to be buffered instead of written to the existing transaction. On `up-to-date`, the atomic swap code would create a new transaction, leaving the first one uncommitted forever. This caused collections to become corrupted with undefined values.

The fix ensures that when a transaction is already started (e.g., from must-refetch), messages are written directly to it instead of being buffered for atomic swap.
27 changes: 22 additions & 5 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,12 @@ function createElectricSync<T extends Row<unknown>>(

// Check for txids in the message and add them to our store
// Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap)
if (hasTxids(message) && !isBufferingInitialSync()) {
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), track txids
// to avoid losing them when messages are written to the existing transaction.
if (
hasTxids(message) &&
(!isBufferingInitialSync() || transactionStarted)
) {
message.headers.txids?.forEach((txid) => newTxids.add(txid))
}

Expand Down Expand Up @@ -1338,7 +1343,9 @@ function createElectricSync<T extends Row<unknown>>(
}

// In buffered initial sync of progressive mode, buffer messages instead of writing
if (isBufferingInitialSync()) {
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), write
// directly to it instead of buffering. This prevents orphan transactions.
if (isBufferingInitialSync() && !transactionStarted) {
bufferedMessages.push(message)
} else {
// Normal processing: write changes immediately
Expand All @@ -1352,7 +1359,9 @@ function createElectricSync<T extends Row<unknown>>(
} else if (isSnapshotEndMessage(message)) {
// Track postgres snapshot metadata for resolving awaiting mutations
// Skip during buffered initial sync (will be extracted during atomic swap)
if (!isBufferingInitialSync()) {
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), track snapshots
// to avoid losing them when messages are written to the existing transaction.
if (!isBufferingInitialSync() || transactionStarted) {
newSnapshots.push(parseSnapshotMessage(message))
}
} else if (isUpToDateMessage(message)) {
Expand All @@ -1365,7 +1374,9 @@ function createElectricSync<T extends Row<unknown>>(
}
} else if (isMoveOutMessage(message)) {
// Handle move-out event: buffer if buffering, otherwise process immediately
if (isBufferingInitialSync()) {
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), process
// immediately to avoid orphan transactions.
if (isBufferingInitialSync() && !transactionStarted) {
bufferedMessages.push(message)
} else {
// Normal processing: process move-out immediately
Expand Down Expand Up @@ -1405,7 +1416,13 @@ function createElectricSync<T extends Row<unknown>>(

if (commitPoint !== null) {
// PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end)
if (isBufferingInitialSync() && commitPoint === `up-to-date`) {
// EXCEPTION: Skip atomic swap if a transaction is already started (e.g., from must-refetch).
// In that case, do a normal commit to properly close the existing transaction.
if (
isBufferingInitialSync() &&
commitPoint === `up-to-date` &&
!transactionStarted
) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`,
)
Expand Down
Loading