From 7907e515764c6016ff842c841a47995f5a17be20 Mon Sep 17 00:00:00 2001
From: Ignacio Van Droogenbroeck
Date: Fri, 12 Jun 2026 14:53:31 -0600
Subject: [PATCH 1/2] perf: configurable pool buffer retention limit via SetPoolBufferLimit
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PutEncoder/PutDecoder drop internal buffers whose capacity exceeds 32KiB,
so workloads with consistently larger messages re-grow the buffer on every
pooled use.

Add SetPoolBufferLimit(n) to raise the retention limit for such workloads.
The default (32KiB) is unchanged; values below the default are clamped to
it, n <= 0 restores it. The limit is stored atomically, and the Put path
checks the constant default first so small buffers (the common case) never
pay for the atomic load — small-message benchmarks are unchanged vs v6:

  StructMarshal    368.5n ± 0%  363.6n ± 1%  -1.33%
  StructUnmarshal  338.8n ± 0%  338.9n ± 0%  ~

100KB payloads with the limit raised to 256KiB (count=8):

  MarshalAppendLarge100KB  9.31µs → 4.61µs (-50%)    104KiB → 24B/op  3 → 1 allocs
  DecodeLargeStream100KB   16.14µs → 10.23µs (-37%)  208KiB → 104KiB  3 → 1 allocs

Closes #62
---
Review note (this section is ignored by git am): pool_test.go was revised
so the retention tests assert on the encoder/decoder directly after Put
instead of relying on a sync.Pool Put/Get round-trip, which sync.Pool does
not guarantee (and randomly breaks under -race). The blob hash on the
pool_test.go index line below was not regenerated after that revision.

 bench_test.go | 66 +++++++++++++++++++++++++++++++++++
 decode.go     |  2 +-
 encode.go     |  2 +-
 pool.go       | 49 ++++++++++++++++++++++++++
 pool_test.go  | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 212 insertions(+), 2 deletions(-)
 create mode 100644 pool.go
 create mode 100644 pool_test.go

diff --git a/bench_test.go b/bench_test.go
index 77acc88..748c51d 100644
--- a/bench_test.go
+++ b/bench_test.go
@@ -5,12 +5,78 @@ import (
 	"encoding/json"
 	"io/ioutil"
 	"math"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/Basekick-Labs/msgpack/v6"
 )
 
+// Large-payload benchmarks for the pooled encoder/decoder buffer limit
+// (issue #62). With the default 32KiB limit, every PutEncoder/PutDecoder
+// drops the grown buffer and the next pooled use must re-grow it; raising
+// the limit retains the buffer across pool round-trips.
+
+func benchmarkMarshalAppendLarge(b *testing.B, n int) {
+	payload := make([]byte, n)
+	buf := make([]byte, 0, n+16)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		var err error
+		buf, err = msgpack.MarshalAppend(buf[:0], payload)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkMarshalAppendLarge100KB(b *testing.B) {
+	b.Run("pool-limit-default", func(b *testing.B) {
+		benchmarkMarshalAppendLarge(b, 100<<10)
+	})
+	b.Run("pool-limit-256k", func(b *testing.B) {
+		msgpack.SetPoolBufferLimit(256 << 10)
+		defer msgpack.SetPoolBufferLimit(0)
+		benchmarkMarshalAppendLarge(b, 100<<10)
+	})
+}
+
+func benchmarkDecodeLargeStream(b *testing.B, n int) {
+	data, err := msgpack.Marshal(strings.Repeat("x", n))
+	if err != nil {
+		b.Fatal(err)
+	}
+	rd := bytes.NewReader(data)
+	var out string
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		rd.Reset(data)
+		dec := msgpack.GetDecoder()
+		dec.Reset(rd)
+		if err := dec.Decode(&out); err != nil {
+			b.Fatal(err)
+		}
+		msgpack.PutDecoder(dec)
+	}
+}
+
+func BenchmarkDecodeLargeStream100KB(b *testing.B) {
+	b.Run("pool-limit-default", func(b *testing.B) {
+		benchmarkDecodeLargeStream(b, 100<<10)
+	})
+	b.Run("pool-limit-256k", func(b *testing.B) {
+		msgpack.SetPoolBufferLimit(256 << 10)
+		defer msgpack.SetPoolBufferLimit(0)
+		benchmarkDecodeLargeStream(b, 100<<10)
+	})
+}
+
 func BenchmarkDiscard(b *testing.B) {
 	enc := msgpack.NewEncoder(ioutil.Discard)
 
diff --git a/decode.go b/decode.go
index 6e40351..bfcd522 100644
--- a/decode.go
+++ b/decode.go
@@ -48,7 +48,7 @@ func PutDecoder(dec *Decoder) {
 	dec.bsr.data = nil
 	// Keep buf capacity for reuse, but drop oversized buffers to prevent
 	// the pool from retaining memory from large decode operations.
-	if cap(dec.buf) > 32*1024 {
+	if poolBufOversized(cap(dec.buf)) {
 		dec.buf = nil
 	} else if dec.buf != nil {
 		dec.buf = dec.buf[:0]
diff --git a/encode.go b/encode.go
index 6420663..bda3d9a 100644
--- a/encode.go
+++ b/encode.go
@@ -55,7 +55,7 @@ func GetEncoder() *Encoder {
 func PutEncoder(enc *Encoder) {
 	enc.w = nil
 	// Keep wbuf capacity for reuse, but drop oversized buffers.
-	if cap(enc.wbuf) > 32*1024 {
+	if poolBufOversized(cap(enc.wbuf)) {
 		enc.wbuf = nil
 	}
 	// Drop the interned-string dict if we own it and it grew large, so pool
diff --git a/pool.go b/pool.go
new file mode 100644
index 0000000..ad0beac
--- /dev/null
+++ b/pool.go
@@ -0,0 +1,49 @@
+package msgpack
+
+import "sync/atomic"
+
+// defaultPoolBufferLimit is the default maximum capacity, in bytes, of
+// internal buffers retained by pooled encoders and decoders.
+const defaultPoolBufferLimit = 32 * 1024
+
+// poolBufferLimit holds the configured limit. Zero means "use the default";
+// it is read atomically so SetPoolBufferLimit is safe to call concurrently
+// with PutEncoder/PutDecoder.
+var poolBufferLimit atomic.Int64
+
+// SetPoolBufferLimit sets the maximum capacity, in bytes, of internal
+// buffers retained by pooled encoders and decoders (GetEncoder/PutEncoder,
+// GetDecoder/PutDecoder, and the package-level Marshal/MarshalAppend/
+// Unmarshal helpers that use them). Buffers whose capacity exceeds the
+// limit are dropped when the encoder or decoder is returned to the pool,
+// so one-off large operations don't pin memory.
+//
+// The default is 32 KiB. Workloads that consistently encode or decode
+// larger payloads can raise the limit to avoid re-growing buffers on every
+// pooled use. Values below the default are clamped to the default;
+// n <= 0 restores the default.
+func SetPoolBufferLimit(n int) {
+	if n <= 0 {
+		poolBufferLimit.Store(0)
+		return
+	}
+	if n < defaultPoolBufferLimit {
+		n = defaultPoolBufferLimit
+	}
+	poolBufferLimit.Store(int64(n))
+}
+
+func getPoolBufferLimit() int {
+	if n := poolBufferLimit.Load(); n > 0 {
+		return int(n)
+	}
+	return defaultPoolBufferLimit
+}
+
+// poolBufOversized reports whether a pooled buffer of capacity c should be
+// dropped rather than retained. The constant default check comes first so
+// the common small-buffer case never pays for the atomic load; the
+// configured limit is never below the default.
+func poolBufOversized(c int) bool {
+	return c > defaultPoolBufferLimit && c > getPoolBufferLimit()
+}
diff --git a/pool_test.go b/pool_test.go
new file mode 100644
index 0000000..c2b4f15
--- /dev/null
+++ b/pool_test.go
@@ -0,0 +1,95 @@
+package msgpack
+
+import "testing"
+
+// These tests live in the msgpack package (not msgpack_test) because they
+// inspect the unexported wbuf/buf fields to verify pool retention behavior.
+// They check the encoder/decoder directly after Put instead of fetching it
+// back with Get: sync.Pool may drop any item at any time (and discards some
+// Puts at random under the race detector), so a Put/Get round-trip is not
+// guaranteed to return the same object. Reading a field after Put is only
+// safe while nothing else uses the pools concurrently, so these tests must
+// not call t.Parallel.
+
+func TestPoolBufferLimitEncoder(t *testing.T) {
+	const big = 100 * 1024
+
+	// Default limit: an oversized wbuf is dropped on Put.
+	enc := GetEncoder()
+	enc.wbuf = make([]byte, big)
+	PutEncoder(enc)
+	if cap(enc.wbuf) > defaultPoolBufferLimit {
+		t.Fatalf("wbuf cap=%d retained above default limit", cap(enc.wbuf))
+	}
+
+	// Raised limit: the same buffer is retained.
+	SetPoolBufferLimit(256 * 1024)
+	defer SetPoolBufferLimit(0)
+
+	enc = GetEncoder()
+	enc.wbuf = make([]byte, big)
+	PutEncoder(enc)
+	if cap(enc.wbuf) < big {
+		t.Fatalf("wbuf cap=%d, want >= %d retained under raised limit", cap(enc.wbuf), big)
+	}
+}
+
+func TestPoolBufferLimitDecoder(t *testing.T) {
+	const big = 100 * 1024
+
+	// Default limit: an oversized buf is dropped on Put.
+	dec := GetDecoder()
+	dec.buf = make([]byte, big)
+	PutDecoder(dec)
+	if cap(dec.buf) > defaultPoolBufferLimit {
+		t.Fatalf("buf cap=%d retained above default limit", cap(dec.buf))
+	}
+
+	// Raised limit: the same buffer is retained.
+	SetPoolBufferLimit(256 * 1024)
+	defer SetPoolBufferLimit(0)
+
+	dec = GetDecoder()
+	dec.buf = make([]byte, big)
+	PutDecoder(dec)
+	if cap(dec.buf) < big {
+		t.Fatalf("buf cap=%d, want >= %d retained under raised limit", cap(dec.buf), big)
+	}
+}
+
+func TestPoolBufOversized(t *testing.T) {
+	const limit = 64 * 1024
+	SetPoolBufferLimit(limit)
+	defer SetPoolBufferLimit(0)
+
+	// A buffer is oversized only when its capacity strictly exceeds the limit.
+	if poolBufOversized(limit) {
+		t.Fatalf("cap=%d at the limit reported oversized", limit)
+	}
+	if !poolBufOversized(limit + 1) {
+		t.Fatalf("cap=%d above the limit not reported oversized", limit+1)
+	}
+}
+
+func TestSetPoolBufferLimitClamp(t *testing.T) {
+	// Restore the default even if an assertion below aborts the test.
+	defer SetPoolBufferLimit(0)
+
+	SetPoolBufferLimit(-1)
+	if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
+		t.Fatalf("limit=%d after SetPoolBufferLimit(-1), want default %d", got, defaultPoolBufferLimit)
+	}
+	SetPoolBufferLimit(64 * 1024)
+	if got := getPoolBufferLimit(); got != 64*1024 {
+		t.Fatalf("limit=%d, want %d", got, 64*1024)
+	}
+	// Values below the default are clamped up to the default.
+	SetPoolBufferLimit(1024)
+	if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
+		t.Fatalf("limit=%d after SetPoolBufferLimit(1024), want clamped default %d", got, defaultPoolBufferLimit)
+	}
+	SetPoolBufferLimit(0)
+	if got := getPoolBufferLimit(); got != defaultPoolBufferLimit {
+		t.Fatalf("limit=%d after reset, want default %d", got, defaultPoolBufferLimit)
+	}
+}

From 7215fbb27bb8a468b450b398245b20eec55b0910 Mon Sep 17 00:00:00 2001
From: Ignacio Van Droogenbroeck
Date: Fri, 12 Jun 2026 14:57:50 -0600
Subject: [PATCH 2/2] docs: add SetPoolBufferLimit to CHANGELOG (review follow-up)

Co-Authored-By: Claude Fable 5
---
 CHANGELOG.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c661dd7..2a19026 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+## Unreleased
+
+### Performance
+
+- **encode/decode:** configurable pooled buffer retention limit via `SetPoolBufferLimit(n)` — workloads with consistently large messages can raise the default 32 KiB threshold so pooled encoders/decoders keep their grown buffers across uses instead of re-allocating ([#62](https://github.com/Basekick-Labs/msgpack/issues/62)) (100KB payloads with a 256 KiB limit: MarshalAppend **-50% ns/op**, **3→1 allocs/op**; stream decode **-37% ns/op**, **3→1 allocs/op**)
+
+---
+
 ## v6.1.0 (2026-04-27)
 
 ### Performance