diff --git a/channeldb/channel.go b/channeldb/channel.go index 947226c8176..a0d2ae85989 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -2,6 +2,7 @@ package channeldb import ( "bytes" + "context" "crypto/hmac" "crypto/sha256" "encoding/binary" @@ -183,6 +184,16 @@ var ( // have any channels state. ErrNoChanInfoFound = fmt.Errorf("no chan info found") + // ErrChannelPendingCleanup is returned when a channel bucket exists + // in openChannelBucket but chanInfoKey is absent and the + // pendingCleanupKey sentinel is present. This means CloseChannel + // (Phase 1) has completed — the channel is logically closed — but + // PurgeClosedChannelData (Phase 2) has not yet deleted the bulk + // historical data from this bucket. Callers that scan all open + // channels should skip such buckets; callers doing targeted lookups + // should treat the channel as not found. + ErrChannelPendingCleanup = fmt.Errorf("channel pending cleanup") + // ErrNoRevocationsFound is returned when revocation state for a // particular channel cannot be found. ErrNoRevocationsFound = fmt.Errorf("no revocations found") @@ -3969,155 +3980,38 @@ type ChannelCloseSummary struct { LastChanSyncMsg *lnwire.ChannelReestablish } -// CloseChannel closes a previously active Lightning channel. Closing a channel -// entails deleting all saved state within the database concerning this -// channel. This method also takes a struct that summarizes the state of the -// channel at closing, this compact representation will be the only component -// of a channel left over after a full closing. It takes an optional set of -// channel statuses which will be written to the historical channel bucket. -// These statuses are used to record close initiators. +// CloseChannel closes a previously active Lightning channel. 
Closing a +// channel entails atomically recording the close summary, archiving the +// channel state, and updating the outpoint index — but intentionally defers +// the deletion of bulk historical data (revocation log, forwarding packages) +// so that it can be purged in smaller batches via the ChannelCloser interface +// on the backing ChannelStateDB. This avoids holding a single large DB +// transaction that would block other writers on SQLite / Postgres backends. +// +// It takes an optional set of channel statuses which will be written to the +// historical channel bucket to record close initiators. func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, statuses ...ChannelStatus) error { c.Lock() defer c.Unlock() - return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error { - openChanBucket := tx.ReadWriteBucket(openChannelBucket) - if openChanBucket == nil { - return ErrNoChanDBExists - } - - nodePub := c.IdentityPub.SerializeCompressed() - nodeChanBucket := openChanBucket.NestedReadWriteBucket(nodePub) - if nodeChanBucket == nil { - return ErrNoActiveChannels - } - - chainBucket := nodeChanBucket.NestedReadWriteBucket(c.ChainHash[:]) - if chainBucket == nil { - return ErrNoActiveChannels - } - - var chanPointBuf bytes.Buffer - err := graphdb.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint) - if err != nil { - return err - } - chanKey := chanPointBuf.Bytes() - chanBucket := chainBucket.NestedReadWriteBucket( - chanKey, - ) - if chanBucket == nil { - return ErrNoActiveChannels - } - - // Before we delete the channel state, we'll read out the full - // details, as we'll also store portions of this information - // for record keeping. - chanState, err := fetchOpenChannel( - chanBucket, &c.FundingOutpoint, - ) - if err != nil { - return err - } - - // Delete all the forwarding packages stored for this particular - // channel. 
- if err = chanState.Packager.Wipe(tx); err != nil { - return err - } - - // Now that the index to this channel has been deleted, purge - // the remaining channel metadata from the database. - err = deleteOpenChannel(chanBucket) - if err != nil { - return err - } - - // We'll also remove the channel from the frozen channel bucket - // if we need to. - if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() { - err := deleteThawHeight(chanBucket) - if err != nil { - return err - } - } - - // With the base channel data deleted, attempt to delete the - // information stored within the revocation log. - if err := deleteLogBucket(chanBucket); err != nil { - return err - } - - err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes()) - if err != nil { - return err - } - - // Fetch the outpoint bucket to see if the outpoint exists or - // not. - opBucket := tx.ReadWriteBucket(outpointBucket) - if opBucket == nil { - return ErrNoChanDBExists - } - - // Add the closed outpoint to our outpoint index. This should - // replace an open outpoint in the index. - if opBucket.Get(chanPointBuf.Bytes()) == nil { - return ErrMissingIndexEntry - } - - status := uint8(outpointClosed) - - // Write the IndexStatus of this outpoint as the first entry in a tlv - // stream. - statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status) - opStream, err := tlv.NewStream(statusRecord) - if err != nil { - return err - } - - var b bytes.Buffer - if err := opStream.Encode(&b); err != nil { - return err - } - - // Finally add the closed outpoint and tlv stream to the index. - if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil { - return err - } - - // Add channel state to the historical channel bucket. 
- historicalBucket, err := tx.CreateTopLevelBucket( - historicalChannelBucket, - ) - if err != nil { - return err - } - - historicalChanBucket, err := - historicalBucket.CreateBucketIfNotExists(chanKey) - if err != nil { - return err - } - - // Apply any additional statuses to the channel state. - for _, status := range statuses { - chanState.chanStatus |= status - } - - err = putOpenChannel(historicalChanBucket, chanState) - if err != nil { - return err - } - - // Finally, create a summary of this channel in the closed - // channel bucket for this node. - return putChannelCloseSummary( - tx, chanPointBuf.Bytes(), summary, chanState, - ) - }, func() {}) + closeSummary := *summary + + // The fields ChannelStateDB.CloseChannel needs for bucket navigation + // and for reconstructing the forwarding-package packager in Phase 2 + // come from the channel struct itself, not from the caller-supplied + // summary. The original implementation read them directly from c; + // we preserve that behaviour here so that callers do not have to + // populate these fields in the summary. + closeSummary.RemotePub = c.IdentityPub + closeSummary.ChainHash = c.ChainHash + closeSummary.ShortChanID = c.ShortChannelID + + return c.Db.CloseChannel( + context.Background(), c.FundingOutpoint, &closeSummary, + statuses..., + ) } // ChannelSnapshot is a frozen snapshot of the current channel state. A @@ -4739,6 +4633,15 @@ func readChanConfig(b io.Reader, c *ChannelConfig) error { func fetchChanInfo(chanBucket kvdb.RBucket, channel *OpenChannel) error { infoBytes := chanBucket.Get(chanInfoKey) if infoBytes == nil { + // Distinguish an intentional Phase 1-complete state (where + // CloseChannel deleted chanInfoKey but left the bulk data for + // background purge) from genuine data corruption. The sentinel + // key is written by CloseChannel and removed only when Phase 2 + // finishes deleting the bucket. 
+ if chanBucket.Get(pendingCleanupKey) != nil { + return ErrChannelPendingCleanup + } + return ErrNoChanInfoFound } r := bytes.NewReader(infoBytes) diff --git a/channeldb/channel_closer.go b/channeldb/channel_closer.go new file mode 100644 index 00000000000..e9ae1030e56 --- /dev/null +++ b/channeldb/channel_closer.go @@ -0,0 +1,64 @@ +package channeldb + +import ( + "context" + + "github.com/btcsuite/btcd/wire" +) + +// ChannelCloser manages the lifecycle of channel closure and the cleanup of +// associated bulk historical data. The interface is intentionally backend- +// agnostic: the current KV implementation on *ChannelStateDB satisfies it, +// and a future native-SQL implementation will provide a different concrete +// type that satisfies the same contract without any change to callers. +// +// The close operation is deliberately split into two phases to prevent a +// single large DB transaction from holding the write lock for an unbounded +// amount of time (which is especially harmful on SQLite and Postgres backends, +// where bulk cascade-deletes inside one transaction can block all other +// writers for seconds): +// +// - CloseChannel performs a fast, atomic Phase 1: it records the close +// summary, archives the channel state, updates the outpoint index, and +// deletes the small per-channel keys — but intentionally leaves the bulk +// historical data (revocation log, forwarding packages) in place and +// registers a cleanup task for them. +// +// - PurgeClosedChannelData performs the heavy Phase 2 across many small +// transactions, deleting up to batchSize entries per call so that other +// writers can interleave. It is idempotent and crash-safe: the cleanup +// task registered by CloseChannel persists across restarts and is resumed +// via FetchChannelsPendingCleanup. +type ChannelCloser interface { + // CloseChannel atomically records the channel as closed. 
It writes + // the close summary to the closed-channel bucket, archives the full + // channel state to the historical bucket, updates the outpoint index, + // deletes the small per-channel state keys, and registers a cleanup + // task for the bulk historical data that is too large to delete here. + // + // NOTE: summary.RemotePub and summary.ChainHash must be populated so + // that the KV implementation can navigate to the channel bucket. + CloseChannel(ctx context.Context, chanPoint wire.OutPoint, + summary *ChannelCloseSummary, + statuses ...ChannelStatus) error + + // PurgeClosedChannelData removes up to batchSize entries of historical + // data for a closed channel (revocation log entries, forwarding + // packages). Each call executes in one or more small transactions. + // Returns done=true once all bulk data has been removed and the cleanup + // task has been deregistered. Safe to call multiple times; idempotent + // once done=true has been returned. + PurgeClosedChannelData(ctx context.Context, chanPoint wire.OutPoint, + batchSize int) (bool, error) + + // FetchChannelsPendingCleanup returns the outpoints of channels that + // have been closed (Phase 1 complete) but whose bulk historical data + // has not yet been fully deleted (Phase 2 incomplete). Used at startup + // to resume any purges that were interrupted by a crash or restart. + FetchChannelsPendingCleanup(ctx context.Context) ([]wire.OutPoint, + error) +} + +// Ensure the current KV-backed channel state store satisfies the backend- +// agnostic close/cleanup contract. +var _ ChannelCloser = (*ChannelStateDB)(nil) diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 47504067780..9f56897efdf 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -964,8 +964,18 @@ func TestChannelStateTransition(t *testing.T) { len(channels)) } - // Attempting to find previous states on the channel should fail as the - // revocation log has been deleted. 
+ // Run Phase 2 to delete the bulk historical data (revocation log and + // forwarding packages). Phase 1 (CloseChannel) intentionally leaves + // them in place to avoid a single long-held write lock. + _, err = cdb.PurgeClosedChannelData( + t.Context(), + channel.FundingOutpoint, + DefaultPurgeBatchSize, + ) + require.NoError(t, err, "phase 2 purge failed") + + // Attempting to find previous states on the channel should fail now + // that the revocation log has been deleted by Phase 2. _, _, err = updatedChannel[0].FindPreviousState( oldRemoteCommit.CommitHeight, ) @@ -1143,6 +1153,13 @@ func TestFetchClosedChannels(t *testing.T) { t.Fatalf("unable to close channel: %v", err) } + expected := *summary + expected.RemotePub = state.IdentityPub + expected.ChainHash = state.ChainHash + expected.ShortChanID = state.ShortChannelID + expected.RemoteCurrentRevocation = state.RemoteCurrentRevocation + expected.RemoteNextRevocation = state.RemoteNextRevocation + // Query the database to ensure that the channel has now been properly // closed. We should get the same result whether querying for pending // channels only, or not. 
@@ -1152,9 +1169,9 @@ func TestFetchClosedChannels(t *testing.T) { t.Fatalf("incorrect number of pending closed channels: expecting %v,"+ "got %v", 1, len(pendingClosed)) } - if !reflect.DeepEqual(summary, pendingClosed[0]) { + if !reflect.DeepEqual(&expected, pendingClosed[0]) { t.Fatalf("database summaries don't match: expected %v got %v", - spew.Sdump(summary), spew.Sdump(pendingClosed[0])) + spew.Sdump(&expected), spew.Sdump(pendingClosed[0])) } closed, err := cdb.FetchClosedChannels(false) require.NoError(t, err, "failed fetching all closed channels") @@ -1162,9 +1179,9 @@ func TestFetchClosedChannels(t *testing.T) { t.Fatalf("incorrect number of closed channels: expecting %v, "+ "got %v", 1, len(closed)) } - if !reflect.DeepEqual(summary, closed[0]) { + if !reflect.DeepEqual(&expected, closed[0]) { t.Fatalf("database summaries don't match: expected %v got %v", - spew.Sdump(summary), spew.Sdump(closed[0])) + spew.Sdump(&expected), spew.Sdump(closed[0])) } // Mark the channel as fully closed. 
@@ -1601,6 +1618,33 @@ func TestCloseChannelStatus(t *testing.T) { } } +func TestOpenChannelCloseChannelDoesNotMutateSummary(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + channel := createTestChannel(t, cdb, openChannelOption()) + + localBalance := channel.LocalCommitment.LocalBalance.ToSatoshis() + + summary := &ChannelCloseSummary{ + ChanPoint: channel.FundingOutpoint, + ClosingTXID: rev, + Capacity: channel.Capacity, + SettledBalance: localBalance, + TimeLockedBalance: localBalance + 1, + CloseType: RemoteForceClose, + IsPending: true, + LocalChanConfig: channel.LocalChanCfg, + } + original := *summary + + require.NoError(t, channel.CloseChannel(summary)) + require.Equal(t, original, *summary) +} + // TestHasChanStatus asserts the behavior of HasChanStatus by checking the // behavior of various status flags in addition to the special case of // ChanStatusDefault which is treated like a flag in the code base even though diff --git a/channeldb/close_channel_test.go b/channeldb/close_channel_test.go new file mode 100644 index 00000000000..bc8dfea3aed --- /dev/null +++ b/channeldb/close_channel_test.go @@ -0,0 +1,416 @@ +package channeldb + +import ( + "bytes" + "testing" + + "github.com/btcsuite/btcd/wire" + graphdb "github.com/lightningnetwork/lnd/graph/db" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/require" +) + +// writeTestRevlogEntries writes n entries into the revocationLogBucket for +// the given channel. It navigates the raw KV tree directly so that the +// test does not depend on the higher-level commit-chain machinery. +func writeTestRevlogEntries(t *testing.T, ch *OpenChannel, n int) { + t.Helper() + + writeTestRevlogEntriesToBucket(t, ch, revocationLogBucket, n) +} + +// writeTestRevlogEntriesToBucket writes n entries into the specified +// revocation-log bucket for the given channel. 
+func writeTestRevlogEntriesToBucket(t *testing.T, ch *OpenChannel, + bucketKey []byte, n int) { + + t.Helper() + + err := kvdb.Update(ch.Db.backend, func(tx kvdb.RwTx) error { + openChanBkt := tx.ReadWriteBucket(openChannelBucket) + if openChanBkt == nil { + t.Fatal("openChannelBucket missing") + } + + nodePub := ch.IdentityPub.SerializeCompressed() + nodeBkt := openChanBkt.NestedReadWriteBucket(nodePub) + if nodeBkt == nil { + t.Fatal("node bucket missing") + } + + chainBkt := nodeBkt.NestedReadWriteBucket( + ch.ChainHash[:], + ) + if chainBkt == nil { + t.Fatal("chain bucket missing") + } + + var chanKeyBuf bytes.Buffer + if err := graphdb.WriteOutpoint( + &chanKeyBuf, &ch.FundingOutpoint, + ); err != nil { + return err + } + + chanBkt := chainBkt.NestedReadWriteBucket( + chanKeyBuf.Bytes(), + ) + if chanBkt == nil { + t.Fatal("channel bucket missing") + } + + logBkt, err := chanBkt.CreateBucketIfNotExists(bucketKey) + if err != nil { + return err + } + + for i := range n { + commit := testChannelCommit + commit.CommitHeight = uint64(i) + + if err := putRevocationLog( + logBkt, &commit, 0, 1, false, + ); err != nil { + return err + } + } + + return nil + }, func() {}) + require.NoError(t, err) +} + +// countRevlogEntries returns the number of entries in the +// revocationLogBucket for the given channel, or -1 if the channel bucket +// no longer exists in openChannelBucket (Phase 2 complete). +func countRevlogEntries(t *testing.T, ch *OpenChannel) int { + t.Helper() + + return countRevlogEntriesInBucket(t, ch, revocationLogBucket) +} + +// countRevlogEntriesInBucket returns the number of entries in the specified +// revocation-log bucket for the given channel, or -1 if the channel bucket no +// longer exists in openChannelBucket (Phase 2 complete). 
+func countRevlogEntriesInBucket(t *testing.T, ch *OpenChannel, + bucketKey []byte) int { + + t.Helper() + + count := -1 + err := kvdb.View(ch.Db.backend, func(tx kvdb.RTx) error { + openChanBkt := tx.ReadBucket(openChannelBucket) + if openChanBkt == nil { + return nil + } + + nodePub := ch.IdentityPub.SerializeCompressed() + nodeBkt := openChanBkt.NestedReadBucket(nodePub) + if nodeBkt == nil { + return nil + } + + chainBkt := nodeBkt.NestedReadBucket(ch.ChainHash[:]) + if chainBkt == nil { + return nil + } + + var chanKeyBuf bytes.Buffer + if err := graphdb.WriteOutpoint( + &chanKeyBuf, &ch.FundingOutpoint, + ); err != nil { + return err + } + + chanBkt := chainBkt.NestedReadBucket(chanKeyBuf.Bytes()) + if chanBkt == nil { + // Channel bucket gone — Phase 2 complete. + return nil + } + + logBkt := chanBkt.NestedReadBucket(bucketKey) + if logBkt == nil { + count = 0 + return nil + } + + c := 0 + if err := logBkt.ForEach(func(k, v []byte) error { + c++ + return nil + }); err != nil { + return err + } + + count = c + + return nil + }, func() {}) + require.NoError(t, err) + + return count +} + +func TestCloseChannelRejectsInvalidSummary(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + ctx := t.Context() + chanPoint := wire.OutPoint{Index: 99} + channel := createTestChannel(t, cdb, openChannelOption()) + + err = cdb.CloseChannel(ctx, chanPoint, nil) + require.ErrorIs(t, err, ErrChannelCloseSummaryNil) + + err = cdb.CloseChannel(ctx, chanPoint, &ChannelCloseSummary{}) + require.ErrorIs(t, err, ErrChannelCloseSummaryMissingRemotePub) + + summary := &ChannelCloseSummary{ + RemotePub: channel.IdentityPub, + } + err = cdb.CloseChannel(ctx, chanPoint, summary) + require.ErrorIs(t, err, ErrNoActiveChannels) +} + +// TestCloseChannelPhase1RemovesFromOpenScans verifies that after Phase 1 +// (ChannelStateDB.CloseChannel) the closed channel no longer appears in +// any open-channel scan 
(FetchAllChannels, FetchOpenChannels, +// FetchPermAndTempPeers), and that FetchChannelsPendingCleanup returns the +// outpoint so Phase 2 can clean up the bulk data. +func TestCloseChannelPhase1RemovesFromOpenScans(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + ctx := t.Context() + + // Create two open channels so we can verify the zombie channel does + // not contaminate the remaining open one. + ch1 := createTestChannel(t, cdb, openChannelOption()) + ch2 := createTestChannel(t, cdb, openChannelOption()) + + // Write a handful of revlog entries so Phase 2 has something to + // delete. + const numRevlogEntries = 5 + writeTestRevlogEntries(t, ch1, numRevlogEntries) + + // Confirm both channels are visible before we close one. + openChans, err := cdb.FetchAllChannels() + require.NoError(t, err) + require.Len(t, openChans, 2) + + // Phase 1: close ch1. + summary := &ChannelCloseSummary{ + ChanPoint: ch1.FundingOutpoint, + RemotePub: ch1.IdentityPub, + ChainHash: ch1.ChainHash, + CloseType: CooperativeClose, + ShortChanID: ch1.ShortChannelID, + } + require.NoError(t, cdb.CloseChannel( + ctx, ch1.FundingOutpoint, summary, + )) + + // After Phase 1 the channel must not appear in any open-channel scan. + openChans, err = cdb.FetchAllChannels() + require.NoError(t, err) + require.Len(t, openChans, 1, "closed channel must not appear") + require.Equal( + t, ch2.FundingOutpoint, openChans[0].FundingOutpoint, + ) + + // Both test channels share the same IdentityPub, so after ch1 is + // closed only ch2 should be returned. + openChans, err = cdb.FetchOpenChannels(ch1.IdentityPub) + require.NoError(t, err) + require.Len(t, openChans, 1) + require.Equal( + t, ch2.FundingOutpoint, openChans[0].FundingOutpoint, + ) + + // FetchPermAndTempPeers must not error on the zombie bucket. 
+ _, err = cdb.FetchPermAndTempPeers(ch1.ChainHash[:]) + require.NoError(t, err) + + // The cleanup task must be registered so a restart can resume it. + pending, err := cdb.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Len(t, pending, 1) + require.Equal(t, ch1.FundingOutpoint, pending[0]) + + // The revlog entries must still be present (Phase 2 not run yet). + require.Equal(t, numRevlogEntries, countRevlogEntries(t, ch1)) +} + +// TestCloseChannelPhase2PurgesDataInBatches verifies that +// PurgeClosedChannelData deletes revlog entries in small batches, that it +// is idempotent once done=true has been returned, and that afterwards +// FetchChannelsPendingCleanup no longer lists the channel. +func TestCloseChannelPhase2PurgesDataInBatches(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + ctx := t.Context() + + ch := createTestChannel(t, cdb, openChannelOption()) + + const numRevlogEntries = 7 + writeTestRevlogEntries(t, ch, numRevlogEntries) + + summary := &ChannelCloseSummary{ + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + ChainHash: ch.ChainHash, + CloseType: CooperativeClose, + } + require.NoError(t, cdb.CloseChannel( + ctx, ch.FundingOutpoint, summary, + )) + + // Ensure cleanup is registered. + pending, err := cdb.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Len(t, pending, 1) + + // Drive Phase 2 with a batch size of 2 — it must take multiple + // calls before completing. + const batchSize = 2 + var done bool + for range 20 { + done, err = cdb.PurgeClosedChannelData( + ctx, ch.FundingOutpoint, batchSize, + ) + require.NoError(t, err) + + if done { + break + } + } + require.True(t, done, "Phase 2 must eventually complete") + + // The channel bucket must be gone (countRevlogEntries returns -1). 
+ require.Equal(t, -1, countRevlogEntries(t, ch), + "channel bucket must be deleted after Phase 2") + + // The cleanup task must be deregistered. + pending, err = cdb.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Empty(t, pending) + + // Calling PurgeClosedChannelData again must be idempotent. + done, err = cdb.PurgeClosedChannelData( + ctx, ch.FundingOutpoint, batchSize, + ) + require.NoError(t, err) + require.True(t, done) +} + +// TestCloseChannelPhase2PurgesMixedRevocationLogBuckets verifies that Phase 2 +// removes entries from both the current and deprecated revocation-log buckets +// before deleting the channel bucket and deregistering pending cleanup. +func TestCloseChannelPhase2PurgesMixedRevocationLogBuckets(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + ctx := t.Context() + + ch := createTestChannel(t, cdb, openChannelOption()) + + writeTestRevlogEntriesToBucket(t, ch, revocationLogBucket, 3) + writeTestRevlogEntriesToBucket( + t, ch, revocationLogBucketDeprecated, 4, + ) + + summary := &ChannelCloseSummary{ + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + ChainHash: ch.ChainHash, + CloseType: CooperativeClose, + } + require.NoError(t, cdb.CloseChannel( + ctx, ch.FundingOutpoint, summary, + )) + + var done bool + for range 20 { + done, err = cdb.PurgeClosedChannelData( + ctx, ch.FundingOutpoint, 2, + ) + require.NoError(t, err) + + if done { + break + } + } + require.True(t, done, "Phase 2 must eventually complete") + + require.Equal(t, -1, countRevlogEntriesInBucket( + t, ch, revocationLogBucket, + )) + require.Equal(t, -1, countRevlogEntriesInBucket( + t, ch, revocationLogBucketDeprecated, + )) + + pending, err := cdb.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Empty(t, pending) +} + +// TestCloseChannelPendingCleanupPersists verifies that a cleanup task +// registered by Phase 1 is still visible via 
FetchChannelsPendingCleanup +// when queried from a fresh ChannelStateDB wrapping the same backend, +// simulating the startup scan after a node restart. +func TestCloseChannelPendingCleanupPersists(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + ctx := t.Context() + + ch := createTestChannel(t, cdb, openChannelOption()) + writeTestRevlogEntries(t, ch, 3) + + summary := &ChannelCloseSummary{ + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + ChainHash: ch.ChainHash, + CloseType: CooperativeClose, + } + require.NoError(t, cdb.CloseChannel( + ctx, ch.FundingOutpoint, summary, + )) + + // Simulate a node restart by creating a new ChannelStateDB wrapping + // the same backend. The cleanup task must survive. + freshCDB := &ChannelStateDB{backend: cdb.backend} + + pending, err := freshCDB.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Len(t, pending, 1) + require.Equal(t, ch.FundingOutpoint, pending[0]) + + // Phase 2 on the fresh instance must complete successfully. + done, err := freshCDB.PurgeClosedChannelData( + ctx, ch.FundingOutpoint, DefaultPurgeBatchSize, + ) + require.NoError(t, err) + require.True(t, done) + + // After Phase 2 the cleanup task must be gone. + pending, err = freshCDB.FetchChannelsPendingCleanup(ctx) + require.NoError(t, err) + require.Empty(t, pending) +} diff --git a/channeldb/db.go b/channeldb/db.go index 999d2cdcc0f..a45101e2d0c 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -37,6 +37,7 @@ import ( "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/tlv" "github.com/stretchr/testify/require" ) @@ -49,6 +50,16 @@ var ( // but we intentionally did not commit the result. 
ErrDryRunMigrationOK = errors.New("dry run migration successful") + // ErrChannelCloseSummaryNil signals that CloseChannel was called with + // a nil close summary. + ErrChannelCloseSummaryNil = errors.New("channel close summary is nil") + + // ErrChannelCloseSummaryMissingRemotePub signals that CloseChannel was + // called without the remote pubkey needed to locate the channel bucket. + ErrChannelCloseSummaryMissingRemotePub = errors.New( + "channel close summary missing remote pubkey", + ) + // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs // bucket does not exist. ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " + @@ -452,6 +463,30 @@ func (d *DB) Path() string { return d.dbPath } +const ( + // DefaultPurgeBatchSize is the number of revocation-log entries + // deleted per transaction during PurgeClosedChannelData. Keeping + // batches small limits how long each individual write transaction holds + // the DB lock, allowing other writers to interleave between batches. + DefaultPurgeBatchSize = 500 +) + +var ( + // pendingChanCleanupBucket tracks channels whose bulk historical data + // (revocation log, forwarding packages) has not yet been deleted after + // close. Each key is a serialized wire.OutPoint; the value is empty. + // The presence of a key means Phase 2 (PurgeClosedChannelData) is still + // outstanding for that channel. + pendingChanCleanupBucket = []byte("pending-chan-cleanup") + + // pendingCleanupKey is written into a channel's own bucket by Phase 1 + // (CloseChannel) to mark that bulk data deletion is still pending. It + // allows fetchNodeChannels to distinguish intentional Phase 1 state + // from genuine data corruption when chanInfoKey is absent. The key is + // removed automatically when Phase 2 deletes the channel bucket. 
+ pendingCleanupKey = []byte("phase2-pending") +) + var dbTopLevelBuckets = [][]byte{ openChannelBucket, closedChannelBucket, @@ -467,6 +502,7 @@ var dbTopLevelBuckets = [][]byte{ outpointBucket, chanIDBucket, historicalChannelBucket, + pendingChanCleanupBucket, } // Wipe completely deletes all saved state within all used buckets within the @@ -663,6 +699,12 @@ func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) ( return err } oChannel, err := fetchOpenChannel(chanBucket, &outPoint) + if errors.Is(err, ErrChannelPendingCleanup) { + // Phase 1 complete, Phase 2 not yet run: the channel is + // logically closed but the bulk data has not yet been + // purged. Not an open channel — skip. + return nil + } if err != nil { return fmt.Errorf("unable to read channel data for "+ "chan_point=%v: %w", outPoint, err) @@ -824,6 +866,12 @@ func (c *ChannelStateDB) FetchPermAndTempPeers( openChan, err := fetchOpenChannel( chanBucket, &op, ) + if errors.Is(err, ErrChannelPendingCleanup) { + // Channel is logically closed (Phase 1 + // done) but bulk data not yet purged. + // Not an open channel — skip. + return nil + } if err != nil { return err } @@ -1034,6 +1082,13 @@ func (c *ChannelStateDB) channelScanner(tx kvdb.RTx, channel, err := fetchOpenChannel( chanBucket, chanPoint, ) + if errors.Is(err, ErrChannelPendingCleanup) { + // Channel is logically closed (Phase 1 + // done) but bulk data not yet purged. + // Treat as not found for open-channel + // lookups. + return ErrChannelNotFound + } if err != nil { return err } @@ -1447,6 +1502,408 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { }, func() {}) } +// Compile-time assertion: *ChannelStateDB must satisfy ChannelCloser. +var _ ChannelCloser = (*ChannelStateDB)(nil) + +// CloseChannel atomically records the closure of the channel identified by +// chanPoint. 
It is the Phase 1 implementation of the ChannelCloser interface: +// it writes the close summary, archives the full channel state, updates the +// outpoint index, and deletes the small per-channel state keys. The bulk +// historical data (revocation log, forwarding packages) is intentionally left +// in place and a cleanup task is registered in pendingChanCleanupBucket so +// that Phase 2 (PurgeClosedChannelData) can remove it incrementally across +// many small transactions, avoiding a single long-held write lock. +// +// NOTE: summary.RemotePub and summary.ChainHash must be populated. +func (c *ChannelStateDB) CloseChannel(ctx context.Context, + chanPoint wire.OutPoint, summary *ChannelCloseSummary, + statuses ...ChannelStatus) error { + + switch { + case summary == nil: + return ErrChannelCloseSummaryNil + + case summary.RemotePub == nil: + return ErrChannelCloseSummaryMissingRemotePub + } + + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + openChanBucket := tx.ReadWriteBucket(openChannelBucket) + if openChanBucket == nil { + return ErrNoChanDBExists + } + + nodePub := summary.RemotePub.SerializeCompressed() + nodeChanBucket := openChanBucket.NestedReadWriteBucket(nodePub) + if nodeChanBucket == nil { + return ErrNoActiveChannels + } + + chainBucket := nodeChanBucket.NestedReadWriteBucket( + summary.ChainHash[:], + ) + if chainBucket == nil { + return ErrNoActiveChannels + } + + var chanPointBuf bytes.Buffer + if err := graphdb.WriteOutpoint( + &chanPointBuf, &chanPoint, + ); err != nil { + return err + } + chanKey := chanPointBuf.Bytes() + + chanBucket := chainBucket.NestedReadWriteBucket(chanKey) + if chanBucket == nil { + return ErrNoActiveChannels + } + + // Read the full channel state before we start modifying + // anything — we need it for the historical archive and to + // determine whether the channel is frozen. 
+ chanState, err := fetchOpenChannel(chanBucket, &chanPoint) + if err != nil { + return err + } + + // Delete the small per-channel state keys. The revocation log + // nested buckets and the channel bucket itself are left intact + // intentionally; they are removed by PurgeClosedChannelData. + if err := deleteOpenChannel(chanBucket); err != nil { + return err + } + if chanState.ChanType.IsFrozen() || + chanState.ChanType.HasLeaseExpiration() { + + if err := deleteThawHeight(chanBucket); err != nil { + return err + } + } + + // Write a sentinel into the channel bucket so that + // fetchNodeChannels can tell the difference between this + // intentional Phase 1 state (chanInfoKey absent, bulk data + // still present) and genuine data corruption (chanInfoKey + // absent for unknown reasons). The sentinel is removed + // automatically when Phase 2 deletes the channel bucket. + err = chanBucket.Put(pendingCleanupKey, []byte{}) + if err != nil { + return err + } + + // Update the outpoint index: mark this outpoint as closed. + opBucket := tx.ReadWriteBucket(outpointBucket) + if opBucket == nil { + return ErrNoChanDBExists + } + if opBucket.Get(chanKey) == nil { + return ErrMissingIndexEntry + } + closeStatus := uint8(outpointClosed) + statusRecord := tlv.MakePrimitiveRecord( + indexStatusType, &closeStatus, + ) + opStream, err := tlv.NewStream(statusRecord) + if err != nil { + return err + } + var statusBuf bytes.Buffer + if err := opStream.Encode(&statusBuf); err != nil { + return err + } + if err := opBucket.Put(chanKey, statusBuf.Bytes()); err != nil { + return err + } + + // Archive the full channel state in the historical bucket. 
+ historicalBucket, err := tx.CreateTopLevelBucket( + historicalChannelBucket, + ) + if err != nil { + return err + } + historicalChanBucket, err := + historicalBucket.CreateBucketIfNotExists(chanKey) + if err != nil { + return err + } + for _, s := range statuses { + chanState.chanStatus |= s + } + err = putOpenChannel(historicalChanBucket, chanState) + if err != nil { + return err + } + + // Write the close summary to the closed-channel bucket. + if err := putChannelCloseSummary( + tx, chanKey, summary, chanState, + ); err != nil { + return err + } + + // Register a cleanup task so that PurgeClosedChannelData knows + // to finish Phase 2 even if we restart before it completes. + cleanupBucket, err := tx.CreateTopLevelBucket( + pendingChanCleanupBucket, + ) + if err != nil { + return err + } + + return cleanupBucket.Put(chanKey, []byte{}) + }, func() {}) +} + +// PurgeClosedChannelData is the Phase 2 implementation of the ChannelCloser +// interface. It removes bulk historical data for a closed channel that was +// deliberately left in place by CloseChannel to keep its transaction short. +// +// The revocation log (both current and deprecated buckets) is deleted in +// batches of batchSize entries, each batch in its own transaction, so that +// other writers can interleave between batches. Once all revocation log data +// is gone the now-empty channel bucket is deleted, forwarding packages are +// wiped, and the cleanup task is deregistered. +// +// The function is crash-safe and idempotent: if the process restarts mid- +// purge, FetchChannelsPendingCleanup will return this channel and the caller +// can simply invoke PurgeClosedChannelData again to resume where it left off. +// Returns done=true once all cleanup is complete. 
+func (c *ChannelStateDB) PurgeClosedChannelData(ctx context.Context, + chanPoint wire.OutPoint, batchSize int) (bool, error) { + + // Look up the close summary — it carries RemotePub, ChainHash, and + // ShortChanID, which are everything we need to navigate to the channel + // bucket and reconstruct the forwarding-package packager. + summary, err := c.FetchClosedChannel(&chanPoint) + if err != nil { + return false, fmt.Errorf("fetching closed channel for "+ + "purge: %w", err) + } + + var chanPointBuf bytes.Buffer + if err := graphdb.WriteOutpoint(&chanPointBuf, &chanPoint); err != nil { + return false, err + } + chanKey := chanPointBuf.Bytes() + + nodePub := summary.RemotePub.SerializeCompressed() + chainHash := summary.ChainHash[:] + + // navigateRO returns the channel bucket for a read transaction, + // or nil if any intermediate bucket is absent. + navigateRO := func(tx kvdb.RTx) kvdb.RBucket { + openBkt := tx.ReadBucket(openChannelBucket) + if openBkt == nil { + return nil + } + nodeBkt := openBkt.NestedReadBucket(nodePub) + if nodeBkt == nil { + return nil + } + chainBkt := nodeBkt.NestedReadBucket(chainHash) + if chainBkt == nil { + return nil + } + + return chainBkt.NestedReadBucket(chanKey) + } + + // navigateRW returns the chain bucket and channel bucket for a write + // transaction. Either may be nil if the bucket no longer exists. + navigateRW := func(tx kvdb.RwTx) (kvdb.RwBucket, kvdb.RwBucket) { + openBkt := tx.ReadWriteBucket(openChannelBucket) + if openBkt == nil { + return nil, nil + } + nodeBkt := openBkt.NestedReadWriteBucket(nodePub) + if nodeBkt == nil { + return nil, nil + } + chainBkt := nodeBkt.NestedReadWriteBucket(chainHash) + if chainBkt == nil { + return nil, nil + } + + return chainBkt, chainBkt.NestedReadWriteBucket(chanKey) + } + + // purgeBucketBatched drains all entries from the nested bucket + // identified by nestedKey within the channel bucket. 
Each batch of + // up to batchSize keys is collected in a read transaction and then + // deleted in a separate write transaction, so the write lock is held + // only briefly per batch. + purgeBucketBatched := func(nestedKey []byte) error { + for { + if err := ctx.Err(); err != nil { + return err + } + + // Collect the next batch of keys to delete. + var keys [][]byte + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + if err := ctx.Err(); err != nil { + return err + } + + chanBkt := navigateRO(tx) + if chanBkt == nil { + return nil + } + logBkt := chanBkt.NestedReadBucket(nestedKey) + if logBkt == nil { + return nil + } + + cursor := logBkt.ReadCursor() + k, _ := cursor.First() + for k != nil && len(keys) < batchSize { + keyCopy := make([]byte, len(k)) + copy(keyCopy, k) + keys = append(keys, keyCopy) + k, _ = cursor.Next() + } + + return nil + }, func() { + keys = nil + }) + if err != nil { + return err + } + if len(keys) == 0 { + return nil + } + + // Delete this batch. + err = kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + if err := ctx.Err(); err != nil { + return err + } + + _, chanBkt := navigateRW(tx) + if chanBkt == nil { + return nil + } + logBkt := chanBkt.NestedReadWriteBucket( + nestedKey, + ) + if logBkt == nil { + return nil + } + + for _, k := range keys { + if err := logBkt.Delete(k); err != nil { + return err + } + } + + return nil + }, func() {}) + if err != nil { + return err + } + } + } + + // Drain both the current and the deprecated revocation log buckets. + if err := purgeBucketBatched(revocationLogBucket); err != nil { + return false, fmt.Errorf("purging revocation log: %w", err) + } + if err := purgeBucketBatched(revocationLogBucketDeprecated); err != nil { //nolint:ll + return false, fmt.Errorf("purging deprecated revocation "+ + "log: %w", err) + } + + // Final cleanup: the revocation log buckets are now empty so the + // cascade on DeleteNestedBucket will only touch a handful of rows. 
+ // We also wipe forwarding packages (usually empty at close time) and + // remove the pending-cleanup marker in the same transaction. + err = kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + if err := ctx.Err(); err != nil { + return err + } + + chainBkt, _ := navigateRW(tx) + if chainBkt != nil { + err := chainBkt.DeleteNestedBucket(chanKey) + if err != nil && !errors.Is( + err, walletdb.ErrBucketNotFound, + ) { + + return err + } + } + + // Wipe forwarding packages for this channel. At close time + // these are usually empty (cleaned up incrementally during + // normal HTLC processing), but we handle them here for + // correctness. + if err := ctx.Err(); err != nil { + return err + } + + packager := NewChannelPackager(summary.ShortChanID) + if err := packager.Wipe(tx); err != nil { + return err + } + + // Deregister the cleanup task. + cleanupBkt := tx.ReadWriteBucket(pendingChanCleanupBucket) + if cleanupBkt == nil { + return nil + } + + return cleanupBkt.Delete(chanKey) + }, func() {}) + if err != nil { + return false, fmt.Errorf("final channel purge cleanup: %w", + err) + } + + return true, nil +} + +// FetchChannelsPendingCleanup returns the outpoints of all channels that have +// completed Phase 1 (CloseChannel) but whose bulk historical data has not yet +// been fully deleted by Phase 2 (PurgeClosedChannelData). Callers should +// invoke PurgeClosedChannelData for each returned outpoint to resume the +// cleanup, typically at startup after a crash or restart interrupted an +// in-progress purge. 
+func (c *ChannelStateDB) FetchChannelsPendingCleanup( + ctx context.Context) ([]wire.OutPoint, error) { + + var outpoints []wire.OutPoint + + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + cleanupBkt := tx.ReadBucket(pendingChanCleanupBucket) + if cleanupBkt == nil { + return nil + } + + return cleanupBkt.ForEach(func(k, _ []byte) error { + var op wire.OutPoint + if err := graphdb.ReadOutpoint( + bytes.NewReader(k), &op, + ); err != nil { + return err + } + + outpoints = append(outpoints, op) + + return nil + }) + }, func() { + outpoints = nil + }) + if err != nil { + return nil, err + } + + return outpoints, nil +} + // pruneLinkNode determines whether we should garbage collect a link node from // the database due to no longer having any open channels with it. // diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index eac63cb32d1..bc47457ea9b 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -285,6 +285,8 @@ type ChainArbitrator struct { // cleanups. resolvedChan chan wire.OutPoint + cleanupMgr *closedChannelCleanupManager + quit chan struct{} wg sync.WaitGroup @@ -304,6 +306,10 @@ func NewChainArbitrator(cfg ChainArbitratorConfig, resolvedChan: make(chan wire.OutPoint), } + c.cleanupMgr = newClosedChannelCleanupManager( + c.quit, newClosedChannelPurgeFn(db.ChannelStateDB()), + ) + // Mount the block consumer. c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name()) @@ -452,7 +458,14 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, return err } c.cfg.NotifyClosedChannel(summary.ChanPoint) - return nil + + // Phase 1 is complete. Immediately start Phase 2 + // (bulk data deletion) in the background so cleanup + // doesn't wait for the next node restart. If the node + // shuts down mid-purge the startup scan in server.go + // will resume it automatically. 
+ return c.enqueueClosedChannelCleanup(summary.ChanPoint) + }, IsPendingClose: false, ChainArbitratorConfig: c.cfg, @@ -589,6 +602,10 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // Set the current beat. c.beat = beat + if err := c.cleanupMgr.Start(); err != nil { + return err + } + // Start the goroutine which listens for signals to mark the channel as // resolved. // @@ -918,6 +935,7 @@ func (c *ChainArbitrator) Stop() error { defer log.Debug("ChainArbitrator shutdown complete") close(c.quit) + defer c.cleanupMgr.Stop() var ( activeWatchers = make(map[wire.OutPoint]*chainWatcher) @@ -960,6 +978,27 @@ func (c *ChainArbitrator) Stop() error { return nil } +// enqueueClosedChannelCleanup registers a closed channel for Phase 2 cleanup. +func (c *ChainArbitrator) enqueueClosedChannelCleanup( + chanPoint wire.OutPoint) error { + + return c.cleanupMgr.Enqueue(chanPoint) +} + +// ResumeClosedChannelCleanup registers closed channels discovered during +// startup so Phase 2 cleanup can resume through the arbitrator-owned manager. +func (c *ChainArbitrator) ResumeClosedChannelCleanup( + chanPoints ...wire.OutPoint) error { + + for _, chanPoint := range chanPoints { + if err := c.enqueueClosedChannelCleanup(chanPoint); err != nil { + return err + } + } + + return nil +} + // ContractUpdate is a message packages the latest set of active HTLCs on a // commitment, and also identifies which commitment received a new set of // HTLCs. 
diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go index 622686f76c4..4ecc4a74dd8 100644 --- a/contractcourt/chain_arbitrator_test.go +++ b/contractcourt/chain_arbitrator_test.go @@ -1,8 +1,11 @@ package contractcourt import ( + "context" "net" + "sync/atomic" "testing" + "time" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" @@ -219,3 +222,175 @@ func TestResolveContract(t *testing.T) { err = chainArb.ResolveContract(channel.FundingOutpoint) require.NoError(t, err, "second resolve call shouldn't fail") } + +// TestMarkChannelClosedTriggersRuntimeCleanup verifies that closing a channel +// through the chain arbitrator immediately starts Phase 2 cleanup instead of +// waiting for the next process restart. +func TestMarkChannelClosedTriggersRuntimeCleanup(t *testing.T) { + t.Parallel() + + db := channeldb.OpenForTesting(t, t.TempDir()) + + lChannel, _, err := lnwallet.CreateTestChannels( + t, channeldb.SingleFunderTweaklessBit, + ) + require.NoError(t, err) + + channel := lChannel.State() + channel.Db = db.ChannelStateDB() + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + require.NoError(t, channel.SyncPending(addr, 101)) + + chainArb := NewChainArbitrator(ChainArbitratorConfig{ + ChainIO: &mock.ChainIO{}, + Notifier: &mock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail), + ConfChan: make(chan *chainntnfs.TxConfirmation), + }, + NotifyClosedChannel: func(wire.OutPoint) {}, + Clock: clock.NewDefaultClock(), + Budget: *DefaultBudgetConfig(), + }, db) + require.NoError(t, chainArb.cleanupMgr.Start()) + t.Cleanup(func() { + require.NoError(t, chainArb.Stop()) + }) + + channelArb, err := newActiveChannelArbitrator(channel, chainArb, nil) + require.NoError(t, err) + + summary := &channeldb.ChannelCloseSummary{ + ChanPoint: channel.FundingOutpoint, + RemotePub: channel.IdentityPub, + ChainHash: channel.ChainHash, + CloseType: channeldb.CooperativeClose, + 
ShortChanID: channel.ShortChannelID, + } + require.NoError(t, channelArb.cfg.MarkChannelClosed(summary)) + + require.Eventually(t, func() bool { + pending, err := db.ChannelStateDB().FetchChannelsPendingCleanup( + t.Context(), + ) + + return err == nil && len(pending) == 0 + }, time.Second, 10*time.Millisecond) +} + +func TestClosedChannelCleanupManagerSkipsDuplicateRegistration(t *testing.T) { + t.Parallel() + + var ( + callCount int32 + blocker = make(chan struct{}) + quit = make(chan struct{}) + ) + + manager := newClosedChannelCleanupManager(quit, + func(context.Context, wire.OutPoint) (bool, error) { + atomic.AddInt32(&callCount, 1) + <-blocker + return true, nil + }, + ) + require.NoError(t, manager.Start()) + + chanPoint := wire.OutPoint{Index: 7} + require.NoError(t, manager.Enqueue(chanPoint)) + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&callCount) == 1 + }, time.Second, 10*time.Millisecond) + + require.NoError(t, manager.Enqueue(chanPoint)) + + close(blocker) + manager.Stop() + + require.EqualValues(t, 1, atomic.LoadInt32(&callCount)) +} + +func TestClosedChannelCleanupManagerDrainsPreStartQueue(t *testing.T) { + t.Parallel() + + var ( + callCount int32 + quit = make(chan struct{}) + ) + + manager := newClosedChannelCleanupManager(quit, + func(context.Context, wire.OutPoint) (bool, error) { + atomic.AddInt32(&callCount, 1) + return true, nil + }, + ) + + require.NoError(t, manager.Enqueue(wire.OutPoint{Index: 11})) + require.NoError(t, manager.Start()) + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&callCount) == 1 + }, time.Second, 10*time.Millisecond) + + manager.Stop() +} + +func TestClosedChannelCleanupManagerResumesPartialCleanup(t *testing.T) { + t.Parallel() + + var ( + chanPoint = wire.OutPoint{Index: 21} + phase int32 = 1 + callCount int32 + quit1 = make(chan struct{}) + ) + + var remaining int32 = 2 + purgeFn := func(ctx context.Context, cp wire.OutPoint) (bool, error) { + require.Equal(t, chanPoint, cp) 
+ + currentCall := atomic.AddInt32(&callCount, 1) + switch currentCall { + case 1: + atomic.AddInt32(&remaining, -1) + return false, nil + + case 2: + <-ctx.Done() + return false, ctx.Err() + + default: + require.EqualValues(t, 2, atomic.LoadInt32(&phase)) + atomic.AddInt32(&remaining, -1) + return true, nil + } + } + + manager1 := newClosedChannelCleanupManager(quit1, purgeFn) + require.NoError(t, manager1.Enqueue(chanPoint)) + require.NoError(t, manager1.Start()) + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&callCount) >= 2 + }, time.Second, 10*time.Millisecond) + + manager1.Stop() + + require.EqualValues(t, 1, atomic.LoadInt32(&remaining)) + + atomic.StoreInt32(&phase, 2) + quit2 := make(chan struct{}) + manager2 := newClosedChannelCleanupManager(quit2, purgeFn) + require.NoError(t, manager2.Enqueue(chanPoint)) + require.NoError(t, manager2.Start()) + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&remaining) == 0 + }, time.Second, 10*time.Millisecond) + + manager2.Stop() +} diff --git a/contractcourt/closed_channel_cleanup.go b/contractcourt/closed_channel_cleanup.go new file mode 100644 index 00000000000..64ab56a72b2 --- /dev/null +++ b/contractcourt/closed_channel_cleanup.go @@ -0,0 +1,276 @@ +package contractcourt + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" +) + +type closedChannelPurgeFn func(context.Context, wire.OutPoint) (bool, error) + +// closedChannelCleanupManager runs Phase 2 closed-channel cleanup with bounded +// concurrency and prevents the same channel from being registered twice at the +// same time. 
+type closedChannelCleanupManager struct { + started int32 + stopped int32 + + purgeFn closedChannelPurgeFn + shutdown <-chan struct{} + quit chan struct{} + closeQuit sync.Once + + mu sync.Mutex + registered map[wire.OutPoint]struct{} + pending []wire.OutPoint + + work chan wire.OutPoint + notify chan struct{} + wg sync.WaitGroup +} + +// newClosedChannelCleanupManager constructs a cleanup manager that reacts to +// parent shutdown while owning its own local stop signal. +func newClosedChannelCleanupManager(shutdown <-chan struct{}, + purgeFn closedChannelPurgeFn) *closedChannelCleanupManager { + + return &closedChannelCleanupManager{ + purgeFn: purgeFn, + shutdown: shutdown, + quit: make(chan struct{}), + registered: make(map[wire.OutPoint]struct{}), + work: make(chan wire.OutPoint), + notify: make(chan struct{}, 1), + } +} + +// Start launches the cleanup manager's dispatcher and worker goroutines. +func (m *closedChannelCleanupManager) Start() error { + if !atomic.CompareAndSwapInt32(&m.started, 0, 1) { + return nil + } + + log.Debug("Starting closed-channel cleanup manager") + + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.dispatch() + }() + + m.wg.Add(1) + go func() { + defer m.wg.Done() + + for { + select { + case chanPoint := <-m.work: + m.cleanupChannel(chanPoint) + + case <-m.shutdown: + return + case <-m.quit: + return + } + } + }() + + m.signal() + + return nil +} + +// Stop signals the cleanup manager to exit and waits for all goroutines to +// shut down. +func (m *closedChannelCleanupManager) Stop() { + if !atomic.CompareAndSwapInt32(&m.stopped, 0, 1) { + return + } + + log.Debug("Stopping closed-channel cleanup manager") + + m.closeQuit.Do(func() { + close(m.quit) + }) + + m.wg.Wait() + + log.Debug("Closed-channel cleanup manager stopped") +} + +// Enqueue registers a closed channel for Phase 2 cleanup and schedules it for +// dispatch once the manager is running. 
+func (m *closedChannelCleanupManager) Enqueue(chanPoint wire.OutPoint) error { + m.mu.Lock() + if _, ok := m.registered[chanPoint]; ok { + m.mu.Unlock() + + log.Warnf("Closed-channel cleanup already registered for %v, "+ + "skipping duplicate enqueue", chanPoint) + + return nil + } + + m.registered[chanPoint] = struct{}{} + started := atomic.LoadInt32(&m.started) == 1 + m.pending = append(m.pending, chanPoint) + m.mu.Unlock() + + log.Debugf("Registered closed-channel cleanup for %v (started=%v)", + chanPoint, started) + + if !started { + return nil + } + + m.signal() + + return nil +} + +// dispatch forwards registered channels from the pending queue to the cleanup +// worker until the manager is shut down. +func (m *closedChannelCleanupManager) dispatch() { + for { + select { + case <-m.notify: + log.Debug("Closed-channel cleanup dispatcher woke up") + case <-m.shutdown: + log.Debug("Closed-channel cleanup dispatcher " + + "shutting down due to parent shutdown") + return + case <-m.quit: + log.Debug("Closed-channel cleanup dispatcher " + + "shutting down") + return + } + + for { + chanPoint, ok := m.nextPending() + if !ok { + break + } + + log.Debugf("Dispatching closed-channel cleanup for %v", + chanPoint) + + select { + case m.work <- chanPoint: + case <-m.shutdown: + log.Debug("Closed-channel cleanup dispatch " + + "interrupted by parent shutdown") + return + case <-m.quit: + log.Debug("Closed-channel cleanup dispatch " + + "interrupted by manager stop") + return + } + } + } +} + +// nextPending returns the next channel awaiting dispatch, if any. +func (m *closedChannelCleanupManager) nextPending() (wire.OutPoint, bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.pending) == 0 { + return wire.OutPoint{}, false + } + + chanPoint := m.pending[0] + m.pending = m.pending[1:] + + return chanPoint, true +} + +// signal wakes the dispatcher when pending cleanup work is available. 
+func (m *closedChannelCleanupManager) signal() { + select { + case m.notify <- struct{}{}: + default: + } +} + +// cleanupChannel runs Phase 2 cleanup for a single closed channel until it is +// fully purged or canceled by shutdown. +func (m *closedChannelCleanupManager) cleanupChannel(chanPoint wire.OutPoint) { + log.Debugf("Starting closed-channel cleanup for %v", chanPoint) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-m.shutdown: + cancel() + case <-m.quit: + cancel() + case <-ctx.Done(): + } + }() + + for { + done, err := m.purgeFn(ctx, chanPoint) + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorf("Failed to purge closed channel "+ + "data for %v: %v", chanPoint, err) + } else { + log.Debugf("Closed-channel cleanup canceled "+ + "for %v", chanPoint) + } + + m.finish(chanPoint) + + return + } + + if done { + log.Debugf("Closed-channel cleanup completed for %v", + chanPoint) + m.finish(chanPoint) + + return + } + + log.Debugf("Closed-channel cleanup has more work pending "+ + "for %v", chanPoint) + } +} + +// finish removes a channel from the registered set after cleanup completes or +// exits, allowing it to be re-enqueued later if needed. +func (m *closedChannelCleanupManager) finish(chanPoint wire.OutPoint) { + m.mu.Lock() + delete(m.registered, chanPoint) + m.mu.Unlock() + + log.Debugf("Closed-channel cleanup registration cleared for %v", + chanPoint) +} + +// newClosedChannelPurgeFn adapts the channeldb ChannelCloser interface to the +// cleanup manager's single-channel purge function. 
+func newClosedChannelPurgeFn(
+	chanCloser channeldb.ChannelCloser) closedChannelPurgeFn {
+
+	return func(ctx context.Context,
+		chanPoint wire.OutPoint) (bool, error) {
+
+		done, err := chanCloser.PurgeClosedChannelData(
+			ctx, chanPoint, channeldb.DefaultPurgeBatchSize,
+		)
+		if err != nil {
+			return false, fmt.Errorf("purging closed channel "+
+				"data: %w", err)
+		}
+
+		return done, nil
+	}
+}
diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md
index b8a37789fb8..16d996011cb 100644
--- a/docs/release-notes/release-notes-0.21.0.md
+++ b/docs/release-notes/release-notes-0.21.0.md
@@ -409,6 +409,17 @@
   `native-sql` setting enabled.
 
