waiters;
+ /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. Guarded by {@link #lock}. */
+ protected long nonPooledAvailableMemory;
private final Metrics metrics;
- private final Time time;
+ protected final Time time;
private final Sensor waitTime;
- private boolean closed;
+ protected boolean closed;
/**
* Create a new buffer pool
@@ -192,8 +195,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
// signal any additional waiters if there is more memory left
// over for them
try {
- if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
- this.waiters.peekFirst().signal();
+ signalNextWaiterIfMemoryAvailable();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
@@ -211,6 +213,15 @@ protected void recordWaitTime(long timeNs) {
this.waitTime.record(timeNs, time.milliseconds());
}
+ /**
+ * Wake the longest-waiting thread if any memory (pooled or non-pooled) is available.
+ * Must be called with {@link #lock} held. No-op if no waiters or no memory is free.
+ */
+ protected void signalNextWaiterIfMemoryAvailable() {
+ if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
+ this.waiters.peekFirst().signal();
+ }
+
/**
* Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to
* available memory and signal the next waiter if it exists.
@@ -222,16 +233,24 @@ private ByteBuffer safeAllocateByteBuffer(int size) {
error = false;
return buffer;
} finally {
- if (error) {
- this.lock.lock();
- try {
- this.nonPooledAvailableMemory += size;
- if (!this.waiters.isEmpty())
- this.waiters.peekFirst().signal();
- } finally {
- this.lock.unlock();
- }
- }
+ if (error)
+ releaseReservedBytes(size);
+ }
+ }
+
+ /**
+ * Return previously-reserved non-pooled bytes to the pool and signal the next
+ * waiter. Acquires {@link #lock} internally. Used by callers
+ * that reserve memory and then need to roll back the reservation (e.g., upon errors).
+ */
+ protected void releaseReservedBytes(long bytes) {
+ this.lock.lock();
+ try {
+ this.nonPooledAvailableMemory += bytes;
+ if (!this.waiters.isEmpty())
+ this.waiters.peekFirst().signal();
+ } finally {
+ this.lock.unlock();
}
}
@@ -242,9 +261,9 @@ protected ByteBuffer allocateByteBuffer(int size) {
/**
* Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
- * buffers (if needed)
+ * buffers (if needed). Must be called with {@link #lock} held.
*/
- private void freeUp(int size) {
+ protected void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java
new file mode 100644
index 0000000000000..f2c8ba35f1568
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * A {@link BufferPool} dedicated to chunk-sized buffer reuse (chunk size = {@link #poolableSize()}).
+ *
+ * Adds {@link #allocateChunks(int, long)} to acquire multiple chunks atomically.
+ */
+public class ChunkedBufferPool extends BufferPool {
+
+ public ChunkedBufferPool(long memory, int chunkSize, Metrics metrics, Time time, String metricGrpName) {
+ super(memory, chunkSize, metrics, time, metricGrpName);
+ }
+
+ /**
+ * Allocate {@code ceil(totalSize / chunkSize)} chunk-sized buffers atomically, mirroring
+ * {@link BufferPool#allocate}: satisfied immediately if memory is available, else blocks up to
+ * {@code maxTimeToBlockMs} for the whole request (FIFO on {@link #waiters}).
+ * The reservation is tracked as bytes against {@link #nonPooledAvailableMemory} plus chunks polled
+ * from {@link #free}. Any failure refunds the whole reservation and signals
+ * the next waiter before the exception propagates, so no partial holds are visible during the wait.
+ *
+ * @param totalSize minimum total bytes of capacity required across the returned chunks
+ * @param maxTimeToBlockMs maximum time in milliseconds to block waiting for memory
+ * @return list of {@code ceil(totalSize / chunkSize)} {@code ByteBuffer}s, each of capacity
+ * {@code chunkSize}
+ * @throws InterruptedException if interrupted while waiting
+ * @throws IllegalArgumentException if {@code totalSize <= 0}, or if the request rounded up to
+ * whole chunks exceeds {@code totalMemory()}
+ * @throws BufferExhaustedException if the request can't be satisfied within {@code maxTimeToBlockMs}
+ * @throws KafkaException if the pool is closed during the wait
+ */
+ public List allocateChunks(int totalSize, long maxTimeToBlockMs) throws InterruptedException {
+ if (totalSize <= 0)
+ throw new IllegalArgumentException("totalSize must be positive: " + totalSize);
+ throwIfChunksNeededExceedsPool(totalSize);
+
+ int chunkSize = poolableSize();
+ int numChunks = (int) (((long) totalSize + chunkSize - 1L) / chunkSize);
+ long memoryRequired = (long) numChunks * chunkSize;
+
+ // Chunks taken from the free list. The remaining bytes are reserved against
+ // nonPooledAvailableMemory and materialized as raw allocations after the lock is released.
+ List pooled = new ArrayList<>(numChunks);
+
+ lock.lock();
+ if (this.closed) {
+ lock.unlock();
+ throw new KafkaException("Producer closed while allocating memory");
+ }
+ try {
+ long freeListBytes = (long) free.size() * chunkSize;
+ if (this.nonPooledAvailableMemory + freeListBytes >= memoryRequired) {
+ // Enough memory available to allocate the chunks needed
+ while (pooled.size() < numChunks && !free.isEmpty())
+ pooled.add(free.pollFirst());
+ long remainingBytes = memoryRequired - (long) pooled.size() * chunkSize;
+ if (remainingBytes > 0) {
+ // remainingBytes <= memoryRequired <= totalMemory (validated above), so the int cast is safe.
+ freeUp((int) remainingBytes);
+ this.nonPooledAvailableMemory -= remainingBytes;
+ }
+ } else {
+ // Not enough memory available to allocate the chunks needed, so we need to wait for memory.
+ // Same as in BufferPool.allocate, but wait to acquire the memory needed for all the chunks.
+ // A single Condition is added to the waiter's list to ensure FIFO fairness at the request level.
+ //
+ // `accumulated` tracks bytes drawn from nonPooledAvailableMemory only (pool chunks
+ // already taken live in `pooled`). Matches BufferPool.allocate's semantics: on
+ // failure, `accumulated` is exactly the amount to refund; on success it is reset to 0.
+ long accumulated = 0;
+ Condition moreMemory = lock.newCondition();
+ try {
+ long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
+ waiters.addLast(moreMemory);
+ while ((long) pooled.size() * chunkSize + accumulated < memoryRequired) {
+ long startWaitNs = time.nanoseconds();
+ long timeNs;
+ boolean waitingTimeElapsed;
+ try {
+ waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
+ } finally {
+ long endWaitNs = time.nanoseconds();
+ timeNs = Math.max(0L, endWaitNs - startWaitNs);
+ recordWaitTime(timeNs);
+ }
+
+ if (this.closed)
+ throw new KafkaException("Producer closed while allocating memory");
+
+ if (waitingTimeElapsed) {
+ throw new BufferExhaustedException("Failed to allocate " + memoryRequired
+ + " bytes (" + numChunks + " chunks of " + chunkSize
+ + ") within the configured max blocking time " + maxTimeToBlockMs
+ + " ms. Total memory: " + totalMemory() + " bytes. Available memory: "
+ + availableMemory() + " bytes.");
+ }
+
+ remainingTimeToBlockNs -= timeNs;
+
+ // Take pool chunks first, then reserve non-pool bytes for the remainder.
+ while (pooled.size() < numChunks
+ && (long) (pooled.size() + 1) * chunkSize + accumulated <= memoryRequired
+ && !free.isEmpty()) {
+ pooled.add(free.pollFirst());
+ }
+ long stillNeeded = memoryRequired - (long) pooled.size() * chunkSize - accumulated;
+ if (stillNeeded > 0) {
+ freeUp((int) stillNeeded);
+ long got = Math.min(stillNeeded, this.nonPooledAvailableMemory);
+ this.nonPooledAvailableMemory -= got;
+ accumulated += got;
+ }
+ }
+ // Clear the rollback tracker.
+ accumulated = 0;
+ } finally {
+ // On failure (timeout / close / interrupt), refund the non-pool bytes taken.
+ // Pool chunks already in `pooled` are returned to `free` separately by the
+ // outer catch.
+ this.nonPooledAvailableMemory += accumulated;
+ waiters.remove(moreMemory);
+ }
+ }
+ } catch (RuntimeException | InterruptedException e) {
+ // Any chunks taken from the pool must be returned.
+ for (ByteBuffer chunk : pooled)
+ free.addFirst(chunk);
+ throw e;
+ } finally {
+ try {
+ signalNextWaiterIfMemoryAvailable();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Allocate raw chunks for the reserved non-pooled portion outside the lock. On error,
+ // refund all reserved bytes (memoryRequired) and let the next waiter try.
+ int chunksStillNeeded = numChunks - pooled.size();
+ List result = new ArrayList<>(numChunks);
+ result.addAll(pooled);
+ boolean error = true;
+ try {
+ for (int i = 0; i < chunksStillNeeded; i++)
+ result.add(allocateByteBuffer(chunkSize));
+ error = false;
+ return result;
+ } finally {
+ if (error) {
+ // The pooled buffers we already drained are also lost on this path; mirror
+ // BufferPool.safeAllocateByteBuffer's behaviour (the bytes return, the ByteBuffer
+ // instances become garbage).
+ releaseReservedBytes(memoryRequired);
+ }
+ }
+ }
+
+ /**
+ * Throw if rounding {@code totalSize} up to whole chunks would exceed the pool.
+ */
+ private void throwIfChunksNeededExceedsPool(int totalSize) {
+ int chunkSize = poolableSize();
+ int numChunks = (int) (((long) totalSize + chunkSize - 1L) / chunkSize);
+ long memoryRequired = (long) numChunks * chunkSize;
+ if (memoryRequired > totalMemory())
+ throw new IllegalArgumentException("Attempt to allocate " + totalSize + " bytes ("
+ + numChunks + " chunks of " + chunkSize + " = " + memoryRequired + " bytes), but the "
+ + "hard limit on memory allocations is " + totalMemory() + ".");
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStream.java
new file mode 100644
index 0000000000000..101a5b15cabdc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStream.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link ByteBufferOutputStream} backed by a linked list of fixed-size chunks instead of a single
+ * re-allocated buffer. Chunks are supplied by the caller (initial chunks via the constructor,
+ * additional chunks via {@link #addBuffers(List)}).
+ *
+ * Current/temporary behavior:
+ *
+ * - The stream does not grow on its own: a write whose size exceeds the remaining free bytes
+ * across all attached chunks throws {@link IllegalStateException}, so the caller must attach
+ * enough chunks before any such write.
+ * TODO: KAFKA-20579 (automatic mid-write growth for compression support).
+ * - {@link #buffer()} returns the written bytes as a single contiguous {@link ByteBuffer},
+ * flattening all chunks into a new buffer with an extra copy.
+ * TODO: KAFKA-20580 (remove the extra copy on send, scatter-gather send).
+ *
+ */
+public class ChunkedByteBufferOutputStream extends ByteBufferOutputStream {
+
+ private final List chunks;
+ private final int chunkSize;
+ private final BufferPool pool;
+ private ByteBuffer currentChunk;
+ private int currentChunkIndex;
+ private ByteBuffer flattenedBuffer;
+ private boolean dirty;
+
+ /**
+ * Constructs a chunked output stream backed by the given pre-allocated chunks. Ownership of
+ * {@code initialChunks} transfers to this stream (they will be returned to the pool via
+ * {@link #deallocate()}).
+ *
+ * @param initialChunks pre-allocated chunks. Must be non-empty and each chunk's capacity must
+ * equal {@code chunkSize}
+ * @param chunkSize the size of each chunk in bytes
+ * @param pool the buffer pool used for deallocation
+ */
+ public ChunkedByteBufferOutputStream(List initialChunks, int chunkSize, BufferPool pool) {
+ super(validatedFirstChunk(initialChunks, chunkSize));
+ this.chunkSize = chunkSize;
+ this.pool = pool;
+ this.chunks = new ArrayList<>(initialChunks);
+ this.currentChunk = this.chunks.get(0);
+ this.currentChunkIndex = 0;
+ this.dirty = true;
+ }
+
+ /**
+ * Validates the chunk contract: {@code initialChunks} non-empty, each chunk's capacity equal to
+ * {@code chunkSize}. Returns the first chunk.
+ */
+ private static ByteBuffer validatedFirstChunk(List initialChunks, int chunkSize) {
+ if (initialChunks == null || initialChunks.isEmpty())
+ throw new IllegalArgumentException("initialChunks must be non-empty");
+ for (ByteBuffer chunk : initialChunks) {
+ if (chunk.capacity() != chunkSize)
+ throw new IllegalArgumentException("each chunk must have capacity " + chunkSize
+ + ", but found a chunk of capacity " + chunk.capacity());
+ }
+ return initialChunks.get(0);
+ }
+
+ @Override
+ public void write(int b) {
+ ensureChunkCapacity(1);
+ currentChunk.put((byte) b);
+ dirty = true;
+ }
+
+ @Override
+ public void write(byte[] bytes, int off, int len) {
+ while (len > 0) {
+ ensureChunkCapacity(1);
+ int toWrite = Math.min(len, currentChunk.remaining());
+ currentChunk.put(bytes, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ }
+ dirty = true;
+ }
+
+ @Override
+ public void write(ByteBuffer sourceBuffer) {
+ while (sourceBuffer.hasRemaining()) {
+ ensureChunkCapacity(1);
+ int toWrite = Math.min(sourceBuffer.remaining(), currentChunk.remaining());
+ int oldLimit = sourceBuffer.limit();
+ sourceBuffer.limit(sourceBuffer.position() + toWrite);
+ currentChunk.put(sourceBuffer);
+ sourceBuffer.limit(oldLimit);
+ }
+ dirty = true;
+ }
+
+ private void ensureChunkCapacity(int needed) {
+ if (currentChunk.remaining() < needed) {
+ advanceToNextChunk();
+ }
+ }
+
+ /**
+ * Advances {@code currentChunk} to the next pre-supplied chunk.
+ */
+ private void advanceToNextChunk() {
+ if (currentChunkIndex + 1 >= chunks.size()) {
+ // TODO: KAFKA-20579. With compression support, grow here instead of throwing.
+ throw new IllegalStateException("write exceeded the stream's remaining chunk capacity");
+ }
+ currentChunkIndex++;
+ currentChunk = chunks.get(currentChunkIndex);
+ }
+
+ /**
+ * Appends pre-allocated chunks to this stream. Ownership of {@code newChunks} transfers to
+ * the stream; they will be returned to the pool via {@link #deallocate()}.
+ */
+ public void addBuffers(List newChunks) {
+ chunks.addAll(newChunks);
+ }
+
+ @Override
+ public ByteBuffer buffer() {
+ if (flattenedBuffer != null && !dirty) {
+ return flattenedBuffer;
+ }
+ // TODO: KAFKA-20687. This flatten runs at batch close, when the chunk set is final.
+ // Today all chunks (used and unused) are returned to the pool only when the batch
+ // completes (via deallocate(pool)). Consider releasing the fully-unused chunks early, here.
+ int totalSize = 0;
+ for (ByteBuffer chunk : chunks) {
+ totalSize += chunk.position();
+ }
+ flattenedBuffer = ByteBuffer.allocate(totalSize);
+ for (ByteBuffer chunk : chunks) {
+ int chunkPos = chunk.position();
+ chunk.flip();
+ flattenedBuffer.put(chunk);
+ chunk.limit(chunk.capacity());
+ chunk.position(chunkPos);
+ }
+ dirty = false;
+ // The bytes are now copied into flattenedBuffer, but we intentionally do not release the
+ // chunks until batch completion, so the in-flight data stays reserved against the
+ // buffer.memory budget (the pool's available memory reflects it), consistent with the
+ // "full" strategy. Releasing here would return that memory to the pool while the bytes are
+ // still in flight in the heap copy, letting the pool admit more than buffer.memory intends.
+ // This flattening is an initial approach and will be removed with KAFKA-20580.
+ return flattenedBuffer;
+ }
+
+ /**
+ * Total bytes written across all chunks.
+ */
+ @Override
+ public int position() {
+ int total = 0;
+ for (ByteBuffer chunk : chunks) {
+ total += chunk.position();
+ }
+ return total;
+ }
+
+ /**
+ * Sets the write position, walking across pre-supplied chunks if the requested position
+ * exceeds the first chunk's capacity. Only valid before any write.
+ */
+ @Override
+ public void position(int position) {
+ if (currentChunkIndex != 0 || currentChunk.position() != 0) {
+ throw new IllegalStateException("position() can only be called before any writes");
+ }
+ int remaining = position;
+ int idx = 0;
+ while (remaining > 0 && idx < chunks.size()) {
+ ByteBuffer chunk = chunks.get(idx);
+ int take = Math.min(remaining, chunk.capacity());
+ chunk.position(take);
+ remaining -= take;
+ if (remaining > 0)
+ idx++;
+ }
+ if (remaining > 0) {
+ throw new IllegalArgumentException("position " + position
+ + " exceeds total pre-allocated capacity");
+ }
+ currentChunkIndex = idx;
+ currentChunk = chunks.get(idx);
+ dirty = true;
+ }
+
+ /**
+ * Total bytes available across the current chunk and every queued (not-yet-active) chunk.
+ */
+ @Override
+ public int remaining() {
+ if (currentChunk == null) // after deallocate no chunks attached, no free capacity
+ return 0;
+ int total = currentChunk.remaining();
+ for (int i = currentChunkIndex + 1; i < chunks.size(); i++)
+ total += chunks.get(i).remaining();
+ return total;
+ }
+
+ @Override
+ public int limit() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int initialCapacity() {
+ return chunkSize;
+ }
+
+ @Override
+ public void ensureRemaining(int remainingBytesRequired) {
+ // A single call can guarantee at most `chunkSize` of space (the stream advances one chunk
+ // at a time). Callers needing more attach chunks via addBuffers first. write(byte[]) loops
+ // across chunks, so contiguous capacity isn't required.
+ ensureChunkCapacity(Math.min(remainingBytesRequired, chunkSize));
+ }
+
+ /**
+ * Returns all pool-allocated chunks to the buffer pool. Called at batch completion.
+ */
+ public void deallocate(BufferPool pool) {
+ if (pool != null) {
+ for (ByteBuffer chunk : chunks) {
+ pool.deallocate(chunk);
+ }
+ }
+ chunks.clear();
+ currentChunk = null;
+ flattenedBuffer = null;
+ }
+
+ public void deallocate() {
+ deallocate(pool);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedProducerBatch.java
new file mode 100644
index 0000000000000..05c290737fe24
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedProducerBatch.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link ProducerBatch} for the incremental buffer.memory allocation strategy, backed
+ * by a {@link MemoryRecordsBuilder} whose stream is a {@link ChunkedByteBufferOutputStream}.
+ * It adds mid-batch chunk extension support ({@link #extensionBytesNeeded} /
+ * {@link #addBuffers}) and overrides the pool deallocation hooks so all chunks are returned to
+ * the pool rather than a single buffer.
+ *
+ * This class is not thread safe and external synchronization must be used when modifying it.
+ */
+public class ChunkedProducerBatch extends ProducerBatch {
+
+ // The parent's builder reference is private; keep our own to access stream capacity state.
+ private final MemoryRecordsBuilder recordsBuilder;
+
+ public ChunkedProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
+ super(tp, recordsBuilder, createdMs);
+ this.recordsBuilder = recordsBuilder;
+ }
+
+ /**
+ * Bytes of chunk capacity this batch needs before {@code tryAppend} could accept the given
+ * record. Returns 0 when no extension is needed: the batch is empty (first record), it is at
+ * its batch-size limit, or the attached chunk capacity already has room. Positive when the
+ * record is within the batch-size limit but the attached chunks lack capacity — the accumulator
+ * then allocates exactly the missing bytes (rounded up to whole chunks) and attaches them via
+ * {@link #addBuffers} before retrying.
+ */
+ int extensionBytesNeeded(long timestamp, byte[] key, byte[] value, Header[] headers) {
+ if (recordCount == 0)
+ return 0;
+ if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers))
+ return 0;
+ // Size against the batch's projected total output after this record (header counted once,
+ // ratio-adjusted when compressed), not per-record. Per-record sizing would over-count the
+ // header and miss the compressor's flush-accumulation behavior.
+ int target = recordsBuilder.estimatedBytesWrittenAfter(key, value, headers);
+ ByteBufferOutputStream stream = recordsBuilder.bufferStream();
+ int totalAttachedCapacity = stream.position() + stream.remaining();
+ return Math.max(0, target - totalAttachedCapacity);
+ }
+
+ /**
+ * Attach pre-allocated chunks to this batch's stream so the next {@code tryAppend} can
+ * spill into them. Ownership of the chunks transfers to the stream.
+ */
+ void addBuffers(List chunks) {
+ stream().addBuffers(chunks);
+ }
+
+ @Override
+ protected void deallocateBuffer(BufferPool pool) {
+ stream().deallocate(pool);
+ }
+
+ /**
+ * Unlike the single-buffer batch — which must donate a fresh buffer because the network
+ * layer may still be reading the pooled one — a chunked batch's inflight bytes live in the
+ * separate flattened buffer (see {@link ChunkedByteBufferOutputStream#buffer()}), so it is
+ * safe to return the actual chunks to the pool here.
+ */
+ @Override
+ protected void deallocateInflightBuffer(BufferPool pool) {
+ stream().deallocate(pool);
+ }
+
+ private ChunkedByteBufferOutputStream stream() {
+ return (ChunkedByteBufferOutputStream) recordsBuilder.bufferStream();
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulator.java
new file mode 100644
index 0000000000000..297dac0dca91f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulator.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.internal.AbstractRecords;
+import org.apache.kafka.common.record.internal.CompressionRatioEstimator;
+import org.apache.kafka.common.record.internal.CompressionType;
+import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+
+/**
+ * A {@link RecordAccumulator} variant that backs each batch with fixed-size chunks drawn from a
+ * {@link ChunkedBufferPool}, attaching more chunks on demand as records are appended instead of
+ * reserving {@code batch.size} per batch up front. Buffered memory therefore scales with the data
+ * actually written rather than with {@code active_partition_count × batch.size}.
+ *
+ * See {@link #append} and {@link #tryAppend} for how batches are created and grown.
+ *
+ * TODO: support compressed data (with mid-record growth); the constructor rejects compression for now.
+ */
+public class ChunkedRecordAccumulator extends RecordAccumulator {
+
+ /**
+ * Fixed size of every chunk, independent of {@code batch.size}. The incremental strategy is
+ * only used when {@code batch.size >= CHUNK_SIZE} (see {@code KafkaProducer}); below it a batch
+ * is smaller than a single chunk, so the producer uses the full strategy instead.
+ */
+ public static final int CHUNK_SIZE = 16 * 1024;
+
+ private final ChunkedBufferPool chunkedFree;
+
+ public ChunkedRecordAccumulator(LogContext logContext,
+ int batchSize,
+ Compression compression,
+ int lingerMs,
+ long retryBackoffMs,
+ long retryBackoffMaxMs,
+ int deliveryTimeoutMs,
+ PartitionerConfig partitionerConfig,
+ Metrics metrics,
+ String metricGrpName,
+ Time time,
+ TransactionManager transactionManager,
+ ChunkedBufferPool bufferPool) {
+ super(logContext, batchSize, compression, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+ deliveryTimeoutMs, partitionerConfig, metrics, metricGrpName, time, transactionManager, bufferPool);
+ // TODO: drop this once the incremental strategy supports compressed data (with the
+ // mid-record growth fallback for compressor overshoot).
+ if (compression.type() != CompressionType.NONE)
+ throw new UnsupportedOperationException(
+ "Compression is not yet supported with the incremental buffer.memory allocation strategy");
+ this.chunkedFree = bufferPool;
+ }
+
+ public ChunkedRecordAccumulator(LogContext logContext,
+ int batchSize,
+ Compression compression,
+ int lingerMs,
+ long retryBackoffMs,
+ long retryBackoffMaxMs,
+ int deliveryTimeoutMs,
+ Metrics metrics,
+ String metricGrpName,
+ Time time,
+ TransactionManager transactionManager,
+ ChunkedBufferPool bufferPool) {
+ this(logContext, batchSize, compression, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+ deliveryTimeoutMs, new PartitionerConfig(), metrics, metricGrpName, time, transactionManager,
+ bufferPool);
+ }
+
+ @Override
+ public RecordAppendResult append(String topic,
+ int partition,
+ long timestamp,
+ byte[] key,
+ byte[] value,
+ Header[] headers,
+ AppendCallbacks callbacks,
+ long maxTimeToBlock,
+ long nowMs,
+ Cluster cluster) throws InterruptedException {
+ TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic,
+ k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize, partitionerRackAware, rack)));
+
+ appendsInProgress.incrementAndGet();
+ ChunkedByteBufferOutputStream bufferStream = null;
+ List extensionChunks = null;
+ int extensionBytes;
+ if (headers == null) headers = Record.EMPTY_HEADERS;
+ try {
+ while (true) {
+ final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+ final int effectivePartition;
+ if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+ partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+ effectivePartition = partitionInfo.partition();
+ } else {
+ partitionInfo = null;
+ effectivePartition = partition;
+ }
+ setPartition(callbacks, effectivePartition);
+
+ Deque dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+ synchronized (dq) {
+ if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
+ continue;
+
+ // The tryAppend override checks the open batch (dq.peekLast()) for chunk
+ // capacity: a needsBufferExtension result means it is within its batch-size limit
+ // but its chunks lack capacity for this record — fall through to allocate the gap
+ // outside the deque lock. A null result means there is no open batch (it is full
+ // or absent) — fall through to the first-record (new batch) path.
+ RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+ if (appendResult != null && !appendResult.needsBufferExtension) {
+ boolean enableSwitch = allBatchesFull(dq);
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
+ return appendResult;
+ }
+ extensionBytes = appendResult == null ? 0 : appendResult.extensionBytesNeeded;
+ }
+
+ if (extensionBytes > 0) {
+ // Mid-batch extension: non-blocking only. The thread already holds the open
+ // batch's chunks, so blocking here could deadlock with the Sender (which frees
+ // pool memory by completing batches). On exhaustion, close the batch (making it
+ // drainable) and fall through to the blocking first-record path next iteration.
+ try {
+ extensionChunks = chunkedFree.allocateChunks(extensionBytes, 0L);
+ } catch (BufferExhaustedException e) {
+ log.trace("Pool exhausted while extending batch for topic {} partition {}; closing existing batch",
+ topic, effectivePartition);
+ synchronized (dq) {
+ ProducerBatch last = dq.peekLast();
+ if (last != null && last.isWritable()) {
+ last.closeForRecordAppends();
+ }
+ }
+ continue;
+ }
+ nowMs = time.milliseconds();
+ } else if (extensionBytes == 0 && bufferStream == null) {
+ // First-record path: block on the pool for enough chunks to fit this record,
+ // sized with the same cumulative estimator used mid-batch (header + record bytes
+ // for NONE, ratio-adjusted when compressed) so the two stay consistent.
+ int recordUncompressed = AbstractRecords.recordSizeUpperBound(
+ RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers);
+ int size = MemoryRecordsBuilder.estimatedBytesWritten(
+ RecordBatch.CURRENT_MAGIC_VALUE, compression.type(),
+ CompressionRatioEstimator.estimation(topic, compression.type()),
+ recordUncompressed);
+ log.trace("Allocating {} byte chunked buffer ({} byte chunks) for topic {} partition {} with remaining timeout {}ms",
+ size, chunkedFree.poolableSize(), topic, effectivePartition, maxTimeToBlock);
+ List initialChunks = chunkedFree.allocateChunks(size, maxTimeToBlock);
+ nowMs = time.milliseconds();
+ bufferStream = new ChunkedByteBufferOutputStream(initialChunks, chunkedFree.poolableSize(), chunkedFree);
+ }
+
+ synchronized (dq) {
+ if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) {
+ // The partition switched while we allocated extension chunks off-lock. They
+ // were sized against the previous partition's open batch, so they must not be
+ // attached to a different partition's open batch — refund them and let the next
+ // iteration re-check the new partition from scratch.
+ if (extensionChunks != null) {
+ for (ByteBuffer chunk : extensionChunks)
+ chunkedFree.deallocate(chunk);
+ extensionChunks = null;
+ }
+ continue;
+ }
+
+ if (extensionChunks != null) {
+ ProducerBatch last = dq.peekLast();
+ // The off-lock allocateChunks window allows the open batch we checked to be
+ // drained and replaced — possibly by a split batch (a plain
+ // ProducerBatch), which can't take extension chunks. Only attach to a
+ // writable chunked batch; otherwise refund the chunks and re-evaluate.
+ if (last instanceof ChunkedProducerBatch && last.isWritable()) {
+ ((ChunkedProducerBatch) last).addBuffers(extensionChunks);
+ extensionChunks = null;
+ RecordAppendResult retryResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+ if (retryResult != null && !retryResult.needsBufferExtension) {
+ boolean enableSwitch = allBatchesFull(dq);
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, retryResult.appendedBytes, cluster, enableSwitch);
+ return retryResult;
+ }
+ // needsBufferExtension: a concurrent appender consumed capacity our
+ // extension was sized against — loop to re-check. null: batch became
+ // full — loop into new-batch creation. Terminates because writeLimit is
+ // fixed: once full, the check stops requesting extension.
+ continue;
+ }
+ // The open batch is gone, closed, or non-chunked (e.g., a split batch). Return chunks to pool.
+ for (ByteBuffer chunk : extensionChunks)
+ chunkedFree.deallocate(chunk);
+ extensionChunks = null;
+ continue;
+ }
+
+ // First-record path: extensionChunks == null here implies extensionBytes == 0,
+ // so bufferStream was allocated (this iteration or carried from a prior one).
+ assert bufferStream != null;
+ int firstRecordSize = AbstractRecords.estimateSizeInBytesUpperBound(
+ RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers);
+ final ChunkedByteBufferOutputStream batchStream = bufferStream;
+ RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks,
+ () -> chunkedRecordsBuilder(batchStream, firstRecordSize), nowMs);
+ if (appendResult.needsBufferExtension) {
+ // A concurrent appender created an open batch we should extend rather
+ // than start a new one (detected by appendNewBatch's in-lock tryAppend).
+ // Our bufferStream was sized for a fresh batch — release it and loop so
+ // the extension path allocates exactly the gap-sized chunks.
+ bufferStream.deallocate();
+ bufferStream = null;
+ continue;
+ }
+ if (appendResult.newBatchCreated)
+ bufferStream = null;
+ boolean enableSwitch = allBatchesFull(dq);
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
+ return appendResult;
+ }
+ }
+ } finally {
+ if (bufferStream != null)
+ bufferStream.deallocate();
+ if (extensionChunks != null) {
+ for (ByteBuffer chunk : extensionChunks)
+ chunkedFree.deallocate(chunk);
+ }
+ appendsInProgress.decrementAndGet();
+ }
+ }
+
+ /**
+ * Try to append to a ProducerBatch, with mid-batch chunk extension support.
+ *
+ * If the open batch is within its batch-size limit but its chunked stream lacks chunk
+ * capacity, returns {@link RecordAppendResult#needsExtension(int)} without
+ * attempting the append; the caller allocates chunks outside the deque lock, attaches
+ * them, and retries. Otherwise defers to the parent.
+ */
+ @Override
+ protected RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
+ Callback callback, Deque deque, long nowMs) {
+ if (closed)
+ throw new KafkaException("Producer closed while send in progress");
+ ProducerBatch last = deque.peekLast();
+ // Split batches in an incremental deque are plain ProducerBatch (heap-backed, grow-on-demand)
+ // and never need chunk extension, so the check only applies to chunked batches.
+ if (last instanceof ChunkedProducerBatch) {
+ int extensionBytes = ((ChunkedProducerBatch) last).extensionBytesNeeded(timestamp, key, value, headers);
+ if (extensionBytes > 0)
+ return RecordAppendResult.needsExtension(extensionBytes);
+ }
+ return super.tryAppend(timestamp, key, value, headers, callback, deque, nowMs);
+ }
+
+ @Override
+ protected ProducerBatch createProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long nowMs) {
+ return new ChunkedProducerBatch(tp, recordsBuilder, nowMs);
+ }
+
+ /**
+ * Build a {@link MemoryRecordsBuilder} backed by the chunked stream. Its {@code writeLimit}
+ * bounds when the batch is full (the same batch-size limit as the full path), while chunk
+ * capacity grows on demand via {@link ChunkedByteBufferOutputStream#addBuffers(List)}.
+ */
+ private MemoryRecordsBuilder chunkedRecordsBuilder(ChunkedByteBufferOutputStream bufferStream,
+ int firstRecordSize) {
+ int writeLimit = Math.max(batchSize, firstRecordSize);
+ return new MemoryRecordsBuilder(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE, compression,
+ TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 14d0d6ef6334d..6adac7a24424d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -57,7 +57,7 @@
*
* This class is not thread safe and external synchronization must be used when modifying it
*/
-public final class ProducerBatch {
+public class ProducerBatch {
private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
@@ -139,6 +139,25 @@ boolean hasLeaderChangedForTheOngoingRetry() {
}
+ /**
+ * Return this batch's buffer memory to the pool. The default single-buffer batch returns
+ * the pooled buffer at its initial capacity; {@link ChunkedProducerBatch} overrides this
+ * to return all of its chunks.
+ */
+ protected void deallocateBuffer(BufferPool pool) {
+ pool.deallocate(buffer(), initialCapacity());
+ }
+
+ /**
+ * Credit this batch's memory back to the pool when it is unexpectedly still inflight
+ * (KAFKA-19012): the buffer can't be touched (the network may still be reading it), so the
+ * default donates a fresh same-capacity buffer. {@link ChunkedProducerBatch} instead returns
+ * the actual chunks (safe — inflight bytes live in the separate flattened buffer).
+ */
+ protected void deallocateInflightBuffer(BufferPool pool) {
+ pool.deallocate(ByteBuffer.allocate(initialCapacity()));
+ }
+
/**
* Append the record to the current record set and return the relative offset within that record set
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fd1cae3d8e4cc..c43952e96c5a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
@@ -69,23 +70,23 @@
*/
public class RecordAccumulator {
- private final LogContext logContext;
- private final Logger log;
- private volatile boolean closed;
+ protected final LogContext logContext;
+ protected final Logger log;
+ protected volatile boolean closed;
private final AtomicInteger flushesInProgress;
- private final AtomicInteger appendsInProgress;
- private final int batchSize;
- private final Compression compression;
+ protected final AtomicInteger appendsInProgress;
+ protected final int batchSize;
+ protected final Compression compression;
private final int lingerMs;
private final ExponentialBackoff retryBackoff;
private final int deliveryTimeoutMs;
private final long partitionAvailabilityTimeoutMs; // latency threshold for marking partition temporary unavailable
- private final boolean partitionerRackAware;
- private final String rack;
+ protected final boolean partitionerRackAware;
+ protected final String rack;
private final boolean enableAdaptivePartitioning;
private final BufferPool free;
- private final Time time;
- private final ConcurrentMap topicInfoMap = new CopyOnWriteMap<>();
+ protected final Time time;
+ protected final ConcurrentMap topicInfoMap = new CopyOnWriteMap<>();
private final ConcurrentMap nodeStats = new CopyOnWriteMap<>();
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
@@ -218,7 +219,7 @@ private void registerMetrics(Metrics metrics, String metricGrpName) {
(config, now) -> free.availableMemory());
}
- private void setPartition(AppendCallbacks callbacks, int partition) {
+ protected void setPartition(AppendCallbacks callbacks, int partition) {
if (callbacks != null)
callbacks.setPartition(partition);
}
@@ -235,7 +236,7 @@ private void setPartition(AppendCallbacks callbacks, int partition) {
* @return 'true' if partition changed and we need to get new partition info and retry,
* 'false' otherwise
*/
- private boolean partitionChanged(String topic,
+ protected boolean partitionChanged(String topic,
TopicInfo topicInfo,
BuiltInPartitioner.StickyPartitionInfo partitionInfo,
Deque deque, long nowMs,
@@ -348,7 +349,10 @@ public RecordAppendResult append(String topic,
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
- RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
+ final ByteBuffer batchBuffer = buffer;
+ RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks,
+ () -> MemoryRecords.builder(batchBuffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L),
+ nowMs);
// Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
@@ -375,10 +379,13 @@ public RecordAppendResult append(String topic,
* @param value The value for the record
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
- * @param buffer The buffer for the new batch
+ * @param recordsBuilderSupplier Supplies the {@link MemoryRecordsBuilder} for the new
+ * batch. Invoked lazily, only when a new batch is actually created. The chunked
+ * subclass passes a supplier that produces a builder backed by a
+ * {@link ChunkedByteBufferOutputStream}.
* @param nowMs The current time, in milliseconds
*/
- private RecordAppendResult appendNewBatch(String topic,
+ protected RecordAppendResult appendNewBatch(String topic,
int partition,
Deque dq,
long timestamp,
@@ -386,18 +393,21 @@ private RecordAppendResult appendNewBatch(String topic,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
- ByteBuffer buffer,
+ Supplier recordsBuilderSupplier,
long nowMs) {
assert partition != RecordMetadata.UNKNOWN_PARTITION;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+ // Propagate without creating a new batch: either another thread already made us a batch
+ // (success), or — incremental strategy — a concurrent appender created an extendable open batch
+ // (needsBufferExtension), so the caller releases its pre-allocated buffer and retries via
+ // the extension path.
return appendResult;
}
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
- ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
+ MemoryRecordsBuilder recordsBuilder = recordsBuilderSupplier.get();
+ ProducerBatch batch = createProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
@@ -407,14 +417,18 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
}
- private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
- return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
+ /**
+ * Create the {@link ProducerBatch} for a new batch. The incremental strategy overrides this to
+ * create a {@link ChunkedProducerBatch}.
+ */
+ protected ProducerBatch createProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long nowMs) {
+ return new ProducerBatch(tp, recordsBuilder, nowMs);
}
/**
* Check if all batches in the queue are full.
*/
- private boolean allBatchesFull(Deque deque) {
+ protected boolean allBatchesFull(Deque deque) {
// Only the last batch may be incomplete, so we just check that.
ProducerBatch last = deque.peekLast();
return last == null || last.isFull();
@@ -428,7 +442,7 @@ private boolean allBatchesFull(Deque deque) {
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
- private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
+ protected RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque deque, long nowMs) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
@@ -1055,11 +1069,12 @@ public void deallocate(ProducerBatch batch) {
} else {
batch.markBufferDeallocated();
if (batch.isInflight()) {
- // Create a fresh ByteBuffer to give to BufferPool to reuse since we can't safely call deallocate with the ProduceBatch's buffer
- free.deallocate(ByteBuffer.allocate(batch.initialCapacity()));
+ // We can't safely deallocate the buffer of an inflight batch; the batch credits
+ // the memory back to the pool in a way that suits its buffer type.
+ batch.deallocateInflightBuffer(free);
throw new IllegalStateException("Attempting to deallocate a batch that is inflight. Batch is " + batch);
}
- free.deallocate(batch.buffer(), batch.initialCapacity());
+ batch.deallocateBuffer(free);
}
}
}
@@ -1257,17 +1272,41 @@ public static final class RecordAppendResult {
public final FutureRecordMetadata future;
public final boolean batchIsFull;
public final boolean newBatchCreated;
+ /**
+ * Signal (incremental strategy) that the open batch is within its batch-size limit but its
+ * chunks lack capacity. The append was NOT attempted; the caller allocates
+ * {@link #extensionBytesNeeded} bytes of chunk capacity, attaches them via
+ * {@link ChunkedProducerBatch#addBuffers}, and retries. When {@code true}, {@code future} is null.
+ */
+ public final boolean needsBufferExtension;
+ public final int extensionBytesNeeded;
public final int appendedBytes;
public RecordAppendResult(FutureRecordMetadata future,
boolean batchIsFull,
boolean newBatchCreated,
int appendedBytes) {
+ this(future, batchIsFull, newBatchCreated, false, 0, appendedBytes);
+ }
+
+ private RecordAppendResult(FutureRecordMetadata future,
+ boolean batchIsFull,
+ boolean newBatchCreated,
+ boolean needsBufferExtension,
+ int extensionBytesNeeded,
+ int appendedBytes) {
this.future = future;
this.batchIsFull = batchIsFull;
this.newBatchCreated = newBatchCreated;
+ this.needsBufferExtension = needsBufferExtension;
+ this.extensionBytesNeeded = extensionBytesNeeded;
this.appendedBytes = appendedBytes;
}
+
+ /** A signal-only result indicating the caller must allocate more chunk capacity. */
+ static RecordAppendResult needsExtension(int extensionBytesNeeded) {
+ return new RecordAppendResult(null, false, false, true, extensionBytesNeeded, 0);
+ }
}
/*
@@ -1299,7 +1338,7 @@ public ReadyCheckResult(Set readyNodes, long nextReadyCheckDelayMs, Set> batches = new CopyOnWriteMap<>();
public final BuiltInPartitioner builtInPartitioner;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractRecords.java
index d0704d18ec37a..4e6e32d75a556 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractRecords.java
@@ -143,6 +143,28 @@ else if (compressionType != CompressionType.NONE)
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}
+ /**
+ * Upper-bound on the bytes a single record contributes to a batch, excluding the batch header.
+ * Used by the incremental strategy, which counts the header once, separately.
+ */
+ public static int recordSizeUpperBound(byte magic, CompressionType compressionType, byte[] key,
+ byte[] value, Header[] headers) {
+ return recordSizeUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+ }
+
+ /**
+ * @see #recordSizeUpperBound(byte, CompressionType, byte[], byte[], Header[])
+ */
+ private static int recordSizeUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
+ ByteBuffer value, Header[] headers) {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2)
+ return DefaultRecord.recordSizeUpperBound(key, value, headers);
+ else if (compressionType != CompressionType.NONE)
+ return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
+ else
+ return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
+ }
+
/**
* Return the size of the record batch header.
*
diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
index 3fd89deaba135..611654ed3e595 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
@@ -211,6 +211,14 @@ public int initialCapacity() {
return bufferStream.initialCapacity();
}
+ /**
+ * The underlying output stream, exposed so the incremental strategy can manage its
+ * chunk-backed stream.
+ */
+ public ByteBufferOutputStream bufferStream() {
+ return bufferStream;
+ }
+
public double compressionRatio() {
return actualCompressionRatio;
}
@@ -815,12 +823,49 @@ private void ensureOpenForRecordBatchWrite() {
* @return The estimated number of bytes written
*/
private int estimatedBytesWritten() {
- if (compression.type() == CompressionType.NONE) {
+ return estimatedBytesWritten(magic, compression.type(), estimatedCompressionRatio, uncompressedRecordsSizeInBytes);
+ }
+
+ /**
+ * Returns the projected number of bytes the builder would write for
+ * {@code uncompressedRecordsSizeInBytes} of records under the given magic, compression type,
+ * and ratio: exact for uncompressed, a ratio-aware estimate for compressed.
+ *
+ * Used by the incremental strategy to size chunk reservations (first record, and mid-batch via
+ * {@link #estimatedBytesWrittenAfter}).
+ */
+ public static int estimatedBytesWritten(byte magic, CompressionType compressionType,
+ float compressionRatio,
+ int uncompressedRecordsSizeInBytes) {
+ int batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
+ if (compressionType == CompressionType.NONE) {
return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
} else {
- // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * compressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ }
+ }
+
+ /**
+ * Projected value of {@link #estimatedBytesWritten} after appending one more record with the
+ * given fields, using the record's worst-case (upper-bound) per-record size. Used by the
+ * incremental strategy to size mid-batch chunk extensions.
+ */
+ public int estimatedBytesWrittenAfter(byte[] key, byte[] value, Header[] headers) {
+ return estimatedBytesWrittenAfter(wrapNullable(key), wrapNullable(value), headers);
+ }
+
+ /**
+ * @see #estimatedBytesWrittenAfter(byte[], byte[], Header[])
+ */
+ private int estimatedBytesWrittenAfter(ByteBuffer key, ByteBuffer value, Header[] headers) {
+ final int recordSize;
+ if (magic < RecordBatch.MAGIC_VALUE_V2) {
+ recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
+ } else {
+ recordSize = DefaultRecord.recordSizeUpperBound(key, value, headers);
}
+ return estimatedBytesWritten(magic, compression.type(), estimatedCompressionRatio,
+ uncompressedRecordsSizeInBytes + recordSize);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 5fd9ab727e046..45615be25dce0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -129,6 +129,41 @@ public void testInvalidMetadataRecoveryStrategy() {
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
+ @Test
+ public void testDefaultBufferMemoryAllocationStrategy() {
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_FULL,
+ producerConfig.getString(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testValidBufferMemoryAllocationStrategy() {
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ configs.put(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG,
+ ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL);
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL,
+ producerConfig.getString(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidBufferMemoryAllocationStrategy() {
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG, "abc");
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+ assertTrue(ce.getMessage().contains(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG));
+ }
+
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java
new file mode 100644
index 0000000000000..5b0f6f9b4fbb4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ChunkedBufferPoolTest {
+
+ private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics(time);
+
+ @AfterEach
+ public void teardown() {
+ metrics.close();
+ }
+
+ private ChunkedBufferPool pool(long totalMemory, int chunkSize) {
+ String metricGroup = "producer-metrics";
+ return new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
+ }
+
+ /** Single-chunk request returns a list of one buffer at the chunk size. */
+ @Test
+ public void testAllocateOneChunk() throws Exception {
+ int chunkSize = 64;
+ ChunkedBufferPool p = pool(1024, chunkSize);
+ List chunks = p.allocateChunks(chunkSize, 100);
+ assertEquals(1, chunks.size());
+ assertEquals(chunkSize, chunks.get(0).capacity());
+ }
+
+ /** Total size that's not a multiple of chunk size rounds up */
+ @Test
+ public void testAllocateRoundsUpToChunkBoundary() throws Exception {
+ int chunkSize = 64;
+ ChunkedBufferPool p = pool(1024, chunkSize);
+ // 65 bytes requested → 2 chunks (128 bytes total).
+ List chunks = p.allocateChunks(65, 100);
+ assertEquals(2, chunks.size());
+ for (ByteBuffer chunk : chunks)
+ assertEquals(chunkSize, chunk.capacity());
+ }
+
+ /** Multi-chunk request returns ceil(totalSize / chunkSize) chunks. */
+ @Test
+ public void testAllocateMultipleChunks() throws Exception {
+ int chunkSize = 64;
+ ChunkedBufferPool p = pool(1024, chunkSize);
+ List chunks = p.allocateChunks(4 * chunkSize, 100);
+ assertEquals(4, chunks.size());
+ }
+
+ /** Pool memory accounting: after allocation, the unallocated portion shrinks by the request. */
+ @Test
+ public void testAvailableMemoryAfterAllocation() throws Exception {
+ int chunkSize = 64;
+ long total = 256;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ p.allocateChunks(3 * chunkSize, 100);
+ assertEquals(total - 3L * chunkSize, p.availableMemory());
+ }
+
+ /** Returning chunks via deallocate restores pool memory. */
+ @Test
+ public void testDeallocationRestoresMemory() throws Exception {
+ int chunkSize = 64;
+ long total = 256;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ List chunks = p.allocateChunks(2 * chunkSize, 100);
+ for (ByteBuffer chunk : chunks)
+ p.deallocate(chunk);
+ assertEquals(total, p.availableMemory());
+ }
+
+ @Test
+ public void testRejectsRequestExceedingTotalMemory() {
+ ChunkedBufferPool p = pool(128, 64);
+ assertThrows(IllegalArgumentException.class, () -> p.allocateChunks(129, 100));
+ }
+
+ @Test
+ public void testRejectsNonPositiveRequest() {
+ ChunkedBufferPool p = pool(128, 64);
+ assertThrows(IllegalArgumentException.class, () -> p.allocateChunks(0, 100));
+ assertThrows(IllegalArgumentException.class, () -> p.allocateChunks(-1, 100));
+ }
+
+ /**
+ * When a multi-chunk request can't be fully satisfied within the deadline, chunks already
+ * acquired during the call must be returned to the pool.
+ */
+ @Test
+ public void testRollbackOnPartialFailure() throws Exception {
+ int chunkSize = 64;
+ long total = 2 * chunkSize; // only 2 chunks worth of memory
+ ChunkedBufferPool p = pool(total, chunkSize);
+ // Reserve one chunk so the pool has only 1 left.
+ ByteBuffer held = p.allocate(chunkSize, 100);
+
+ // Request 2 chunks with a zero deadline — first chunk fits, second can't, must throw.
+ assertThrows(BufferExhaustedException.class, () -> p.allocateChunks(2 * chunkSize, 0));
+
+ // Available memory must reflect only the chunk we deliberately hold; the request's
+ // first-chunk acquisition was rolled back.
+ assertEquals(total - chunkSize, p.availableMemory());
+
+ p.deallocate(held);
+ assertEquals(total, p.availableMemory());
+ }
+
+ /**
+ * No partial chunk holds during the wait. While a multi-chunk request blocks, the pool's
+ * {@code availableMemory()} must reflect bytes the waiter has not yet "earned".
+ */
+ @Test
+ public void testNoPartialHoldsDuringWait() throws Exception {
+ int chunkSize = 64;
+ long total = 4L * chunkSize;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ // Drain the pool down to 1 chunk's worth so a 2-chunk request can't be satisfied immediately.
+ ByteBuffer h1 = p.allocate(chunkSize, 100);
+ ByteBuffer h2 = p.allocate(chunkSize, 100);
+ ByteBuffer h3 = p.allocate(chunkSize, 100);
+ long available = p.availableMemory();
+ assertEquals(chunkSize, available, "pool should have exactly 1 chunk free");
+
+ // Background thread requests 2 chunks; will block on the 2nd.
+ AtomicReference err = new AtomicReference<>();
+ Thread t = new Thread(() -> {
+ try {
+ p.allocateChunks(2 * chunkSize, 5_000);
+ } catch (Throwable th) {
+ err.set(th);
+ }
+ }, "chunked-waiter");
+ t.start();
+ // Wait until the thread has joined the waiters queue.
+ TestUtils.waitForCondition(() -> p.queued() == 1, "waiter should be parked on the pool's queue");
+
+ // While the waiter is parked, the pool's available memory must still report the
+ // 1 chunk's worth — the waiter has NOT consumed any of it (no partial hold).
+ assertEquals(chunkSize, p.availableMemory(),
+ "pool memory must reflect no partial holds during the atomic wait");
+
+ // Free enough chunks for the waiter to complete.
+ p.deallocate(h1);
+ p.deallocate(h2);
+ t.join(5_000);
+ assertNull(err.get(), "waiter unexpectedly threw: " + err.get());
+ p.deallocate(h3);
+ }
+
+ /**
+ * FIFO fairness across the K-chunk request. A multi-chunk request that joins the wait queue
+ * before a single-chunk request must complete first when memory becomes available — the
+ * K-chunk request occupies a single waiter slot, not K of them.
+ */
+ @Test
+ public void testFifoFairnessAcrossChunkedAndSingleChunkRequests() throws Exception {
+ int chunkSize = 64;
+ long total = 4L * chunkSize;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ // Drain the pool entirely so any new request must wait.
+ ByteBuffer h1 = p.allocate(chunkSize, 100);
+ ByteBuffer h2 = p.allocate(chunkSize, 100);
+ ByteBuffer h3 = p.allocate(chunkSize, 100);
+ ByteBuffer h4 = p.allocate(chunkSize, 100);
+ assertEquals(0, p.availableMemory());
+
+ // T_multi enters first, requesting 2 chunks; T_single enters second, requesting 1 chunk.
+ AtomicReference multiCompletionMs = new AtomicReference<>();
+ AtomicReference singleCompletionMs = new AtomicReference<>();
+ CountDownLatch multiStarted = new CountDownLatch(1);
+ Thread tMulti = new Thread(() -> {
+ try {
+ multiStarted.countDown();
+ List got = p.allocateChunks(2 * chunkSize, 10_000);
+ multiCompletionMs.set(System.nanoTime());
+ for (ByteBuffer b : got) p.deallocate(b);
+ } catch (Throwable th) {
+ multiCompletionMs.set(-1L);
+ }
+ }, "multi");
+ Thread tSingle = new Thread(() -> {
+ try {
+ ByteBuffer got = p.allocate(chunkSize, 10_000);
+ singleCompletionMs.set(System.nanoTime());
+ p.deallocate(got);
+ } catch (Throwable th) {
+ singleCompletionMs.set(-1L);
+ }
+ }, "single");
+
+ tMulti.start();
+ assertTrue(multiStarted.await(2, TimeUnit.SECONDS));
+ // Wait for tMulti to be parked before starting tSingle, so the FIFO order is deterministic.
+ TestUtils.waitForCondition(() -> p.queued() == 1, "multi-chunk waiter should be the only one queued");
+ tSingle.start();
+ // Wait for tSingle to also be parked.
+ TestUtils.waitForCondition(() -> p.queued() == 2, "single-chunk waiter joined after the multi-chunk one");
+
+ // Now free chunks one at a time, allowing the multi-chunk request to accumulate.
+ p.deallocate(h1);
+ Thread.sleep(50);
+ p.deallocate(h2);
+ // Both freed — the FIFO leader (multi) should claim both before the single-chunk waiter
+ // gets any. The multi completes first.
+ tMulti.join(5_000);
+ // Now free another chunk for the single-chunk waiter.
+ p.deallocate(h3);
+ tSingle.join(5_000);
+
+ assertNotNull(multiCompletionMs.get(), "multi did not complete");
+ assertNotNull(singleCompletionMs.get(), "single did not complete");
+ assertTrue(multiCompletionMs.get() != -1L && singleCompletionMs.get() != -1L,
+ "both threads must complete without exception");
+ assertTrue(multiCompletionMs.get() <= singleCompletionMs.get(),
+ "FIFO violated: single (joined later) completed at "
+ + singleCompletionMs.get() + " before multi at " + multiCompletionMs.get());
+ p.deallocate(h4);
+ }
+
+ /**
+ * Slow-path rollback must not double-refund pooled chunks. When the waiter polls some
+ * chunks from the free list, then times out on a subsequent iteration, the inner finally
+ * refunds {@code accumulated} bytes to {@code nonPooledAvailableMemory} and the outer
+ * catch re-inserts the polled chunks into {@code free}. If {@code accumulated} includes
+ * bytes that came from polled chunks, the same memory is refunded twice — the pool
+ * over-reports {@code availableMemory()} and subsequent allocations can exceed the
+ * configured {@code buffer.memory} limit.
+ */
+ @Test
+ public void testSlowPathDoesNotDoubleRefundPolledChunksOnTimeout() throws Exception {
+ int chunkSize = 64;
+ long total = 3L * chunkSize;
+ ChunkedBufferPool p = pool(total, chunkSize);
+
+ // Drain so the next allocateChunks must wait.
+ ByteBuffer h1 = p.allocate(chunkSize, 100);
+ ByteBuffer h2 = p.allocate(chunkSize, 100);
+ ByteBuffer h3 = p.allocate(chunkSize, 100);
+ assertEquals(0, p.availableMemory());
+
+ AtomicReference err = new AtomicReference<>();
+ Thread t = new Thread(() -> {
+ try {
+ // Asks for 3 chunks with a finite deadline. After we deallocate 2 chunks below
+ // the waiter will poll them, find itself still 1 short, and time out on the
+ // next await iteration — with 2 chunks held in `pooled` and 2*chunkSize in
+ // `accumulated`.
+ p.allocateChunks(3 * chunkSize, 500);
+ } catch (Throwable th) {
+ err.set(th);
+ }
+ }, "partial-holder");
+ t.start();
+
+ TestUtils.waitForCondition(() -> p.queued() == 1, "waiter should be parked on the pool's queue");
+
+ // Hand the waiter 2 chunks — it polls them into `pooled` then awaits again for the 3rd.
+ p.deallocate(h1);
+ p.deallocate(h2);
+
+ // Give the waiter a moment to wake, poll, and re-enter await before the timeout fires.
+ Thread.sleep(100);
+
+ t.join(5_000);
+ assertInstanceOf(BufferExhaustedException.class, err.get(), "expected BufferExhaustedException, got " + err.get());
+
+ // After the throw, exactly 2 chunkSizes of physical memory should be back in the pool
+ // (h1 + h2). h3 is still held outside the pool. The bug double-refunds: it credits
+ // 2*chunkSize to `nonPooledAvailableMemory` (inner finally) AND re-inserts h1, h2 into
+ // `free` (outer catch), so availableMemory() reports 4*chunkSize.
+ assertEquals(2L * chunkSize, p.availableMemory(),
+ "pool over-reports availableMemory due to double-refund of pooled chunks on rollback");
+
+ p.deallocate(h3);
+ }
+
+ /**
+ * Slow-path success must not corrupt the pool's accounting. When the waiter polls chunks
+ * from the free list to satisfy its request, the finally block runs with the inner loop's
+ * "success" reset still in effect — it must not refund or decrement {@code nonPool}.
+ * Pool chunks transferred to the caller via {@code pooled} are accounted for by removing
+ * them from {@code free}, not by deducting their bytes from {@code nonPool}.
+ */
+ @Test
+ public void testSlowPathSuccessDoesNotCorruptAccounting() throws Exception {
+ int chunkSize = 64;
+ long total = 3L * chunkSize;
+ ChunkedBufferPool p = pool(total, chunkSize);
+
+ // Drain so a 2-chunk request must wait.
+ ByteBuffer h1 = p.allocate(chunkSize, 100);
+ ByteBuffer h2 = p.allocate(chunkSize, 100);
+ ByteBuffer h3 = p.allocate(chunkSize, 100);
+ assertEquals(0, p.availableMemory());
+
+ AtomicReference err = new AtomicReference<>();
+ AtomicReference> got = new AtomicReference<>();
+ Thread t = new Thread(() -> {
+ try {
+ got.set(p.allocateChunks(2 * chunkSize, 10_000));
+ } catch (Throwable th) {
+ err.set(th);
+ }
+ }, "slow-success");
+ t.start();
+
+ TestUtils.waitForCondition(() -> p.queued() == 1, "waiter should be parked on the pool's queue");
+
+ // Deallocate 2 chunks → waiter wakes, polls both, accumulated reaches memoryRequired,
+ // exits normally. The success path's finally must NOT refund or decrement nonPool.
+ p.deallocate(h1);
+ p.deallocate(h2);
+ t.join(5_000);
+ assertNull(err.get(), "waiter unexpectedly threw: " + err.get());
+
+ // After success: waiter owns the 2 chunks (returned via `got`), the test still holds h3.
+ // The pool has lent out everything it had — availableMemory must be exactly 0.
+ assertEquals(0, p.availableMemory(),
+ "slow-path success corrupted accounting (likely a phantom nonPool decrement in the finally)");
+
+ // Returning all the held buffers must fully restore the pool.
+ for (ByteBuffer chunk : got.get())
+ p.deallocate(chunk);
+ p.deallocate(h3);
+ assertEquals(total, p.availableMemory(),
+ "pool not fully restored after returning all chunks");
+ }
+
+ /**
+ * Closing the pool while a multi-chunk request is parked must surface a
+ * {@link KafkaException} and refund all reserved bytes.
+ */
+ @Test
+ public void testCloseDuringAtomicWait() throws Exception {
+ int chunkSize = 64;
+ long total = 2L * chunkSize;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ // Drain so the next request must wait.
+ ByteBuffer h1 = p.allocate(chunkSize, 100);
+ ByteBuffer h2 = p.allocate(chunkSize, 100);
+ assertEquals(0, p.availableMemory());
+
+ AtomicReference err = new AtomicReference<>();
+ Thread t = new Thread(() -> {
+ try {
+ p.allocateChunks(2 * chunkSize, 10_000);
+ } catch (Throwable th) {
+ err.set(th);
+ }
+ }, "close-during-wait");
+ t.start();
+ TestUtils.waitForCondition(() -> p.queued() == 1, "waiter should be parked on the pool's queue");
+
+ // Close the pool: signals all waiters; the waiter must throw KafkaException.
+ p.close();
+ t.join(5_000);
+ assertInstanceOf(KafkaException.class, err.get(), "expected KafkaException, got " + err.get());
+ p.deallocate(h1);
+ p.deallocate(h2);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStreamTest.java
new file mode 100644
index 0000000000000..145876730e977
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedByteBufferOutputStreamTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ChunkedByteBufferOutputStreamTest {
+
+ private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics(time);
+
+ @AfterEach
+ public void teardown() {
+ metrics.close();
+ }
+
+ private ChunkedBufferPool pool(long total, int chunkSize) {
+ String metricGroup = "test";
+ return new ChunkedBufferPool(total, chunkSize, metrics, time, metricGroup);
+ }
+
+ private List chunks(ChunkedBufferPool pool, int chunkSize, int count) throws InterruptedException {
+ return pool.allocateChunks(chunkSize * count, 100);
+ }
+
+ @Test
+ public void testConstructorRejectsInvalidChunks() {
+ int chunkSize = 16;
+ ChunkedBufferPool p = pool(64, chunkSize);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> new ChunkedByteBufferOutputStream(null, chunkSize, p));
+ assertThrows(IllegalArgumentException.class,
+ () -> new ChunkedByteBufferOutputStream(Collections.emptyList(), chunkSize, p));
+
+ // A chunk whose capacity doesn't match chunkSize violates the contract.
+ List wrongSize = Collections.singletonList(ByteBuffer.allocate(chunkSize + 1));
+ assertThrows(IllegalArgumentException.class,
+ () -> new ChunkedByteBufferOutputStream(wrongSize, chunkSize, p));
+ }
+
+ @Test
+ public void testSingleChunkWriteRoundtrip() throws Exception {
+ int chunkSize = 16;
+ ChunkedBufferPool p = pool(64, chunkSize);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(chunks(p, chunkSize, 1), chunkSize, p)) {
+
+ byte[] payload = new byte[]{1, 2, 3, 4, 5};
+ stream.write(payload, 0, payload.length);
+
+ ByteBuffer flat = stream.buffer();
+ flat.flip();
+ byte[] out = new byte[flat.remaining()];
+ flat.get(out);
+ assertArrayEquals(payload, out);
+
+ stream.deallocate();
+ }
+ }
+
+ @Test
+ public void testWriteAcrossMultipleChunks() throws Exception {
+ int chunkSize = 8;
+ ChunkedBufferPool p = pool(64, chunkSize);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(chunks(p, chunkSize, 3), chunkSize, p)) {
+
+ byte[] payload = new byte[20];
+ for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;
+ stream.write(payload, 0, payload.length);
+
+ ByteBuffer flat = stream.buffer();
+ flat.flip();
+ byte[] out = new byte[flat.remaining()];
+ flat.get(out);
+ assertArrayEquals(payload, out);
+
+ stream.deallocate();
+ }
+ }
+
+ @Test
+ public void testRemainingSumsFreeBytesAcrossChunks() throws Exception {
+ int chunkSize = 8;
+ ChunkedBufferPool p = pool(64, chunkSize);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(chunks(p, chunkSize, 2), chunkSize, p)) {
+
+ assertEquals(2 * chunkSize, stream.remaining());
+ stream.write(new byte[3], 0, 3);
+ assertEquals(2 * chunkSize - 3, stream.remaining());
+ stream.write(new byte[chunkSize], 0, chunkSize); // crosses into chunk 2
+ assertEquals(chunkSize - 3, stream.remaining());
+
+ stream.deallocate();
+ }
+ }
+
+ @Test
+ public void testAddBuffersExtendsStream() throws Exception {
+ int chunkSize = 8;
+ ChunkedBufferPool p = pool(64, chunkSize);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(chunks(p, chunkSize, 1), chunkSize, p)) {
+
+ // Fill the initial chunk.
+ stream.write(new byte[chunkSize], 0, chunkSize);
+ assertEquals(0, stream.remaining());
+
+ // Extend and write more — must land in the new chunk.
+ stream.addBuffers(Collections.singletonList(p.allocate(chunkSize, 100)));
+ assertEquals(chunkSize, stream.remaining());
+
+ byte[] more = new byte[]{9, 9, 9};
+ stream.write(more, 0, more.length);
+ assertEquals(chunkSize - 3, stream.remaining());
+
+ stream.deallocate();
+ }
+ }
+
+ @Test
+ public void testPositionWalksAcrossChunks() throws Exception {
+ int chunkSize = 4;
+ ChunkedBufferPool p = pool(32, chunkSize);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(chunks(p, chunkSize, 3), chunkSize, p)) {
+
+ stream.position(6); // straddles chunk 0 (4 bytes) and chunk 1 (2 bytes)
+ assertEquals(6, stream.position());
+ assertEquals(6, stream.remaining());
+
+ stream.deallocate();
+ }
+ }
+
+ @Test
+ public void testDeallocateReturnsAllChunks() throws Exception {
+ int chunkSize = 8;
+ long total = 64;
+ ChunkedBufferPool p = pool(total, chunkSize);
+ List initial = chunks(p, chunkSize, 3);
+ try (ChunkedByteBufferOutputStream stream = new ChunkedByteBufferOutputStream(initial, chunkSize, p)) {
+ // Also attach one extra chunk so deallocate must handle the added-buffer case too.
+ stream.addBuffers(Collections.singletonList(p.allocate(chunkSize, 100)));
+ assertEquals(total - 4L * chunkSize, p.availableMemory());
+
+ stream.deallocate();
+ }
+ assertEquals(total, p.availableMemory());
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulatorTest.java
new file mode 100644
index 0000000000000..d658e201083fe
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedRecordAccumulatorTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.MetadataSnapshot;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.internals.LogContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ChunkedRecordAccumulatorTest {
+
+ private final String topic = "test";
+ private final int partition1 = 0;
+ private final Node node1 = new Node(0, "localhost", 1111);
+ private final TopicPartition tp1 = new TopicPartition(topic, partition1);
+
+ private final PartitionMetadata partMetadata1 =
+ new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null);
+ private final List partMetadatas = new ArrayList<>(List.of(partMetadata1));
+ private final Map nodes =
+ Stream.of(node1).collect(Collectors.toMap(Node::id, java.util.function.Function.identity()));
+ private final MetadataSnapshot metadataCache = new MetadataSnapshot(null, nodes, partMetadatas,
+ Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap());
+ private final Cluster cluster = metadataCache.cluster();
+
+ private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics(time);
+ private final LogContext logContext = new LogContext();
+ private final long maxBlockTimeMs = 1000;
+ private final byte[] key = "k".getBytes();
+
+ @AfterEach
+ public void teardown() {
+ metrics.close();
+ }
+
+ private ChunkedRecordAccumulator newAccumulator(int batchSize, int chunkSize, long totalMemory, Compression compression) {
+ ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, "producer-metrics");
+ return new ChunkedRecordAccumulator(logContext, batchSize, compression,
+ /* lingerMs */ 0, /* retryBackoffMs */ 0L, /* retryBackoffMaxMs */ 0L,
+ /* deliveryTimeoutMs */ 3200, metrics, "producer-metrics", time,
+ /* transactionManager */ null, pool);
+ }
+
+ @Test
+ public void testFirstRecordCreatesChunkedBatch() throws Exception {
+ int chunkSize = 256;
+ ChunkedRecordAccumulator accum = newAccumulator(1024, chunkSize, 16L * chunkSize, Compression.NONE);
+
+ byte[] value = new byte[64];
+ accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size());
+ ProducerBatch batch = dq.peekFirst();
+ assertNotNull(batch);
+ assertEquals(1, batch.recordCount);
+ accum.close();
+ }
+
+ @Test
+ public void testSmallFollowupRecordFitsWithoutExtension() throws Exception {
+ int chunkSize = 256;
+ ChunkedRecordAccumulator accum = newAccumulator(1024, chunkSize, 16L * chunkSize, Compression.NONE);
+
+ accum.append(topic, partition1, 0L, key, new byte[16], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+ accum.append(topic, partition1, 0L, key, new byte[16], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size(), "Both records should land in the same batch");
+ assertNotNull(dq.peekFirst());
+ assertEquals(2, dq.peekFirst().recordCount);
+ accum.close();
+ }
+
+ /**
+ * A follow-up record that would overflow the existing chunks triggers mid-batch extension:
+ * additional chunks are attached and the record lands in the same batch.
+ */
+ @Test
+ public void testMidBatchExtensionGrowsExistingBatch() throws Exception {
+ int chunkSize = 128;
+ ChunkedRecordAccumulator accum = newAccumulator(8192, chunkSize, 16L * chunkSize, Compression.NONE);
+
+ // First record fits in 1 chunk (~64+overhead).
+ accum.append(topic, partition1, 0L, key, new byte[32], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size());
+ assertNotNull(dq.peekFirst());
+ int initialRecordCount = dq.peekFirst().recordCount;
+ assertEquals(1, initialRecordCount);
+
+ // Second record big enough to require an extra chunk.
+ accum.append(topic, partition1, 0L, key, new byte[200], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ // Same batch, now with the second record.
+ assertEquals(1, dq.size(), "Should still be the same batch — extension should not roll");
+ assertNotNull(dq.peekFirst());
+ assertEquals(2, dq.peekFirst().recordCount,
+ "Second record should land in the extended batch");
+ accum.close();
+ }
+
+ /**
+ * Two concurrent appenders race to extend the same open batch, each sizing its extension
+ * against the same remaining capacity off-lock; once one attaches its chunks, the other's is
+ * short. Verifies the second-lock re-check makes the short appender re-check rather than write
+ * past the stream's capacity, which would fail.
+ */
+ @Test
+ public void testConcurrentExtensionRaceDoesNotOverflowChunkedStream() throws Exception {
+ final int chunkSize = 256;
+ final int recordValueSize = 350; // sized so gap ≈ chunkSize → minimal rounding cushion
+
+ final AtomicBoolean armed = new AtomicBoolean(false);
+ final CountDownLatch aHoldsChunks = new CountDownLatch(1);
+ final CountDownLatch bDoneAttaching = new CountDownLatch(1);
+
+ ChunkedBufferPool pool = new ChunkedBufferPool(64L * chunkSize, chunkSize, metrics, time, "producer-metrics") {
+ @Override
+ public List allocateChunks(int totalSize, long maxTimeToBlockMs) throws InterruptedException {
+ boolean blockThisCall = armed.compareAndSet(true, false);
+ List chunks = super.allocateChunks(totalSize, maxTimeToBlockMs);
+ if (blockThisCall) {
+ aHoldsChunks.countDown();
+ if (!bDoneAttaching.await(10, TimeUnit.SECONDS)) {
+ throw new InterruptedException("test timeout waiting for B");
+ }
+ }
+ return chunks;
+ }
+ };
+ ChunkedRecordAccumulator accum = new ChunkedRecordAccumulator(logContext, 8192, Compression.NONE,
+ /* lingerMs */ 0, /* retryBackoffMs */ 0L, /* retryBackoffMaxMs */ 0L,
+ /* deliveryTimeoutMs */ 3200, metrics, "producer-metrics", time,
+ /* transactionManager */ null, pool);
+ try {
+ // Warmup: tiny first record establishes an open batch with a known remainingInStream.
+ accum.append(topic, partition1, 0L, key, new byte[1], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ // Arm the race: the next allocateChunks call (A's) will block until B has appended.
+ armed.set(true);
+
+ AtomicReference aError = new AtomicReference<>();
+ Thread tA = new Thread(() -> {
+ try {
+ accum.append(topic, partition1, 0L, key, new byte[recordValueSize],
+ Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
+ } catch (Throwable t) {
+ aError.set(t);
+ }
+ }, "test-appender-A");
+ tA.start();
+
+ // Wait for A to be parked inside allocateChunks holding its (pre-attach) chunks.
+ assertTrue(aHoldsChunks.await(10, TimeUnit.SECONDS), "A did not reach allocateChunks");
+
+ // B runs on this thread. Its allocateChunks is the second call — armed=false now,
+ // so it proceeds without blocking. B reads the same R as A (A hasn't attached yet),
+ // attaches its own chunks, appends its record. Now the open batch's remaining has shrunk
+ // by B's actual record size, but A's still-off-lock allocation didn't know that.
+ accum.append(topic, partition1, 0L, key, new byte[recordValueSize], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ // Release A. Without the fix in the second-lock block, A's tryAppendToExisting
+ // will overflow the chunked stream and throw IllegalStateException.
+ bDoneAttaching.countDown();
+ tA.join(10_000);
+ assertFalse(tA.isAlive(), "Thread A did not complete in time");
+
+ assertNull(aError.get(),
+ "Thread A failed: " + (aError.get() == null ? "" : aError.get().toString()));
+ } finally {
+ // Drain any state regardless of outcome.
+ accum.close();
+ }
+ }
+
+ /**
+ * When the sticky partition switches while extension chunks are held off-lock, those
+ * chunks (sized against the previous partition's open batch) must be refunded to the pool on the
+ * retry (not carried over and attached to a different partition's open batch)
+ */
+ @Test
+ public void testPartitionSwitchRefundsHeldExtensionChunks() throws Exception {
+ int chunkSize = 128;
+ long totalMemory = 32L * chunkSize;
+
+ AtomicBoolean extensionAllocated = new AtomicBoolean(false);
+ AtomicInteger chunkDeallocations = new AtomicInteger(0);
+
+ ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, "producer-metrics") {
+ @Override
+ public List allocateChunks(int totalSize, long maxTimeToBlockMs) throws InterruptedException {
+ List chunks = super.allocateChunks(totalSize, maxTimeToBlockMs);
+ // The mid-batch extension path is the only non-blocking caller.
+ if (maxTimeToBlockMs == 0L)
+ extensionAllocated.set(true);
+ return chunks;
+ }
+
+ @Override
+ public void deallocate(ByteBuffer buffer) {
+ chunkDeallocations.incrementAndGet();
+ super.deallocate(buffer);
+ }
+ };
+
+ AtomicBoolean switchFired = new AtomicBoolean(false);
+ ChunkedRecordAccumulator accum = new ChunkedRecordAccumulator(logContext, 8192, Compression.NONE,
+ /* lingerMs */ 0, /* retryBackoffMs */ 0L, /* retryBackoffMaxMs */ 0L,
+ /* deliveryTimeoutMs */ 3200, metrics, "producer-metrics", time,
+ /* transactionManager */ null, pool) {
+ @Override
+ protected boolean partitionChanged(String topic, TopicInfo topicInfo,
+ BuiltInPartitioner.StickyPartitionInfo partitionInfo,
+ Deque deque, long nowMs, Cluster cluster) {
+ // Inject one spurious switch, but only once an extension allocation has happened —
+ // i.e. on the second-sync-block check, while extension chunks are held.
+ if (extensionAllocated.get() && switchFired.compareAndSet(false, true))
+ return true;
+ return super.partitionChanged(topic, topicInfo, partitionInfo, deque, nowMs, cluster);
+ }
+ };
+ try {
+ // Warmup: tiny first record establishes an open batch (first-record path, blocking
+ // allocate — does not flag extensionAllocated).
+ accum.append(topic, partition1, 0L, key, new byte[1], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+ int deallocBeforeExtend = chunkDeallocations.get();
+
+ // Second record overflows the chunk → extension allocated off-lock → injected switch
+ // fires in the second sync block with those chunks held.
+ accum.append(topic, partition1, 0L, key, new byte[200], Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ assertTrue(switchFired.get(), "the injected partition switch should have fired");
+ assertTrue(chunkDeallocations.get() > deallocBeforeExtend,
+ "extension chunks held at the partition switch must be refunded to the pool; "
+ + "without the fix they are carried and attached instead");
+
+ // The record still appends correctly after the switch-retry.
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size());
+ assertNotNull(dq.peekFirst());
+ assertEquals(2, dq.peekFirst().recordCount,
+ "second record should still land in the batch after the switch-retry");
+ } finally {
+ accum.close();
+ }
+ }
+
+ @Test
+ public void testInflightExpirationReturnsAllChunksToPool() throws Exception {
+ int chunkSize = 128;
+ long totalMemory = 32L * chunkSize;
+ ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, "producer-metrics");
+ ChunkedRecordAccumulator accum = new ChunkedRecordAccumulator(logContext, 8192, Compression.NONE,
+ /* lingerMs */ 0, /* retryBackoffMs */ 0L, /* retryBackoffMaxMs */ 0L,
+ /* deliveryTimeoutMs */ 3200, metrics, "producer-metrics", time,
+ /* transactionManager */ null, pool);
+
+ // A record large enough that allocateChunks reserves multiple chunks (K > 1).
+ byte[] value = new byte[400];
+ accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size());
+ ProducerBatch batch = dq.peekFirst();
+
+ // Confirm the batch actually consumed multiple chunks. estimateSizeInBytesUpperBound for
+ // a 400-byte value with v2 framing is well over chunkSize, so K should be >= 2.
+ long heldBeforeDeallocate = totalMemory - pool.availableMemory();
+ assertTrue(heldBeforeDeallocate >= 2L * chunkSize,
+ "test setup expects K >= 2; pool held " + heldBeforeDeallocate + " bytes");
+
+ // Mark the batch as if the Sender had drained it.
+ batch.setInflight(true);
+
+ // The inflight branch throws after deallocating. The chunked override must return
+ // all K chunks (not just initialCapacity) before propagating the exception.
+ assertThrows(IllegalStateException.class, () -> accum.deallocate(batch));
+
+ assertEquals(totalMemory, pool.availableMemory(),
+ "pool should be fully restored after inflight-expiration deallocate; "
+ + "any K-1 unsurrendered chunks indicate the chunked-leak regression");
+
+ accum.close();
+ }
+
+ private Deque batchesFor(RecordAccumulator accum, TopicPartition tp) {
+ return accum.getDeque(tp);
+ }
+
+ /**
+ * Test that chunks are returned to the pool only at batch completion (deallocate), never at close.
+ */
+ @Test
+ public void testBatchCloseDoesNotDeallocateChunksPrematurely() throws Exception {
+ int chunkSize = 256;
+ ChunkedRecordAccumulator accum = newAccumulator(8192, chunkSize, 32L * chunkSize, Compression.NONE);
+
+ byte[] value = new byte[64];
+ accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+
+ Deque dq = batchesFor(accum, tp1);
+ ProducerBatch batch = dq.peekFirst();
+ assertNotNull(batch);
+
+ // Matches the call sites in RecordAccumulator.drain and Sender.
+ batch.close();
+ MemoryRecords records = batch.records();
+ assertTrue(records.sizeInBytes() > 0,
+ "batch must produce a non-empty record set after close; chunks were deallocated prematurely");
+
+ int count = 0;
+ for (RecordBatch rb : records.batches()) {
+ for (Record r : rb) {
+ count++;
+ assertNotNull(r.value());
+ }
+ }
+ assertEquals(1, count, "expected exactly 1 record after close");
+
+ accum.close();
+ }
+
+ /**
+ * As small records accumulate in a batch, the attached chunks grow with the batch's cumulative
+ * projected output (not per-record).
+ */
+ @Test
+ public void testExtensionTracksCumulativeBatchSize() throws Exception {
+ int chunkSize = 64;
+ int batchSize = 512;
+ ChunkedRecordAccumulator accum = newAccumulator(batchSize, chunkSize, 64L * chunkSize, Compression.NONE);
+
+ byte[] smallValue = new byte[24];
+ for (int i = 0; i < 6; i++) {
+ accum.append(topic, partition1, 0L, key, smallValue, Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+ }
+
+ Deque dq = batchesFor(accum, tp1);
+ assertEquals(1, dq.size());
+ ProducerBatch batch = dq.peekFirst();
+ assertNotNull(batch);
+ assertEquals(6, batch.recordCount);
+
+ // batch.close() flattens chunks and writes the header. If chunks under-allocated, this
+ // throws (insufficient capacity in the underlying buffer).
+ batch.close();
+ MemoryRecords records = batch.records();
+ int actualSize = records.sizeInBytes();
+ // Under NONE compression, estimatedBytesWritten is exact: physical bytes = header + sum
+ // of per-record bytes. The chunks attached must cover that, so actualSize must be >
+ // chunkSize for a multi-record batch with non-trivial content.
+ assertTrue(actualSize > chunkSize,
+ "batch should have grown beyond a single chunk; got " + actualSize);
+
+ accum.close();
+ }
+
+ @Test
+ public void testCumulativeAccountsForBatchHeaderOnce() throws Exception {
+ int chunkSize = 256;
+ int batchSize = 8192;
+ long totalMemory = 64L * chunkSize;
+ ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, "producer-metrics");
+ ChunkedRecordAccumulator accum = new ChunkedRecordAccumulator(logContext, batchSize, Compression.NONE,
+ /* lingerMs */ 0, /* retryBackoffMs */ 0L, /* retryBackoffMaxMs */ 0L,
+ /* deliveryTimeoutMs */ 3200, metrics, "producer-metrics", time,
+ /* transactionManager */ null, pool);
+
+ long beforeAlloc = pool.availableMemory();
+ // First record establishes the batch. Each subsequent small record contributes its
+ // uncompressed bytes to the cumulative target; the batch header is NOT re-counted.
+ byte[] smallValue = new byte[8];
+ for (int i = 0; i < 4; i++) {
+ accum.append(topic, partition1, 0L, key, smallValue, Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, time.milliseconds(), cluster);
+ }
+
+ // Each per-record append adds ~10-30 bytes (key + value + record overhead, V2). Cumulative
+ // total for 4 records is well below chunkSize=256, so only 1 chunk should ever be attached.
+ // The per-record formula (header counted once per record) would have allocated more.
+ long held = beforeAlloc - pool.availableMemory();
+ assertEquals(chunkSize, held,
+ "cumulative formula should hold exactly one chunk for a small-record batch; "
+ + "header double-counting (per-record formula) would inflate this");
+
+ accum.close();
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 71d0dd8273992..1d50bd8910549 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -101,6 +101,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.tearDown()
}
+ /**
+ * Additional producer properties applied to every producer created via [[createProducer]].
+ * Subclasses override this to run the whole suite under a different producer configuration
+ * (e.g. a different buffer.memory.allocation.strategy).
+ */
+ protected def producerOverrides: Properties = new Properties()
+
protected def createProducer(lingerMs: Int = 0,
deliveryTimeoutMs: Int = 2 * 60 * 1000,
batchSize: Int = 16384,
@@ -117,7 +124,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
deliveryTimeoutMs = deliveryTimeoutMs,
maxBlockMs = maxBlockMs,
batchSize = batchSize,
- bufferSize = bufferSize)
+ bufferSize = bufferSize,
+ additionalProperties = Some(producerOverrides))
registerProducer(producer)
}
diff --git a/core/src/test/scala/integration/kafka/api/IncrementalAllocationProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/IncrementalAllocationProducerSendTest.scala
new file mode 100644
index 0000000000000..74bcd892813c7
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/IncrementalAllocationProducerSendTest.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.util.concurrent.ThreadLocalRandom
+import java.util.{Properties, List => JList}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+/**
+ * Runs the whole [[BaseProducerSendTest]] suite with the producer configured with the incremental
+ * buffer.memory allocation strategy, plus a few scenarios specific to chunked allocation.
+ */
+class IncrementalAllocationProducerSendTest extends BaseProducerSendTest {
+
+ override protected def producerOverrides: Properties = {
+ val props = new Properties()
+ props.put(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG,
+ ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL)
+ props
+ }
+
+ // The incremental strategy does not support compression yet; the base test would fail at
+ // producer construction. Overriding without the test annotations removes it from this
+ // subclass's run. TODO: remove this override when compression support lands.
+ override def testSendCompressedMessageWithCreateTime(groupProtocol: String): Unit = {}
+
+ // batch.size=0 is below the internal chunk size, so the incremental strategy falls back to the full
+ // allocation path (a batch is smaller than one chunk). Verifies that fallback yields a working producer.
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testBatchSizeZero(groupProtocol: String): Unit = {
+ sendAndVerify(createProducer(
+ lingerMs = Int.MaxValue,
+ deliveryTimeoutMs = Int.MaxValue,
+ batchSize = 0))
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testSendLargeRecordsSpanningMultipleChunks(groupProtocol: String): Unit = {
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
+ val partition = 0
+ val tp = new TopicPartition(topic, partition)
+ val valueSize = 600_000 // far larger than one chunk, so each record spans many chunks
+ // TODO: also exercise the compressed codecs here once the incremental strategy supports compression.
+
+ val producer = createProducer(batchSize = 1024 * 1024, bufferSize = 8L * 1024 * 1024)
+ val value = randomBytes(valueSize)
+ val metadata = producer.send(new ProducerRecord(topic, partition, "key".getBytes, value)).get
+ assertEquals(valueSize, metadata.serializedValueSize)
+
+ // Verify the exact bytes round-trip.
+ val consumer = TestUtils.createConsumer(
+ bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
+ groupProtocolFromTestParameters())
+ try {
+ consumer.assign(JList.of(tp))
+ consumer.seekToBeginning(JList.of(tp))
+ val consumed = TestUtils.consumeRecords(consumer, 1)
+ assertEquals(metadata.offset, consumed.head.offset)
+ assertArrayEquals(value, consumed.head.value)
+ } finally {
+ consumer.close()
+ }
+ }
+
+ private def randomBytes(size: Int): Array[Byte] = {
+ val bytes = new Array[Byte](size)
+ ThreadLocalRandom.current().nextBytes(bytes)
+ bytes
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bc41c2ed2aeeb..088fe6384ba6d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -491,7 +491,8 @@ object TestUtils extends Logging {
saslProperties: Option[Properties] = None,
keySerializer: Serializer[K] = new ByteArraySerializer,
valueSerializer: Serializer[V] = new ByteArraySerializer,
- enableIdempotence: Boolean = false): KafkaProducer[K, V] = {
+ enableIdempotence: Boolean = false,
+ additionalProperties: Option[Properties] = None): KafkaProducer[K, V] = {
val producerProps = new Properties
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
@@ -504,6 +505,7 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence.toString)
+ additionalProperties.foreach(producerProps ++= _)
producerProps ++= JaasTestUtils.producerSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(saslProperties))
new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 7268ffc6ebeec..32465b0a027f7 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -127,6 +127,17 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
+
+
+
+
@@ -553,6 +564,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
-->
+
diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index 8e646534263a7..7830889a5313d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -414,7 +414,8 @@ KafkaProducer createProducer() {
Option.empty(),
new ByteArraySerializer(),
new ByteArraySerializer(),
- false
+ false,
+ Option.empty()
);
}
}