From 4e8a38f753008857b112b61b85fc8255b6f2473e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Louren=C3=A7o?= Date: Thu, 15 Jan 2026 18:12:11 +0000 Subject: [PATCH 1/2] Introduce new processing order option, BATCH_BY_KEY, that will group records of the same key in the same batch --- .../parallelconsumer/ParallelConsumerOptions.java | 9 ++++++++- .../io/confluent/parallelconsumer/state/ShardKey.java | 2 +- .../parallelconsumer/state/WorkManagerTest.java | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 05cae8240..844e9cdde 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -117,7 +117,14 @@ public enum ProcessingOrder { * Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the * max concurrency or uncommitted settings. */ - KEY + KEY, + + /** + * Together batch size > 1, creates batches of the same key. + * Concurrency is at most the number of unique keys in a topic, limited by the max concurrency or uncommitted + * settings. + */ + BATCH_BY_KEY } /** diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java index 4670cc8c0..2b6699ed2 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java @@ -30,7 +30,7 @@ public static ShardKey of(WorkContainer wc, ProcessingOrder ordering) { public static ShardKey of(ConsumerRecord rec, ProcessingOrder ordering) { return switch (ordering) { - case KEY -> ofKey(rec); + case KEY, BATCH_BY_KEY -> ofKey(rec); case PARTITION, UNORDERED -> ofTopicPartition(rec); }; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index a0f3ccdcc..aeefebef6 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -641,7 +641,7 @@ void workQueuesEmptyWhenAllWorkComplete() { @ParameterizedTest @EnumSource void resumesFromNextShard(ParallelConsumerOptions.ProcessingOrder order) { - Assumptions.assumeFalse(order == KEY); // just want to test ordered vs unordered + Assumptions.assumeFalse(order == KEY || order == BATCH_BY_KEY); // just want to test ordered vs unordered ParallelConsumerOptions build = ParallelConsumerOptions.builder() .ordering(order) From 97b620d22b1e1eea9f2c0c044080b65e453d302b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Louren=C3=A7o?= Date: Thu, 15 Jan 2026 18:18:03 +0000 Subject: [PATCH 2/2] Readme update --- README.adoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.adoc b/README.adoc index 39f4db1be..15d821f76 100644 --- a/README.adoc +++ b/README.adoc @@ -1534,6 +1534,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.4.0 + +=== Improvements + +* feature: Add batch by key processing order, creating batches of the same key + == 0.5.3.4 === Fixes