diff --git a/README.adoc b/README.adoc index 39f4db1be..15d821f76 100644 --- a/README.adoc +++ b/README.adoc @@ -1534,6 +1534,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.4.0 + +=== Improvements + +* feature: Add batch by key processing order, creating batches of the same key + == 0.5.3.4 === Fixes diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 05cae8240..844e9cdde 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -117,7 +117,14 @@ public enum ProcessingOrder { * Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the * max concurrency or uncommitted settings. */ - KEY + KEY, + + /** + * Together batch size > 1, creates batches of the same key. + * Concurrency is at most the number of unique keys in a topic, limited by the max concurrency or uncommitted + * settings. + */ + BATCH_BY_KEY } /** diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java index 4670cc8c0..2b6699ed2 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardKey.java @@ -30,7 +30,7 @@ public static ShardKey of(WorkContainer wc, ProcessingOrder ordering) { public static ShardKey of(ConsumerRecord rec, ProcessingOrder ordering) { return switch (ordering) { - case KEY -> ofKey(rec); + case KEY, BATCH_BY_KEY -> ofKey(rec); case PARTITION, UNORDERED -> ofTopicPartition(rec); }; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index a0f3ccdcc..aeefebef6 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -641,7 +641,7 @@ void workQueuesEmptyWhenAllWorkComplete() { @ParameterizedTest @EnumSource void resumesFromNextShard(ParallelConsumerOptions.ProcessingOrder order) { - Assumptions.assumeFalse(order == KEY); // just want to test ordered vs unordered + Assumptions.assumeFalse(order == KEY || order == BATCH_BY_KEY); // just want to test ordered vs unordered ParallelConsumerOptions build = ParallelConsumerOptions.builder() .ordering(order)