Skip to content

KAFKA-20579: Initial producer incremental allocation for uncompressed data#22654

Open
lianetm wants to merge 12 commits into
apache:trunkfrom
lianetm:lm-producer-dyn-1
Open

KAFKA-20579: Initial producer incremental allocation for uncompressed data#22654
lianetm wants to merge 12 commits into
apache:trunkfrom
lianetm:lm-producer-dyn-1

Conversation

@lianetm

@lianetm lianetm commented Jun 23, 2026

Copy link
Copy Markdown
Member

Initial partial implementation for KIP-1332 (producer incremental
allocation strategy).

This PR includes:

  • new producer config for allocation strategy (incremental/full)
  • initial implementation of the incremental strategy: supports
    uncompressed data only (no growth support), extra-copy on send (linked
    chunks are flattened into a new buffer)
  • unit and integration tests (running existing Producer integration
    tests with the new strategy + new ones)

Support for compressed data and network layer improvements will come in
follow-up PRs.

Reviewers: Jun Rao junrao@gmail.com

@lianetm lianetm requested a review from junrao June 23, 2026 18:34
@github-actions github-actions Bot added core Kafka Broker producer build Gradle build or GitHub Actions clients labels Jun 23, 2026

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm : Thanks for the PR. Made a pass of the non-testing files. Left a few comments.

Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this as internal to prevent it from leaking into 4.4 before the feature is fully implemented?

// pool memory by completing batches). On exhaustion, close the batch (making it
// drainable) and fall through to the blocking first-record path next iteration.
try {
extensionChunks = chunkedFree.allocateChunks(extensionBytes, 0L);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do a special non-blocking call? In the existing logic, if an allocation request is blocked, all existing ProducerBatches become immediately drainable.

last.closeForRecordAppends();
}
}
continue;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may take a bit of time for the closed batches to be drained. If we continue here, it seems that the client will just busy-loop until some batches are drained and some free space becomes available in buffer pool?

* them, and retries. Otherwise defers to the parent.
*/
@Override
protected RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit awkward to have a return value of null and RecordAppendResult.needsExtension. Could we introduce a non-null value to indicate the batch is full?

// ProducerBatch), which can't take extension chunks. Only attach to a
// writable chunked batch; otherwise refund the chunks and re-evaluate.
if (last instanceof ChunkedProducerBatch && last.isWritable()) {
((ChunkedProducerBatch) last).addBuffers(extensionChunks);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess two concurrent clients could add buffers exceeding the batch size? Those buffers won't be used, but can only be freed after the batch is drained.

long remainingBytes = memoryRequired - (long) pooled.size() * chunkSize;
if (remainingBytes > 0) {
// remainingBytes <= memoryRequired <= totalMemory (validated above), so the int cast is safe.
freeUp((int) remainingBytes);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a no-op since all pooled chunks have been used if we reach here.

pooled.add(free.pollFirst());
long remainingBytes = memoryRequired - (long) pooled.size() * chunkSize;
if (remainingBytes > 0) {
// remainingBytes <= memoryRequired <= totalMemory (validated above), so the int cast is safe.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is remainingBytes guaranteed to be an int? memoryRequired could be larger than int and pooled.size() initially could be 0.

}
long stillNeeded = memoryRequired - (long) pooled.size() * chunkSize - accumulated;
if (stillNeeded > 0) {
freeUp((int) stillNeeded);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be ok, but it's a bit weird to free up the chunks only to be reallocated again. Here is an alternative that doesn't require a freeup() call.

  // Reuse pooled chunks first. If a reused chunk covers a slot we already reserved as                                                                                                                                                                                                                                    
  // raw bytes in an earlier iteration, hand that raw reservation back to the pool.                                                                                                                                                                                                                                       
  while (pooled.size() < numChunks && !free.isEmpty()) {                                                                                                                                                                                                                                                                  
      pooled.add(free.pollFirst());                                                                                                                                                                                                                                                                                       
      if (accumulated >= chunkSize) {          // accumulated is always chunk-aligned here                                                                                                                                                                                                                                
          accumulated -= chunkSize;                                                                                                                                                                                                                                                                                       
          this.nonPooledAvailableMemory += chunkSize;   // refund → available to other waiters                                                                                                                                                                                                                            
      }                                                                                                                                                                                                                                                                                                                   
  }                                                                                                                                                                                                                                                                                                                       
  // Reserve raw memory for any still-uncovered chunks, in whole chunks.                                                                                                                                                                                                                                                  
  while (pooled.size() + (int)(accumulated / chunkSize) < numChunks                                                                                                                                                                                                                                                       
          && this.nonPooledAvailableMemory >= chunkSize) {                                                                                                                                                                                                                                                                
      this.nonPooledAvailableMemory -= chunkSize;                                                                                                                                                                                                                                                                         
      accumulated += chunkSize;                                                                                                                                                                                                                                                                                           
  }                                                                                                                                                                                                                                                                                                                       


// Take pool chunks first, then reserve non-pool bytes for the remainder.
while (pooled.size() < numChunks
&& (long) (pooled.size() + 1) * chunkSize + accumulated <= memoryRequired

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first condition is redundant, given the second one.

 (pooled+1)*chunkSize + accumulated ≤ memoryRequired                                                                                                                                                                                                                                                               
    ⇒  (pooled+1)*chunkSize ≤ memoryRequired − accumulated ≤ memoryRequired = numChunks*chunkSize                                                                                                                                                                                                                         
    ⇒  pooled+1 ≤ numChunks                                                                                                                                                                                                                                                                                               
    ⇒  pooled < numChunks

// On failure (timeout / close / interrupt), refund the non-pool bytes taken.
// Pool chunks already in `pooled` are returned to `free` separately by the
// outer catch.
this.nonPooledAvailableMemory += accumulated;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we return accumulated and pooled chunks in the same place? For example, we can set a flag like allocationCompleted to replace accumulated = 0. Then we can free both accumulated and pooled chunks if the flag is not set.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

build Gradle build or GitHub Actions ci-approved clients core Kafka Broker producer tools

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants