diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 824a4c4a8ba08..44f6013e10812 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -499,7 +499,6 @@ public void initialize( staticVoters, log, serde, - BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, logContext, kafkaRaftMetrics, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 94bd8f526961c..f1896d0d69802 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -55,7 +55,6 @@ public final class KRaftControlRecordStateMachine { private final LogContext logContext; private final RaftLog log; private final RecordSerde serde; - private final BufferSupplier bufferSupplier; private final Logger logger; private final int maxBatchSizeBytes; @@ -82,7 +81,6 @@ public final class KRaftControlRecordStateMachine { * @param staticVoterSet the set of voter statically configured * @param log the on disk topic partition * @param serde the record decoder for data records - * @param bufferSupplier the supplier of byte buffers * @param maxBatchSizeBytes the maximum size of record batch * @param logContext the log context */ @@ -90,7 +88,6 @@ public KRaftControlRecordStateMachine( VoterSet staticVoterSet, RaftLog log, RecordSerde serde, - BufferSupplier bufferSupplier, int maxBatchSizeBytes, LogContext logContext, KafkaRaftMetrics kafkaRaftMetrics, @@ -100,7 +97,6 @@ public KRaftControlRecordStateMachine( this.log = log; this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext); this.serde = serde; - this.bufferSupplier = bufferSupplier; this.maxBatchSizeBytes = maxBatchSizeBytes; this.logger = logContext.logger(getClass()); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -232,25 +228,27 @@ private void checkOffsetIsValid(long offset) { } private void maybeLoadLog() { - while (log.endOffset().offset() > nextOffset) { - LogFetchInfo info = log.read( - nextOffset, - Isolation.UNCOMMITTED, - Integer.MAX_VALUE - ); - try (RecordsIterator iterator = new RecordsIterator<>( - info.records, - serde, - bufferSupplier, - maxBatchSizeBytes, - true, // Validate batch CRC - logContext - ) - ) { - while (iterator.hasNext()) { - Batch batch = iterator.next(); - handleBatch(batch, OptionalLong.empty()); - nextOffset = batch.lastOffset() + 1; + try (BufferSupplier bufferSupplier = BufferSupplier.create()) { + while (log.endOffset().offset() > nextOffset) { + LogFetchInfo info = log.read( + nextOffset, + Isolation.UNCOMMITTED, + Integer.MAX_VALUE + ); + try (RecordsIterator iterator = new RecordsIterator<>( + info.records, + serde, + bufferSupplier, + maxBatchSizeBytes, + true, // Validate batch CRC + logContext + ) + ) { + while (iterator.hasNext()) { + Batch batch = iterator.next(); + handleBatch(batch, OptionalLong.empty()); + nextOffset = batch.lastOffset() + 1; + } } } } @@ -271,7 +269,7 @@ private void maybeLoadSnapshot() { try (SnapshotReader reader = RecordsSnapshotReader.of( rawSnapshot, serde, - bufferSupplier, + BufferSupplier.create(), maxBatchSizeBytes, true, // Validate batch CRC logContext diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java index 84ca54f206a32..82def14f7f497 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java @@ -58,7 +58,6 @@ private static KRaftControlRecordStateMachine buildPartitionListener( staticVoterSet, log, STRING_SERDE, - BufferSupplier.NO_CACHING, 1024, new LogContext(), raftMetrics,