diff --git a/bucket.go b/bucket.go index 6371ace97..e344cd089 100644 --- a/bucket.go +++ b/bucket.go @@ -884,6 +884,11 @@ func (b *Bucket) node(pgId common.Pgid, parent *node) *node { }) } + // If the page is compressed, decompress it before reading. + if p.IsCompressed() { + p = b.tx.decompressedPage(p) + } + // Read the page into the node and cache it. n.read(p) b.nodes[pgId] = n @@ -945,7 +950,14 @@ func (b *Bucket) pageNode(id common.Pgid) (*common.Page, *node) { } // Finally lookup the page from the transaction if no node is materialized. - return b.tx.page(id), nil + p := b.tx.page(id) + + // If the page is compressed, decompress it transparently. + if p.IsCompressed() { + p = b.tx.decompressedPage(p) + } + + return p, nil } // BucketStats records statistics about resources used by a bucket. diff --git a/cmd/bbolt/command/command_bench.go b/cmd/bbolt/command/command_bench.go index c29d29a3a..596a89fb7 100644 --- a/cmd/bbolt/command/command_bench.go +++ b/cmd/bbolt/command/command_bench.go @@ -41,6 +41,7 @@ type benchOptions struct { pageSize int initialMmapSize int deleteFraction float64 // Fraction of keys of last tx to delete during writes. works only with "seq-del" write mode. + compression bool } func (o *benchOptions) AddFlags(fs *pflag.FlagSet) { @@ -61,6 +62,7 @@ func (o *benchOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.goBenchOutput, "gobench-output", false, "") fs.IntVar(&o.pageSize, "page-size", common.DefaultPageSize, "Set page size in bytes.") fs.IntVar(&o.initialMmapSize, "initial-mmap-size", 0, "Set initial mmap size in bytes for database file.") + fs.BoolVar(&o.compression, "compression", false, "Enables compression.") } // Returns an error if `bench` options are not valid. @@ -145,6 +147,7 @@ func benchFunc(cmd *cobra.Command, options *benchOptions) error { dbOptions := *bolt.DefaultOptions dbOptions.PageSize = options.pageSize dbOptions.InitialMmapSize = options.initialMmapSize + dbOptions.Compression = options.compression db, err := bolt.Open(options.path, 0600, &dbOptions) if err != nil { return err diff --git a/cmd/bbolt/command/command_bench_test.go b/cmd/bbolt/command/command_bench_test.go index b54e61a11..533441a26 100644 --- a/cmd/bbolt/command/command_bench_test.go +++ b/cmd/bbolt/command/command_bench_test.go @@ -38,8 +38,9 @@ func TestBenchCommand_Run(t *testing.T) { tests := map[string]struct { args []string }{ - "no-args": {}, - "100k count": {[]string{"--count", "100000"}}, + "no-args": {}, + "100k count": {[]string{"--count", "100000"}}, + "compression": {[]string{"--compression"}}, } for name, test := range tests { diff --git a/compression_test.go b/compression_test.go new file mode 100644 index 000000000..c371e087e --- /dev/null +++ b/compression_test.go @@ -0,0 +1,493 @@ +package bbolt_test + +import ( + "bytes" + "encoding/binary" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + bolt "go.etcd.io/bbolt" + "go.etcd.io/bbolt/internal/btesting" +) + +// TestCompression_BasicWriteRead verifies that data written with compression +// enabled can be read back correctly. +func TestCompression_BasicWriteRead(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + // Write data. + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("test")) + if err != nil { + return err + } + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + // Use repeating bytes to make data compressible. + value := bytes.Repeat([]byte(fmt.Sprintf("value-%05d-", i)), 10) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Read data back. + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("test")) + require.NotNil(t, b) + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + expected := bytes.Repeat([]byte(fmt.Sprintf("value-%05d-", i)), 10) + value := b.Get(key) + require.Equal(t, expected, value, "mismatch for key %s", key) + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_CursorTraversal verifies that cursor operations work +// correctly with compressed pages. +func TestCompression_CursorTraversal(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + const numKeys = 200 + + // Write data. + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("cursor-test")) + if err != nil { + return err + } + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte{byte(i % 256)}, 100) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Traverse with cursor. + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("cursor-test")) + require.NotNil(t, b) + + c := b.Cursor() + count := 0 + for k, v := c.First(); k != nil; k, v = c.Next() { + expected := []byte(fmt.Sprintf("key-%05d", count)) + require.Equal(t, expected, k) + require.Len(t, v, 100) + count++ + } + require.Equal(t, numKeys, count) + + // Seek. + k, v := c.Seek([]byte("key-00050")) + require.Equal(t, []byte("key-00050"), k) + require.Len(t, v, 100) + + return nil + }) + require.NoError(t, err) +} + +// TestCompression_MultipleTransactions verifies that multiple write +// transactions work correctly with compression. +func TestCompression_MultipleTransactions(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + // First transaction: write initial data. + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("multi")) + if err != nil { + return err + } + for i := 0; i < 50; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte("a"), 200) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Second transaction: add more data. + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("multi")) + for i := 50; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte("b"), 200) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Verify all data. + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("multi")) + require.NotNil(t, b) + for i := 0; i < 50; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := b.Get(key) + require.Equal(t, bytes.Repeat([]byte("a"), 200), value) + } + for i := 50; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := b.Get(key) + require.Equal(t, bytes.Repeat([]byte("b"), 200), value) + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_NestedBuckets verifies that nested buckets work +// correctly with compression. +func TestCompression_NestedBuckets(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + err := db.Update(func(tx *bolt.Tx) error { + parent, err := tx.CreateBucket([]byte("parent")) + if err != nil { + return err + } + child, err := parent.CreateBucket([]byte("child")) + if err != nil { + return err + } + for i := 0; i < 50; i++ { + key := []byte(fmt.Sprintf("nested-key-%05d", i)) + value := bytes.Repeat([]byte("nested"), 20) + if err := child.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + err = db.View(func(tx *bolt.Tx) error { + parent := tx.Bucket([]byte("parent")) + require.NotNil(t, parent) + child := parent.Bucket([]byte("child")) + require.NotNil(t, child) + for i := 0; i < 50; i++ { + key := []byte(fmt.Sprintf("nested-key-%05d", i)) + expected := bytes.Repeat([]byte("nested"), 20) + value := child.Get(key) + require.Equal(t, expected, value) + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_DeleteAndRewrite verifies that deleting keys and +// rewriting them works correctly with compression. +func TestCompression_DeleteAndRewrite(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + // Write initial data. + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("deltest")) + if err != nil { + return err + } + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte("original"), 20) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Delete even-numbered keys. + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("deltest")) + for i := 0; i < 100; i += 2 { + key := []byte(fmt.Sprintf("key-%05d", i)) + if err := b.Delete(key); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Verify only odd-numbered keys remain. + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("deltest")) + require.NotNil(t, b) + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := b.Get(key) + if i%2 == 0 { + require.Nil(t, value, "expected deleted key %s to be nil", key) + } else { + require.Equal(t, bytes.Repeat([]byte("original"), 20), value) + } + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_LargeValues verifies that pages with overflow work +// correctly with compression. +func TestCompression_LargeValues(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + // Write large values that span multiple pages. + largeValue := bytes.Repeat([]byte("LARGE"), 2000) // 10KB of repeating data + + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("large")) + if err != nil { + return err + } + for i := 0; i < 10; i++ { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, uint64(i)) + if err := b.Put(key, largeValue); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Verify. + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("large")) + require.NotNil(t, b) + for i := 0; i < 10; i++ { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, uint64(i)) + value := b.Get(key) + require.Equal(t, largeValue, value) + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_Check verifies that db.Check() passes with +// compressed pages. +func TestCompression_Check(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("check")) + if err != nil { + return err + } + for i := 0; i < 200; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte(fmt.Sprintf("v%05d", i)), 10) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Run Check. + err = db.View(func(tx *bolt.Tx) error { + for err := range tx.Check(bolt.WithKVStringer(bolt.HexKVStringer())) { + return err + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_ReopenDB verifies that a compressed DB can be closed +// and reopened, and all data can still be read. +func TestCompression_ReopenDB(t *testing.T) { + dir := t.TempDir() + path := dir + "/reopen.db" + + // Create and populate. + db, err := bolt.Open(path, 0600, &bolt.Options{Compression: true}) + require.NoError(t, err) + + err = db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("reopen")) + if err != nil { + return err + } + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte("reopen-data"), 15) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + require.NoError(t, db.Close()) + + // Reopen. + db, err = bolt.Open(path, 0600, &bolt.Options{Compression: true}) + require.NoError(t, err) + defer db.Close() + + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("reopen")) + require.NotNil(t, b) + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + expected := bytes.Repeat([]byte("reopen-data"), 15) + value := b.Get(key) + require.Equal(t, expected, value) + } + return nil + }) + require.NoError(t, err) +} + +// TestCompression_ConcurrentReads verifies that multiple goroutines can +// concurrently read from a compressed database using the same read-only +// transaction without triggering a data race. +func TestCompression_ConcurrentReads(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{Compression: true}) + + const numKeys = 500 + + // Populate the database with enough data to span many pages. + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("concurrent")) + if err != nil { + return err + } + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte(fmt.Sprintf("val-%05d-", i)), 10) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Open a single read-only transaction and share it across goroutines, + // which is the pattern etcd uses. + tx, err := db.Begin(false) + require.NoError(t, err) + defer func() { require.NoError(t, tx.Rollback()) }() + + b := tx.Bucket([]byte("concurrent")) + require.NotNil(t, b) + + const numGoroutines = 16 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func() { + defer wg.Done() + // Each goroutine creates its own cursor and reads all keys. + c := b.Cursor() + count := 0 + for k, v := c.First(); k != nil; k, v = c.Next() { + expected := []byte(fmt.Sprintf("key-%05d", count)) + if !bytes.Equal(k, expected) { + t.Errorf("key mismatch: got %q, want %q", k, expected) + return + } + if len(v) == 0 { + t.Errorf("empty value for key %q", k) + return + } + count++ + } + if count != numKeys { + t.Errorf("expected %d keys, got %d", numKeys, count) + } + }() + } + + wg.Wait() +} + +// TestCompression_ReducesAllocatedPages verifies that compression actually +// reduces the number of pages allocated compared to an uncompressed database +// with the same data. We compare tx.Size() which reflects the logical +// database size (highest page ID * page size), not the mmap/file size. +func TestCompression_ReducesAllocatedPages(t *testing.T) { + dir := t.TempDir() + pathCompressed := dir + "/compressed.db" + pathUncompressed := dir + "/uncompressed.db" + + const numKeys = 1000 + // Use highly compressible values (repeating patterns). + writeData := func(db *bolt.DB) { + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("data")) + if err != nil { + return err + } + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("key-%05d", i)) + value := bytes.Repeat([]byte(fmt.Sprintf("value-%05d-", i)), 50) + if err := b.Put(key, value); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + } + + readTxSize := func(db *bolt.DB) int64 { + var size int64 + err := db.View(func(tx *bolt.Tx) error { + size = tx.Size() + return nil + }) + require.NoError(t, err) + return size + } + + // Create compressed DB. + dbC, err := bolt.Open(pathCompressed, 0600, &bolt.Options{Compression: true}) + require.NoError(t, err) + writeData(dbC) + sizeC := readTxSize(dbC) + require.NoError(t, dbC.Close()) + + // Create uncompressed DB. + dbU, err := bolt.Open(pathUncompressed, 0600, nil) + require.NoError(t, err) + writeData(dbU) + sizeU := readTxSize(dbU) + require.NoError(t, dbU.Close()) + + t.Logf("Uncompressed logical size: %d bytes (%d pages)", sizeU, sizeU/4096) + t.Logf("Compressed logical size: %d bytes (%d pages)", sizeC, sizeC/4096) + t.Logf("Ratio: %.1f%%", float64(sizeC)/float64(sizeU)*100) + + require.Less(t, sizeC, sizeU, + "compressed DB logical size (%d) should be smaller than uncompressed (%d)", + sizeC, sizeU) +} diff --git a/db.go b/db.go index 5d3e26496..b1aa03720 100644 --- a/db.go +++ b/db.go @@ -116,6 +116,13 @@ type DB struct { // Supported only on Unix via mlock/munlock syscalls. Mlock bool + // Compression enables zstd compression of leaf and branch page data. + // When enabled, page data is compressed before writing to disk and + // transparently decompressed when reading. This can significantly + // reduce database file size for compressible data at the cost of + // some CPU overhead. + Compression bool + logger Logger path string @@ -192,6 +199,7 @@ func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) { db.FreelistType = options.FreelistType db.Mlock = options.Mlock db.MaxSize = options.MaxSize + db.Compression = options.Compression // Set default values for later DB operations. db.MaxBatchSize = common.DefaultMaxBatchSize @@ -1347,6 +1355,11 @@ type Options struct { // used memory can't be reclaimed. (UNIX only) Mlock bool + // Compression enables zstd compression of leaf and branch page data. + // When enabled, page data is compressed before writing to disk and + // transparently decompressed when reading. + Compression bool + // Logger is the logger used for bbolt. Logger Logger @@ -1361,8 +1374,8 @@ func (o *Options) String() string { return "{}" } - return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, MaxSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p, NoStatistics: %t}", - o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.MaxSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger, o.NoStatistics) + return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, MaxSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Compression: %t, Logger: %p, NoStatistics: %t}", + o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.MaxSize, o.NoSync, o.OpenFile, o.Mlock, o.Compression, o.Logger, o.NoStatistics) } diff --git a/go.mod b/go.mod index e6a9c121a..f6f74d308 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 toolchain go1.24.13 require ( + github.com/klauspost/compress v1.18.4 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 diff --git a/go.sum b/go.sum index 3d304ecb0..b5c79d32a 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/common/compression.go b/internal/common/compression.go new file mode 100644 index 000000000..c664145f0 --- /dev/null +++ b/internal/common/compression.go @@ -0,0 +1,169 @@ +package common + +import ( + "encoding/binary" + "fmt" + "sync" + "unsafe" + + "github.com/klauspost/compress/zstd" +) + +// compressedDataLenSize is the size in bytes used to store the compressed +// data length right after the page header in a compressed page. This allows +// the decompressor to know exactly how many bytes to read. +const compressedDataLenSize = 4 + +var ( + zstdEncoderOnce sync.Once + zstdDecoderOnce sync.Once + zstdEncoder *zstd.Encoder + zstdDecoder *zstd.Decoder +) + +func getZstdEncoder() *zstd.Encoder { + zstdEncoderOnce.Do(func() { + var err error + zstdEncoder, err = zstd.NewWriter(nil, + zstd.WithEncoderLevel(zstd.SpeedDefault), + zstd.WithEncoderConcurrency(1), + ) + if err != nil { + panic("bbolt: failed to create zstd encoder: " + err.Error()) + } + }) + return zstdEncoder +} + +func getZstdDecoder() *zstd.Decoder { + zstdDecoderOnce.Do(func() { + var err error + zstdDecoder, err = zstd.NewReader(nil, + zstd.WithDecoderConcurrency(1), + ) + if err != nil { + panic("bbolt: failed to create zstd decoder: " + err.Error()) + } + }) + return zstdDecoder +} + +// CompressInodes serializes the given inodes into a temporary page buffer, +// compresses the data portion, and returns the compressed page buffer that +// can be copied directly into the allocated page. The returned buffer +// includes the page header (with the compressed flag set), the 4-byte +// compressed length, and the compressed data. +// +// If compression does not reduce the number of pages needed, nil is returned +// to signal that the node should be written uncompressed. +// +// On success, the caller should allocate ceil(len(result)/pageSize) pages +// and copy the result into the allocated page buffer. +func CompressInodes(inodes Inodes, isLeaf bool, pageSize int) []byte { + // First, figure out how large the uncompressed page would be. + uncompressedSize := int(PageHeaderSize) + var elemSize uintptr + if isLeaf { + elemSize = LeafPageElementSize + } else { + elemSize = BranchPageElementSize + } + for i := 0; i < len(inodes); i++ { + uncompressedSize += int(elemSize) + len(inodes[i].Key()) + len(inodes[i].Value()) + } + uncompressedPages := (uncompressedSize + pageSize - 1) / pageSize + + // Serialize the inodes into a scratch buffer large enough for the + // uncompressed page(s). We need a real page layout so that + // WriteInodeToPage can use unsafe pointer arithmetic. + scratchSize := uncompressedPages * pageSize + scratch := make([]byte, scratchSize) + p := (*Page)(unsafe.Pointer(&scratch[0])) + if isLeaf { + p.SetFlags(LeafPageFlag) + } else { + p.SetFlags(BranchPageFlag) + } + p.SetCount(uint16(len(inodes))) + WriteInodeToPage(inodes, p) + + // Compress the data portion (everything after the page header). + dataSize := scratchSize - int(PageHeaderSize) + data := scratch[PageHeaderSize:] + compressed := getZstdEncoder().EncodeAll(data[:dataSize], nil) + + // Total compressed page size: header + 4-byte length + compressed data. + compressedTotalSize := int(PageHeaderSize) + compressedDataLenSize + len(compressed) + compressedPages := (compressedTotalSize + pageSize - 1) / pageSize + + // Only use compression if it actually reduces the page count. + if compressedPages >= uncompressedPages { + return nil + } + + // Build the final compressed page buffer, sized to compressedPages. + buf := make([]byte, compressedPages*pageSize) + // Copy the header (id will be set by the caller, flags and count are set). + copy(buf, scratch[:PageHeaderSize]) + // Set the compressed flag on the output header. + cp := (*Page)(unsafe.Pointer(&buf[0])) + cp.SetCompressed(true) + cp.SetOverflow(uint32(compressedPages - 1)) + + // Write the compressed data length, then the compressed data. + binary.LittleEndian.PutUint32(buf[PageHeaderSize:], uint32(len(compressed))) + copy(buf[int(PageHeaderSize)+compressedDataLenSize:], compressed) + + return buf +} + +// DecompressPage takes a compressed page (from mmap or a buffer) and returns +// a new Page backed by a heap-allocated buffer with the decompressed data. +// The returned page has the compressed flag cleared. +// +// The caller is responsible for keeping the returned buffer alive as long +// as the page (and any slices derived from it) are in use. +func DecompressPage(p *Page, pageSize int) (*Page, []byte, error) { + allocSize := (int(p.Overflow()) + 1) * pageSize + + // Read the compressed data length from the 4 bytes after the header. + lenBytes := UnsafeByteSlice(unsafe.Pointer(p), PageHeaderSize, 0, compressedDataLenSize) + compressedLen := int(binary.LittleEndian.Uint32(lenBytes)) + + // Sanity check. + maxCompressed := allocSize - int(PageHeaderSize) - compressedDataLenSize + if compressedLen <= 0 || compressedLen > maxCompressed { + return nil, nil, fmt.Errorf("invalid compressed data length %d (max %d) on page %d", compressedLen, maxCompressed, p.Id()) + } + + // Get the compressed data. + compressedData := UnsafeByteSlice(unsafe.Pointer(p), PageHeaderSize+uintptr(compressedDataLenSize), 0, compressedLen) + + // Decompress the data. + decompressed, err := getZstdDecoder().DecodeAll(compressedData, nil) + if err != nil { + return nil, nil, fmt.Errorf("zstd DecodeAll on page %d: %w", p.Id(), err) + } + + // Allocate a buffer large enough for the header + decompressed data. + // The buffer must be at least as large as the decompressed page content, + // but we don't change the overflow — it must reflect the on-disk + // allocation size so freelist accounting remains correct. + bufSize := int(PageHeaderSize) + len(decompressed) + buf := make([]byte, bufSize) + + // Copy the header. + headerBytes := UnsafeByteSlice(unsafe.Pointer(p), 0, 0, int(PageHeaderSize)) + copy(buf, headerBytes) + + // Copy the decompressed data after the header. + copy(buf[PageHeaderSize:], decompressed) + + // Clear the compressed flag. Overflow is preserved from the original + // page header — it reflects the on-disk allocation, not the + // decompressed data size. + newPage := (*Page)(unsafe.Pointer(&buf[0])) + newPage.SetCompressed(false) + + return newPage, buf, nil +} diff --git a/internal/common/compression_test.go b/internal/common/compression_test.go new file mode 100644 index 000000000..8a256847f --- /dev/null +++ b/internal/common/compression_test.go @@ -0,0 +1,221 @@ +package common + +import ( + "testing" + "unsafe" +) + +func TestCompressDecompressInodes(t *testing.T) { + const pageSize = 4096 + const numInodes = 30 + const valSize = 500 + + // Build inodes with highly compressible data that spans multiple + // pages uncompressed so that compression can actually reduce the + // page count. + inodes := make(Inodes, numInodes) + for i := range inodes { + inodes[i].SetKey([]byte{byte(i + 1)}) + val := make([]byte, valSize) + for j := range val { + val[j] = byte(i) + } + inodes[i].SetValue(val) + } + + // Sanity: verify uncompressed data spans more than 1 page. + uncompressedSize := int(PageHeaderSize) + numInodes*(int(LeafPageElementSize)+1+valSize) + if uncompressedSize <= pageSize { + t.Fatalf("test data should span multiple pages, only %d bytes", uncompressedSize) + } + + // Compress the inodes. + compressed := CompressInodes(inodes, true, pageSize) + if compressed == nil { + t.Fatal("expected compression to succeed for highly compressible multi-page data") + } + + // Verify compressed buffer is page-aligned. + if len(compressed)%pageSize != 0 { + t.Fatalf("compressed buffer size (%d) is not page-aligned", len(compressed)) + } + + // Verify the compressed page header. + cp := (*Page)(unsafe.Pointer(&compressed[0])) + if !cp.IsCompressed() { + t.Fatal("compressed page should have compressed flag set") + } + if !cp.IsLeafPage() { + t.Fatal("compressed page should still be a leaf page") + } + if cp.Count() != numInodes { + t.Fatalf("expected count %d, got %d", numInodes, cp.Count()) + } + + // Verify it actually reduced page count. + compressedPages := len(compressed) / pageSize + uncompressedPages := (uncompressedSize + pageSize - 1) / pageSize + if compressedPages >= uncompressedPages { + t.Fatalf("compressed pages (%d) should be fewer than uncompressed (%d)", + compressedPages, uncompressedPages) + } + + // Decompress the page. + dp, _, err := DecompressPage(cp, pageSize) + if err != nil { + t.Fatalf("decompression failed: %v", err) + } + + // Verify the decompressed page. + if dp.IsCompressed() { + t.Fatal("decompressed page should not have compressed flag") + } + if !dp.IsLeafPage() { + t.Fatal("decompressed page should be a leaf page") + } + if dp.Count() != numInodes { + t.Fatalf("expected count %d, got %d", numInodes, dp.Count()) + } + + // Read back the inodes and verify. + dInodes := ReadInodeFromPage(dp) + if len(dInodes) != numInodes { + t.Fatalf("expected %d inodes, got %d", numInodes, len(dInodes)) + } + for i, in := range dInodes { + if len(in.Key()) != 1 || in.Key()[0] != byte(i+1) { + t.Fatalf("inode %d: key mismatch", i) + } + if len(in.Value()) != valSize { + t.Fatalf("inode %d: value length %d, expected %d", i, len(in.Value()), valSize) + } + for j, b := range in.Value() { + if b != byte(i) { + t.Fatalf("inode %d value byte %d: expected 0x%02x, got 0x%02x", i, j, byte(i), b) + } + } + } +} + +func TestCompressInodes_NoSaving(t *testing.T) { + const pageSize = 4096 + + // Build inodes with random (incompressible) data. + // Fill a single page worth of data — compression can't reduce below 1 page. + inodes := make(Inodes, 1) + inodes[0].SetKey([]byte("key1")) + val := make([]byte, pageSize/2) + state := uint64(0xdeadbeefcafebabe) + for i := range val { + state ^= state << 13 + state ^= state >> 7 + state ^= state << 17 + val[i] = byte(state) + } + inodes[0].SetValue(val) + + // If the data fits in 1 page uncompressed, compression can't reduce + // the page count, so CompressInodes should return nil. + compressed := CompressInodes(inodes, true, pageSize) + if compressed != nil { + t.Fatal("expected nil when compression cannot reduce page count") + } +} + +func TestCompressInodes_LargeOverflow(t *testing.T) { + const pageSize = 4096 + + // Build inodes with lots of highly compressible data that span + // multiple pages uncompressed. + inodes := make(Inodes, 50) + for i := range inodes { + inodes[i].SetKey([]byte{byte(i + 1)}) // nonzero key + val := make([]byte, 500) + for j := range val { + val[j] = byte(i) + } + inodes[i].SetValue(val) + } + + compressed := CompressInodes(inodes, true, pageSize) + if compressed == nil { + t.Fatal("expected compression to succeed for multi-page compressible data") + } + + // The compressed buffer should be fewer pages. + compressedPages := len(compressed) / pageSize + uncompressedSize := int(PageHeaderSize) + 50*(int(LeafPageElementSize)+1+500) + uncompressedPages := (uncompressedSize + pageSize - 1) / pageSize + if compressedPages >= uncompressedPages { + t.Fatalf("compressed pages (%d) should be less than uncompressed pages (%d)", + compressedPages, uncompressedPages) + } + + // Verify round-trip. + cp := (*Page)(unsafe.Pointer(&compressed[0])) + dp, _, err := DecompressPage(cp, pageSize) + if err != nil { + t.Fatalf("decompression failed: %v", err) + } + dInodes := ReadInodeFromPage(dp) + if len(dInodes) != 50 { + t.Fatalf("expected 50 inodes, got %d", len(dInodes)) + } + for i, in := range dInodes { + if len(in.Key()) != 1 || in.Key()[0] != byte(i+1) { + t.Fatalf("inode %d: key mismatch", i) + } + if len(in.Value()) != 500 { + t.Fatalf("inode %d: value length %d, expected 500", i, len(in.Value())) + } + for j, b := range in.Value() { + if b != byte(i) { + t.Fatalf("inode %d value byte %d: expected 0x%02x, got 0x%02x", i, j, byte(i), b) + } + } + } +} + +func TestPageFlags_CompressedWithType(t *testing.T) { + p := &Page{} + + p.SetFlags(LeafPageFlag | CompressedPageFlag) + if !p.IsLeafPage() { + t.Fatal("should be a leaf page") + } + if !p.IsCompressed() { + t.Fatal("should be compressed") + } + if p.IsBranchPage() { + t.Fatal("should not be a branch page") + } + + p.SetFlags(BranchPageFlag | CompressedPageFlag) + if !p.IsBranchPage() { + t.Fatal("should be a branch page") + } + if !p.IsCompressed() { + t.Fatal("should be compressed") + } + if p.IsLeafPage() { + t.Fatal("should not be a leaf page") + } + + p.SetCompressed(false) + if p.IsCompressed() { + t.Fatal("should not be compressed after clearing") + } + if !p.IsBranchPage() { + t.Fatal("should still be a branch page after clearing compressed flag") + } +} + +func TestFastCheck_CompressedPage(t *testing.T) { + p := &Page{} + p.SetId(42) + p.SetFlags(LeafPageFlag | CompressedPageFlag) + p.FastCheck(42) + + p.SetFlags(BranchPageFlag | CompressedPageFlag) + p.FastCheck(42) +} diff --git a/internal/common/page.go b/internal/common/page.go index 4453160bb..d147b43b5 100644 --- a/internal/common/page.go +++ b/internal/common/page.go @@ -20,6 +20,11 @@ const ( LeafPageFlag = 0x02 MetaPageFlag = 0x04 FreelistPageFlag = 0x10 + + // CompressedPageFlag is an orthogonal bit flag that indicates the page + // data (everything after the header) is zstd-compressed. It can be + // combined with any page-type flag above (e.g. LeafPageFlag | CompressedPageFlag). + CompressedPageFlag = 0x20 ) const ( @@ -59,19 +64,33 @@ func (p *Page) Typ() string { } func (p *Page) IsBranchPage() bool { - return p.flags == BranchPageFlag + return p.flags&BranchPageFlag != 0 } func (p *Page) IsLeafPage() bool { - return p.flags == LeafPageFlag + return p.flags&LeafPageFlag != 0 } func (p *Page) IsMetaPage() bool { - return p.flags == MetaPageFlag + return p.flags&MetaPageFlag != 0 } func (p *Page) IsFreelistPage() bool { - return p.flags == FreelistPageFlag + return p.flags&FreelistPageFlag != 0 +} + +// IsCompressed returns true if the page data is zstd-compressed. +func (p *Page) IsCompressed() bool { + return p.flags&CompressedPageFlag != 0 +} + +// SetCompressed sets or clears the compressed flag on the page. +func (p *Page) SetCompressed(compressed bool) { + if compressed { + p.flags |= CompressedPageFlag + } else { + p.flags &^= CompressedPageFlag + } } // Meta returns a pointer to the metadata section of the page. @@ -81,11 +100,12 @@ func (p *Page) Meta() *Meta { func (p *Page) FastCheck(id Pgid) { Assert(p.id == id, "Page expected to be: %v, but self identifies as %v", id, p.id) - // Only one flag of page-type can be set. - Assert(p.IsBranchPage() || - p.IsLeafPage() || - p.IsMetaPage() || - p.IsFreelistPage(), + // Mask out the compressed flag before checking page type. + typeFlags := p.flags &^ CompressedPageFlag + Assert(typeFlags == BranchPageFlag || + typeFlags == LeafPageFlag || + typeFlags == MetaPageFlag || + typeFlags == FreelistPageFlag, "page %v: has unexpected type/flags: %x", p.id, p.flags) } diff --git a/node.go b/node.go index 022b1001e..821646f85 100644 --- a/node.go +++ b/node.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "sort" + "unsafe" "go.etcd.io/bbolt/internal/common" ) @@ -312,7 +313,18 @@ func (n *node) spill() error { n.children = nil // Split nodes into appropriate sizes. The first node will always be n. - var nodes = n.split(uintptr(tx.db.pageSize)) + // When compression is enabled, use a larger effective page size for + // splitting so that nodes accumulate more data before being split. + // Compression can then pack the larger node into fewer actual pages. + splitPageSize := uintptr(tx.db.pageSize) + if tx.db.Compression { + // Allow nodes to grow up to 4x the page size before splitting. + // Zstd typically achieves good compression on structured data, + // so this gives the compressor enough data to work with while + // keeping individual nodes at a reasonable size. + splitPageSize *= 4 + } + var nodes = n.split(splitPageSize) for _, node := range nodes { // Add node's page to the freelist if it's not new. if node.pgid > 0 { @@ -320,8 +332,19 @@ func (n *node) spill() error { node.pgid = 0 } - // Allocate contiguous space for the node. - p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize) + // Try to compress the node data to reduce page allocation. + var compressedBuf []byte + uncompressedPages := (node.size() + tx.db.pageSize - 1) / tx.db.pageSize + if tx.db.Compression { + compressedBuf = common.CompressInodes(node.inodes, node.isLeaf, tx.db.pageSize) + } + + // Allocate pages: use compressed page count if compression succeeded. + allocPages := uncompressedPages + if compressedBuf != nil { + allocPages = len(compressedBuf) / tx.db.pageSize + } + p, err := tx.allocate(allocPages) if err != nil { return err } @@ -331,7 +354,17 @@ func (n *node) spill() error { panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.Id(), tx.meta.Pgid())) } node.pgid = p.Id() - node.write(p) + if compressedBuf != nil { + // Copy the compressed buffer into the allocated page. + // The page header's id is already correct from allocate(), + // so we preserve it and copy the rest from the compressed buffer. + pgid := p.Id() + buf := common.UnsafeByteSlice(unsafe.Pointer(p), 0, 0, len(compressedBuf)) + copy(buf, compressedBuf) + p.SetId(pgid) + } else { + node.write(p) + } node.spilled = true // Insert into parent inodes. diff --git a/simulation_compression_test.go b/simulation_compression_test.go new file mode 100644 index 000000000..a39a495a7 --- /dev/null +++ b/simulation_compression_test.go @@ -0,0 +1,47 @@ +package bbolt_test + +import ( + "testing" + + bolt "go.etcd.io/bbolt" +) + +func TestSimulateCompression_1op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 1, 1) +} +func TestSimulateCompression_10op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10, 1) +} +func TestSimulateCompression_100op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 100, 1) +} +func TestSimulateCompression_1000op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 1000, 1) +} +func TestSimulateCompression_10000op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10000, 1) +} +func TestSimulateCompression_10op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10, 10) +} +func TestSimulateCompression_100op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 100, 10) +} +func TestSimulateCompression_1000op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 1000, 10) +} +func TestSimulateCompression_10000op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10000, 10) +} +func TestSimulateCompression_100op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 100, 100) +} +func TestSimulateCompression_1000op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 1000, 100) +} +func TestSimulateCompression_10000op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10000, 100) +} +func TestSimulateCompression_10000op_1000p(t *testing.T) { + testSimulate(t, &bolt.Options{Compression: true}, 8, 10000, 1000) +} diff --git a/tx.go b/tx.go index aa0066bd3..69f7fb349 100644 --- a/tx.go +++ b/tx.go @@ -8,6 +8,7 @@ import ( "runtime" "sort" "strings" + "sync" "sync/atomic" "time" "unsafe" @@ -34,6 +35,12 @@ type Tx struct { stats TxStats commitHandlers []func() + // decompressedPages caches pages that have been decompressed from + // compressed on-disk format. The buffers keep the decompressed data + // alive for the lifetime of the transaction. A sync.Map is used + // because read-only transactions may be accessed concurrently. + decompressedPages sync.Map // Pgid -> *common.Page + // WriteFlag specifies the flag for write-related methods like WriteTo(). // Tx opens the database file with the specified flag to copy the data. // @@ -530,8 +537,8 @@ func (tx *Tx) write() error { // Write pages to disk in order. for _, p := range pages { - rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize) offset := int64(p.Id()) * int64(tx.db.pageSize) + rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize) var written uintptr // Write out page in "max allocation" sized chunks. @@ -641,6 +648,26 @@ func (tx *Tx) page(id common.Pgid) *common.Page { return p } +// decompressedPage returns the decompressed version of a compressed page. +// Results are cached for the lifetime of the transaction. This method is +// safe for concurrent use by multiple goroutines, which is required because +// read-only transactions may be shared across goroutines. +func (tx *Tx) decompressedPage(p *common.Page) *common.Page { + if dp, ok := tx.decompressedPages.Load(p.Id()); ok { + return dp.(*common.Page) + } + + dp, _, err := common.DecompressPage(p, tx.db.pageSize) + if err != nil { + panic(fmt.Sprintf("decompress page %d: %v", p.Id(), err)) + } + + // LoadOrStore ensures that if two goroutines race to decompress the same + // page, only one result is kept and both get the same pointer. + actual, _ := tx.decompressedPages.LoadOrStore(p.Id(), dp) + return actual.(*common.Page) +} + // forEachPage iterates over every page within a given page and executes a function. func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []common.Pgid)) { stack := make([]common.Pgid, 10) @@ -651,6 +678,11 @@ func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []comm func (tx *Tx) forEachPageInternal(pgidstack []common.Pgid, fn func(*common.Page, int, []common.Pgid)) { p := tx.page(pgidstack[len(pgidstack)-1]) + // Decompress if needed so we can read page elements. + if p.IsCompressed() { + p = tx.decompressedPage(p) + } + // Execute function. fn(p, len(pgidstack)-1, pgidstack) diff --git a/tx_check.go b/tx_check.go index 59edf3573..5c1f6bc1d 100644 --- a/tx_check.go +++ b/tx_check.go @@ -92,6 +92,9 @@ func (tx *Tx) recursivelyCheckPage(pageId common.Pgid, reachable map[common.Pgid func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool, kvStringer KVStringer, ch chan error) { p := tx.page(pageId) + if p.IsCompressed() { + p = tx.decompressedPage(p) + } switch { case p.IsBranchPage(): @@ -187,6 +190,9 @@ func (tx *Tx) recursivelyCheckPageKeyOrderInternal( keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) { p := tx.page(pgId) + if p.IsCompressed() { + p = tx.decompressedPage(p) + } pagesStack = append(pagesStack, pgId) switch { case p.IsBranchPage():