+* [Channel close is now split into two phases to avoid long write-lock
+  hold times on the KV-over-SQL
+  backends](https://github.com/lightningnetwork/lnd/pull/10732). Phase 1
+  atomically removes the channel from all open-channel views in a small
+  O(1) transaction. Phase 2 deletes the bulk historical data (revocation
+  log entries, forwarding packages) in small batches of
+  `DefaultPurgeBatchSize` (500) entries per transaction, keeping each
+  write-lock window short. Pending Phase 2 cleanups are persisted and
+  automatically resumed on node restart, so no data is left dangling
+  after an interrupted shutdown.
+
 ## Code Health
 
 * [Update taproot detection](https://github.com/lightningnetwork/lnd/pull/10683)
diff --git a/server.go b/server.go
index 9eddf92e691..4a3a0512827 100644
--- a/server.go
+++ b/server.go
@@ -2205,6 +2205,39 @@ func (s *server) Start(ctx context.Context) error {
 			return
 		}
 
+		// Resume any channel data purges that were interrupted by a
+		// previous crash or restart. CloseChannel (Phase 1) may have
+		// completed but PurgeClosedChannelData (Phase 2) may not have
+		// finished deleting the bulk historical data. We restart the
+		// purge for each such channel before other subsystems load
+		// channel state.
+ pendingCleanup, err := s.chanStateDB.FetchChannelsPendingCleanup( //nolint:ll + ctx, + ) + if err != nil { + srvrLog.Errorf("Failed to fetch channels pending "+ + "cleanup: %v", err) + + startErr = err + + return + } + for _, chanPoint := range pendingCleanup { + srvrLog.Infof("Resuming data purge for closed "+ + "channel %v", chanPoint) + + err := s.chainArb.ResumeClosedChannelCleanup(chanPoint) + if err != nil { + srvrLog.Errorf("Failed to enqueue closed "+ + "channel cleanup for %v: %v", chanPoint, + err) + + startErr = err + + return + } + } + cleanup = cleanup.add(s.customMessageServer.Stop) if err := s.customMessageServer.Start(); err != nil { startErr = err