Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 46 additions & 143 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package channeldb

import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
Expand Down Expand Up @@ -183,6 +184,16 @@ var (
// have any channels state.
ErrNoChanInfoFound = fmt.Errorf("no chan info found")

	// ErrChannelPendingCleanup is returned when a channel bucket exists
	// in openChannelBucket with the pendingCleanupKey sentinel present
	// but chanInfoKey absent: CloseChannel (Phase 1) has completed, but
	// PurgeClosedChannelData (Phase 2) has not yet removed the bulk
	// historical data. Scans over all open channels should skip such
	// buckets; targeted lookups should treat the channel as not found.
ErrChannelPendingCleanup = fmt.Errorf("channel pending cleanup")
Comment on lines +187 to +195
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment for ErrChannelPendingCleanup is quite long and could be more concise to improve readability while still explaining the purpose.


// ErrNoRevocationsFound is returned when revocation state for a
// particular channel cannot be found.
ErrNoRevocationsFound = fmt.Errorf("no revocations found")
Expand Down Expand Up @@ -3969,155 +3980,38 @@ type ChannelCloseSummary struct {
LastChanSyncMsg *lnwire.ChannelReestablish
}

// CloseChannel closes a previously active Lightning channel. Closing a channel
// entails deleting all saved state within the database concerning this
// channel. This method also takes a struct that summarizes the state of the
// channel at closing, this compact representation will be the only component
// of a channel left over after a full closing. It takes an optional set of
// channel statuses which will be written to the historical channel bucket.
// These statuses are used to record close initiators.
// CloseChannel closes a previously active Lightning channel. Closing a
// channel entails atomically recording the close summary, archiving the
// channel state, and updating the outpoint index — but intentionally defers
// the deletion of bulk historical data (revocation log, forwarding packages)
// so that it can be purged in smaller batches via the ChannelCloser interface
// on the backing ChannelStateDB. This avoids holding a single large DB
// transaction that would block other writers on SQLite / Postgres backends.
//
// It takes an optional set of channel statuses which will be written to the
// historical channel bucket to record close initiators.
func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
statuses ...ChannelStatus) error {

c.Lock()
defer c.Unlock()

return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoChanDBExists
}

nodePub := c.IdentityPub.SerializeCompressed()
nodeChanBucket := openChanBucket.NestedReadWriteBucket(nodePub)
if nodeChanBucket == nil {
return ErrNoActiveChannels
}

chainBucket := nodeChanBucket.NestedReadWriteBucket(c.ChainHash[:])
if chainBucket == nil {
return ErrNoActiveChannels
}

var chanPointBuf bytes.Buffer
err := graphdb.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return err
}
chanKey := chanPointBuf.Bytes()
chanBucket := chainBucket.NestedReadWriteBucket(
chanKey,
)
if chanBucket == nil {
return ErrNoActiveChannels
}

// Before we delete the channel state, we'll read out the full
// details, as we'll also store portions of this information
// for record keeping.
chanState, err := fetchOpenChannel(
chanBucket, &c.FundingOutpoint,
)
if err != nil {
return err
}

// Delete all the forwarding packages stored for this particular
// channel.
if err = chanState.Packager.Wipe(tx); err != nil {
return err
}

// Now that the index to this channel has been deleted, purge
// the remaining channel metadata from the database.
err = deleteOpenChannel(chanBucket)
if err != nil {
return err
}

// We'll also remove the channel from the frozen channel bucket
// if we need to.
if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() {
err := deleteThawHeight(chanBucket)
if err != nil {
return err
}
}

// With the base channel data deleted, attempt to delete the
// information stored within the revocation log.
if err := deleteLogBucket(chanBucket); err != nil {
return err
}

err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes())
if err != nil {
return err
}

// Fetch the outpoint bucket to see if the outpoint exists or
// not.
opBucket := tx.ReadWriteBucket(outpointBucket)
if opBucket == nil {
return ErrNoChanDBExists
}

// Add the closed outpoint to our outpoint index. This should
// replace an open outpoint in the index.
if opBucket.Get(chanPointBuf.Bytes()) == nil {
return ErrMissingIndexEntry
}

status := uint8(outpointClosed)

// Write the IndexStatus of this outpoint as the first entry in a tlv
// stream.
statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status)
opStream, err := tlv.NewStream(statusRecord)
if err != nil {
return err
}

var b bytes.Buffer
if err := opStream.Encode(&b); err != nil {
return err
}

// Finally add the closed outpoint and tlv stream to the index.
if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil {
return err
}

// Add channel state to the historical channel bucket.
historicalBucket, err := tx.CreateTopLevelBucket(
historicalChannelBucket,
)
if err != nil {
return err
}

historicalChanBucket, err :=
historicalBucket.CreateBucketIfNotExists(chanKey)
if err != nil {
return err
}

// Apply any additional statuses to the channel state.
for _, status := range statuses {
chanState.chanStatus |= status
}

err = putOpenChannel(historicalChanBucket, chanState)
if err != nil {
return err
}

// Finally, create a summary of this channel in the closed
// channel bucket for this node.
return putChannelCloseSummary(
tx, chanPointBuf.Bytes(), summary, chanState,
)
}, func() {})
closeSummary := *summary

// The fields ChannelStateDB.CloseChannel needs for bucket navigation
// and for reconstructing the forwarding-package packager in Phase 2
// come from the channel struct itself, not from the caller-supplied
// summary. The original implementation read them directly from c;
// we preserve that behaviour here so that callers do not have to
// populate these fields in the summary.
closeSummary.RemotePub = c.IdentityPub
closeSummary.ChainHash = c.ChainHash
closeSummary.ShortChanID = c.ShortChannelID

return c.Db.CloseChannel(
context.Background(), c.FundingOutpoint, &closeSummary,
statuses...,
)
}

