Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
files="(Utils|Topic|Lz4BlockOutputStream|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
files="(AbstractFetch|ChunkedBufferPool|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaProducerTest).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.clients.producer.internals.ChunkedBufferPool;
import org.apache.kafka.clients.producer.internals.ChunkedRecordAccumulator;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
Expand Down Expand Up @@ -90,6 +92,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -456,19 +459,49 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
// As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
// batching which in practice actually means using a batch size of 1.
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,
batchSize,
compression,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
deliveryTimeoutMs,
partitionerConfig,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
String allocationStrategy = config.getString(ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG)
.toLowerCase(Locale.ROOT);
boolean incremental = ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL.equals(allocationStrategy);
// Use the chunked path only when a batch is at least one full chunk
// (batch.size >= CHUNK_SIZE). Below that, a batch can't fill even one chunk, so chunking
// would over-reserve and the producer falls back to the full strategy instead.
boolean useIncremental = incremental && batchSize >= ChunkedRecordAccumulator.CHUNK_SIZE;
// The chunked path does not support compression yet (TODO: KAFKA-20579)
if (useIncremental && compression.type() != CompressionType.NONE) {
throw new ConfigException("The " + ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL
+ " " + ProducerConfig.BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG
+ " does not support compression yet. " + ProducerConfig.COMPRESSION_TYPE_CONFIG
+ " must be set to none.");
}
if (useIncremental) {
this.accumulator = new ChunkedRecordAccumulator(logContext,
batchSize,
compression,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
deliveryTimeoutMs,
partitionerConfig,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
transactionManager,
new ChunkedBufferPool(this.totalMemorySize, ChunkedRecordAccumulator.CHUNK_SIZE, metrics, time, PRODUCER_METRIC_GROUP_NAME));
} else {
this.accumulator = new RecordAccumulator(logContext,
batchSize,
compression,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
deliveryTimeoutMs,
partitionerConfig,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
}

this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,20 @@ public class ProducerConfig extends AbstractConfig {
+ "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
+ "compression is enabled) as well as for maintaining in-flight requests.";

/** <code>buffer.memory.allocation.strategy</code> */
public static final String BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG = "buffer.memory.allocation.strategy";
public static final String BUFFER_MEMORY_ALLOCATION_STRATEGY_FULL = "full";
public static final String BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL = "incremental";
private static final String BUFFER_MEMORY_ALLOCATION_STRATEGY_DOC = "Controls how the producer allocates memory from <code>" + BUFFER_MEMORY_CONFIG + "</code> for record batches. The following values are supported: "
+ "<ul>"
+ "<li><code>" + BUFFER_MEMORY_ALLOCATION_STRATEGY_FULL + "</code>: reserves a full <code>" + BATCH_SIZE_CONFIG + "</code> up front when a batch is created, "
+ "regardless of how much data it ends up holding. Pool memory therefore scales with the number of active partitions.</li>"
+ "<li><code>" + BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL + "</code>: allocates memory on demand as records are appended, growing a batch "
+ "up to <code>" + BATCH_SIZE_CONFIG + "</code>. Pool memory therefore scales with the data actually buffered rather than the number of active "
+ "partitions, allowing larger <code>" + BATCH_SIZE_CONFIG + "</code> values (e.g. for high-latency clusters) without reserving "
+ "<code>" + BATCH_SIZE_CONFIG + "</code> for every active partition.</li>"
+ "</ul>";

/** <code>retry.backoff.ms</code> */
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;

Expand Down Expand Up @@ -399,6 +413,13 @@ public class ProducerConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(BUFFER_MEMORY_ALLOCATION_STRATEGY_CONFIG,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this as internal to prevent it from leaking into 4.4 before the feature is fully implemented?

Type.STRING,
BUFFER_MEMORY_ALLOCATION_STRATEGY_FULL,
ConfigDef.CaseInsensitiveValidString
.in(BUFFER_MEMORY_ALLOCATION_STRATEGY_FULL, BUFFER_MEMORY_ALLOCATION_STRATEGY_INCREMENTAL),
Importance.MEDIUM,
BUFFER_MEMORY_ALLOCATION_STRATEGY_DOC)
.define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,18 @@ public class BufferPool {

private final long totalMemory;
private final int poolableSize;
private final ReentrantLock lock;
private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
private long nonPooledAvailableMemory;
/** Lock held for any read or write of {@link #free}, {@link #waiters}, {@link #nonPooledAvailableMemory}, or {@link #closed}. */
protected final ReentrantLock lock;
/** Pooled buffers of capacity {@link #poolableSize}, available for reuse. Guarded by {@link #lock}. */
protected final Deque<ByteBuffer> free;
/** FIFO queue of pending allocation requests; the longest-waiting thread is woken first. Guarded by {@link #lock}. */
protected final Deque<Condition> waiters;
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. Guarded by {@link #lock}. */
protected long nonPooledAvailableMemory;
private final Metrics metrics;
private final Time time;
protected final Time time;
private final Sensor waitTime;
private boolean closed;
protected boolean closed;

/**
* Create a new buffer pool
Expand Down Expand Up @@ -192,8 +195,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
signalNextWaiterIfMemoryAvailable();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
Expand All @@ -211,6 +213,15 @@ protected void recordWaitTime(long timeNs) {
this.waitTime.record(timeNs, time.milliseconds());
}

/**
* Wake the longest-waiting thread if any memory (pooled or non-pooled) is available.
* Must be called with {@link #lock} held. No-op if no waiters or no memory is free.
*/
protected void signalNextWaiterIfMemoryAvailable() {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
}

/**
* Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to
* available memory and signal the next waiter if it exists.
Expand All @@ -222,16 +233,24 @@ private ByteBuffer safeAllocateByteBuffer(int size) {
error = false;
return buffer;
} finally {
if (error) {
this.lock.lock();
try {
this.nonPooledAvailableMemory += size;
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
this.lock.unlock();
}
}
if (error)
releaseReservedBytes(size);
}
}

/**
* Return previously-reserved non-pooled bytes to the pool and signal the next
* waiter. Acquires {@link #lock} internally. Used by callers
* that reserve memory and then need to roll back the reservation (e.g., upon errors).
*/
protected void releaseReservedBytes(long bytes) {
this.lock.lock();
try {
this.nonPooledAvailableMemory += bytes;
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
this.lock.unlock();
}
}

Expand All @@ -242,9 +261,9 @@ protected ByteBuffer allocateByteBuffer(int size) {

/**
* Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
* buffers (if needed)
* buffers (if needed). Must be called with {@link #lock} held.
*/
private void freeUp(int size) {
protected void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
Expand Down
Loading
Loading