Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## Unreleased

### Performance

- **encode/decode:** configurable pooled buffer retention limit via `SetPoolBufferLimit(n)` — workloads with consistently large messages can raise the default 32 KiB threshold so pooled encoders/decoders keep their grown buffers across uses instead of re-allocating ([#62](https://github.com/Basekick-Labs/msgpack/issues/62)) (100KB payloads with a 256 KiB limit: MarshalAppend **-50% ns/op**, **3→1 allocs/op**; stream decode **-37% ns/op**, **3→1 allocs/op**)

---

## v6.1.0 (2026-04-27)

### Performance
Expand Down
66 changes: 66 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,78 @@ import (
"encoding/json"
"io/ioutil"
"math"
"strings"
"testing"
"time"

"github.com/Basekick-Labs/msgpack/v6"
)

// Large-payload benchmarks for the pooled encoder/decoder buffer limit
// (issue #62). With the default 32KiB limit, every PutEncoder/PutDecoder
// drops the grown buffer and the next pooled use must re-grow it; raising
// the limit retains the buffer across pool round-trips.

func benchmarkMarshalAppendLarge(b *testing.B, n int) {
payload := make([]byte, n)
buf := make([]byte, 0, n+16)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
var err error
buf, err = msgpack.MarshalAppend(buf[:0], payload)
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkMarshalAppendLarge100KB(b *testing.B) {
b.Run("pool-limit-default", func(b *testing.B) {
benchmarkMarshalAppendLarge(b, 100<<10)
})
b.Run("pool-limit-256k", func(b *testing.B) {
msgpack.SetPoolBufferLimit(256 << 10)
defer msgpack.SetPoolBufferLimit(0)
benchmarkMarshalAppendLarge(b, 100<<10)
})
}

func benchmarkDecodeLargeStream(b *testing.B, n int) {
data, err := msgpack.Marshal(strings.Repeat("x", n))
if err != nil {
b.Fatal(err)
}
rd := bytes.NewReader(data)
var out string

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
rd.Reset(data)
dec := msgpack.GetDecoder()
dec.Reset(rd)
if err := dec.Decode(&out); err != nil {
b.Fatal(err)
}
msgpack.PutDecoder(dec)
}
}

func BenchmarkDecodeLargeStream100KB(b *testing.B) {
b.Run("pool-limit-default", func(b *testing.B) {
benchmarkDecodeLargeStream(b, 100<<10)
})
b.Run("pool-limit-256k", func(b *testing.B) {
msgpack.SetPoolBufferLimit(256 << 10)
defer msgpack.SetPoolBufferLimit(0)
benchmarkDecodeLargeStream(b, 100<<10)
})
}

func BenchmarkDiscard(b *testing.B) {
enc := msgpack.NewEncoder(ioutil.Discard)

Expand Down
2 changes: 1 addition & 1 deletion decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func PutDecoder(dec *Decoder) {
dec.bsr.data = nil
// Keep buf capacity for reuse, but drop oversized buffers to prevent
// the pool from retaining memory from large decode operations.
if cap(dec.buf) > 32*1024 {
if poolBufOversized(cap(dec.buf)) {
dec.buf = nil
} else if dec.buf != nil {
dec.buf = dec.buf[:0]
Expand Down
2 changes: 1 addition & 1 deletion encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func GetEncoder() *Encoder {
func PutEncoder(enc *Encoder) {
enc.w = nil
// Keep wbuf capacity for reuse, but drop oversized buffers.
if cap(enc.wbuf) > 32*1024 {
if poolBufOversized(cap(enc.wbuf)) {
enc.wbuf = nil
}
// Drop the interned-string dict if we own it and it grew large, so pool
Expand Down
49 changes: 49 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package msgpack

import "sync/atomic"

// defaultPoolBufferLimit is the default maximum capacity, in bytes, of
// internal buffers retained by pooled encoders and decoders.
const defaultPoolBufferLimit = 32 * 1024

// poolBufferLimit holds the configured limit. Zero means "use the default";
// it is read atomically so SetPoolBufferLimit is safe to call concurrently
// with PutEncoder/PutDecoder.
var poolBufferLimit atomic.Int64

// SetPoolBufferLimit sets the maximum capacity, in bytes, of internal
// buffers retained by pooled encoders and decoders (GetEncoder/PutEncoder,
// GetDecoder/PutDecoder, and the package-level Marshal/MarshalAppend/
// Unmarshal helpers that use them). Buffers whose capacity exceeds the
// limit are dropped when the encoder or decoder is returned to the pool,
// so one-off large operations don't pin memory.
//
// The default is 32 KiB. Workloads that consistently encode or decode
// larger payloads can raise the limit to avoid re-growing buffers on every
// pooled use. Values below the default are clamped to the default;
// n <= 0 restores the default.
func SetPoolBufferLimit(n int) {
if n <= 0 {
poolBufferLimit.Store(0)
return
}
if n < defaultPoolBufferLimit {
n = defaultPoolBufferLimit
}
poolBufferLimit.Store(int64(n))
}

func getPoolBufferLimit() int {
if n := poolBufferLimit.Load(); n > 0 {
return int(n)
}
return defaultPoolBufferLimit
}

// poolBufOversized reports whether a pooled buffer of capacity c should be
// dropped rather than retained. The constant default check comes first so
// the common small-buffer case never pays for the atomic load; the
// configured limit is never below the default.
func poolBufOversized(c int) bool {
return c > defaultPoolBufferLimit && c > getPoolBufferLimit()
}
80 changes: 80 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package msgpack

import "testing"

// These tests live in the msgpack package (not msgpack_test) because they
// inspect the unexported wbuf/buf fields to verify pool retention behavior.
// They rely on sync.Pool returning the just-Put item on the same goroutine,
// which holds absent a GC between Put and Get.

func TestPoolBufferLimitEncoder(t *testing.T) {
const big = 100 * 1024

// Default limit: an oversized wbuf is dropped on Put.
enc := GetEncoder()
enc.wbuf = make([]byte, big)
PutEncoder(enc)
enc = GetEncoder()
if cap(enc.wbuf) > defaultPoolBufferLimit {
t.Fatalf("wbuf cap=%d retained above default limit", cap(enc.wbuf))
}
PutEncoder(enc)

// Raised limit: the same buffer is retained.
SetPoolBufferLimit(256 * 1024)
defer SetPoolBufferLimit(0)

enc = GetEncoder()
enc.wbuf = make([]byte, big)
PutEncoder(enc)
enc = GetEncoder()
if cap(enc.wbuf) < big {
t.Fatalf("wbuf cap=%d, want >= %d retained under raised limit", cap(enc.wbuf), big)
}
PutEncoder(enc)
}

func TestPoolBufferLimitDecoder(t *testing.T) {
const big = 100 * 1024

dec := GetDecoder()
dec.buf = make([]byte, big)
PutDecoder(dec)
dec = GetDecoder()
if cap(dec.buf) > defaultPoolBufferLimit {
t.Fatalf("buf cap=%d retained above default limit", cap(dec.buf))
}
PutDecoder(dec)

SetPoolBufferLimit(256 * 1024)
defer SetPoolBufferLimit(0)

dec = GetDecoder()
dec.buf = make([]byte, big)
PutDecoder(dec)
dec = GetDecoder()
if cap(dec.buf) < big {
t.Fatalf("buf cap=%d, want >= %d retained under raised limit", cap(dec.buf), big)
}
PutDecoder(dec)
}
Comment on lines +10 to +60

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These tests rely on sync.Pool returning the exact same encoder/decoder instance that was just put back. However, sync.Pool behavior is non-deterministic: the pool can be cleared at any time by the garbage collector (GC), or it may return a new instance if the goroutine is rescheduled onto a different P or thread. When this happens, GetEncoder() or GetDecoder() will return a newly allocated instance with a nil buffer, causing the test to fail with a false positive (e.g., wbuf cap=0, want >= 102400 retained under raised limit).

To make the tests 100% deterministic and robust, we should test the underlying poolBufOversized function directly with various capacities and configured limits, rather than relying on sync.Pool side effects.

func TestPoolBufferLimit(t *testing.T) {
	// Test default limit (32 KiB)
	if poolBufOversized(16 * 1024) {
		t.Error("16 KiB buffer should not be oversized under default limit")
	}
	if poolBufOversized(32 * 1024) {
		t.Error("32 KiB buffer should not be oversized under default limit")
	}
	if !poolBufOversized(33 * 1024) {
		t.Error("33 KiB buffer should be oversized under default limit")
	}

	// Test raised limit (256 KiB)
	SetPoolBufferLimit(256 * 1024)
	defer SetPoolBufferLimit(0)

	if poolBufOversized(100 * 1024) {
		t.Error("100 KiB buffer should not be oversized under 256 KiB limit")
	}
	if poolBufOversized(256 * 1024) {
		t.Error("256 KiB buffer should not be oversized under 256 KiB limit")
	}
	if !poolBufOversized(257 * 1024) {
		t.Error("257 KiB buffer should be oversized under 256 KiB limit")
	}
}


func TestSetPoolBufferLimitClamp(t *testing.T) {
SetPoolBufferLimit(-1)
if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
t.Fatalf("limit=%d after SetPoolBufferLimit(-1), want default %d", got, defaultPoolBufferLimit)
}
SetPoolBufferLimit(64 * 1024)
if got := getPoolBufferLimit(); got != 64*1024 {
t.Fatalf("limit=%d, want %d", got, 64*1024)
}
// Values below the default are clamped up to the default.
SetPoolBufferLimit(1024)
if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
t.Fatalf("limit=%d after SetPoolBufferLimit(1024), want clamped default %d", got, defaultPoolBufferLimit)
}
SetPoolBufferLimit(0)
if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
t.Fatalf("limit=%d after reset, want default %d", got, defaultPoolBufferLimit)
}
}
Loading