// ChannelSnapshot is a frozen snapshot of the current channel state. A
Expand Down Expand Up @@ -4739,6 +4633,15 @@ func readChanConfig(b io.Reader, c *ChannelConfig) error {
func fetchChanInfo(chanBucket kvdb.RBucket, channel *OpenChannel) error {
infoBytes := chanBucket.Get(chanInfoKey)
if infoBytes == nil {
// Distinguish an intentional Phase 1-complete state (where
// CloseChannel deleted chanInfoKey but left the bulk data for
// background purge) from genuine data corruption. The sentinel
// key is written by CloseChannel and removed only when Phase 2
// finishes deleting the bucket.
if chanBucket.Get(pendingCleanupKey) != nil {
return ErrChannelPendingCleanup
}

return ErrNoChanInfoFound
}
r := bytes.NewReader(infoBytes)
Expand Down
64 changes: 64 additions & 0 deletions channeldb/channel_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package channeldb

import (
"context"

"github.com/btcsuite/btcd/wire"
)

// ChannelCloser manages the lifecycle of channel closure and the cleanup of
// associated bulk historical data. The interface is intentionally backend-
// agnostic: the current KV implementation on *ChannelStateDB satisfies it,
// and a future native-SQL implementation will provide a different concrete
// type that satisfies the same contract without any change to callers.
//
// The close operation is deliberately split into two phases to prevent a
// single large DB transaction from holding the write lock for an unbounded
// amount of time (which is especially harmful on SQLite and Postgres backends,
// where bulk cascade-deletes inside one transaction can block all other
// writers for seconds):
//
// - CloseChannel performs a fast, atomic Phase 1: it records the close
// summary, archives the channel state, updates the outpoint index, and
// deletes the small per-channel keys — but intentionally leaves the bulk
// historical data (revocation log, forwarding packages) in place and
// registers a cleanup task for them.
//
// - PurgeClosedChannelData performs the heavy Phase 2 across many small
// transactions, deleting up to batchSize entries per call so that other
// writers can interleave. It is idempotent and crash-safe: the cleanup
// task registered by CloseChannel persists across restarts and is resumed
// via FetchChannelsPendingCleanup.
type ChannelCloser interface {
// CloseChannel atomically records the channel as closed. It writes
// the close summary to the closed-channel bucket, archives the full
// channel state to the historical bucket, updates the outpoint index,
// deletes the small per-channel state keys, and registers a cleanup
// task for the bulk historical data that is too large to delete here.
//
// NOTE: summary.RemotePub and summary.ChainHash must be populated so
// that the KV implementation can navigate to the channel bucket.
CloseChannel(ctx context.Context, chanPoint wire.OutPoint,
summary *ChannelCloseSummary,
statuses ...ChannelStatus) error

// PurgeClosedChannelData removes up to batchSize entries of historical
// data for a closed channel (revocation log entries, forwarding
// packages). Each call executes in one or more small transactions.
// Returns done=true once all bulk data has been removed and the cleanup
// task has been deregistered. Safe to call multiple times; idempotent
// once done=true has been returned.
PurgeClosedChannelData(ctx context.Context, chanPoint wire.OutPoint,
batchSize int) (bool, error)

// FetchChannelsPendingCleanup returns the outpoints of channels that
// have been closed (Phase 1 complete) but whose bulk historical data
// has not yet been fully deleted (Phase 2 incomplete). Used at startup
// to resume any purges that were interrupted by a crash or restart.
FetchChannelsPendingCleanup(ctx context.Context) ([]wire.OutPoint,
error)
}

// Ensure the current KV-backed channel state store satisfies the backend-
// agnostic close/cleanup contract.
var _ ChannelCloser = (*ChannelStateDB)(nil)
56 changes: 50 additions & 6 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,8 +964,18 @@ func TestChannelStateTransition(t *testing.T) {
len(channels))
}

// Attempting to find previous states on the channel should fail as the
// revocation log has been deleted.
// Run Phase 2 to delete the bulk historical data (revocation log and
// forwarding packages). Phase 1 (CloseChannel) intentionally leaves
// them in place to avoid a single long-held write lock.
_, err = cdb.PurgeClosedChannelData(
t.Context(),
channel.FundingOutpoint,
DefaultPurgeBatchSize,
)
require.NoError(t, err, "phase 2 purge failed")

// Attempting to find previous states on the channel should fail now
// that the revocation log has been deleted by Phase 2.
_, _, err = updatedChannel[0].FindPreviousState(
oldRemoteCommit.CommitHeight,
)
Expand Down Expand Up @@ -1143,6 +1153,13 @@ func TestFetchClosedChannels(t *testing.T) {
t.Fatalf("unable to close channel: %v", err)
}

expected := *summary
expected.RemotePub = state.IdentityPub
expected.ChainHash = state.ChainHash
expected.ShortChanID = state.ShortChannelID
expected.RemoteCurrentRevocation = state.RemoteCurrentRevocation
expected.RemoteNextRevocation = state.RemoteNextRevocation

// Query the database to ensure that the channel has now been properly
// closed. We should get the same result whether querying for pending
// channels only, or not.
Expand All @@ -1152,19 +1169,19 @@ func TestFetchClosedChannels(t *testing.T) {
t.Fatalf("incorrect number of pending closed channels: expecting %v,"+
"got %v", 1, len(pendingClosed))
}
if !reflect.DeepEqual(summary, pendingClosed[0]) {
if !reflect.DeepEqual(&expected, pendingClosed[0]) {
t.Fatalf("database summaries don't match: expected %v got %v",
spew.Sdump(summary), spew.Sdump(pendingClosed[0]))
spew.Sdump(&expected), spew.Sdump(pendingClosed[0]))
}
closed, err := cdb.FetchClosedChannels(false)
require.NoError(t, err, "failed fetching all closed channels")
if len(closed) != 1 {
t.Fatalf("incorrect number of closed channels: expecting %v, "+
"got %v", 1, len(closed))
}
if !reflect.DeepEqual(summary, closed[0]) {
if !reflect.DeepEqual(&expected, closed[0]) {
t.Fatalf("database summaries don't match: expected %v got %v",
spew.Sdump(summary), spew.Sdump(closed[0]))
spew.Sdump(&expected), spew.Sdump(closed[0]))
}

// Mark the channel as fully closed.
Expand Down Expand Up @@ -1601,6 +1618,33 @@ func TestCloseChannelStatus(t *testing.T) {
}
}

func TestOpenChannelCloseChannelDoesNotMutateSummary(t *testing.T) {
t.Parallel()

fullDB, err := MakeTestDB(t)
require.NoError(t, err)

cdb := fullDB.ChannelStateDB()
channel := createTestChannel(t, cdb, openChannelOption())

localBalance := channel.LocalCommitment.LocalBalance.ToSatoshis()

summary := &ChannelCloseSummary{
ChanPoint: channel.FundingOutpoint,
ClosingTXID: rev,
Capacity: channel.Capacity,
SettledBalance: localBalance,
TimeLockedBalance: localBalance + 1,
CloseType: RemoteForceClose,
IsPending: true,
LocalChanConfig: channel.LocalChanCfg,
}
original := *summary

require.NoError(t, channel.CloseChannel(summary))
require.Equal(t, original, *summary)
}

// TestHasChanStatus asserts the behavior of HasChanStatus by checking the
// behavior of various status flags in addition to the special case of
// ChanStatusDefault which is treated like a flag in the code base even though
Expand Down
Loading
Loading