Skip to content

Commit db58b59

Browse files
moonyoungCHAEspring-builds
authored andcommitted
GH-4170 : Add KafkaListener Validation (Allow @Topic or @TopicPartition) (#4172)
Fixes #4170 * add topic validation of kafka listener * refactor: allow topic pattern for validtion Signed-off-by: moonyoungCHAE <xpf_fl@naver.com> (cherry picked from commit a0b5b70)
1 parent 2982a39 commit db58b59

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,9 +666,12 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
666666
endpoint.setBean(bean);
667667
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
668668
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
669+
670+
assertTopic(kafkaListener);
669671
endpoint.setTopicPartitions(tps);
670672
endpoint.setTopics(topics);
671673
endpoint.setTopicPattern(resolvePattern(kafkaListener));
674+
672675
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
673676
endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
674677
String group = kafkaListener.containerGroup();
@@ -859,6 +862,20 @@ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String
859862
return groupId;
860863
}
861864

865+
private void assertTopic(KafkaListener kafkaListener) {
866+
int count = 0;
867+
if (!kafkaListener.topicPattern().isEmpty()) {
868+
count++;
869+
}
870+
if (kafkaListener.topics().length > 0) {
871+
count++;
872+
}
873+
if (kafkaListener.topicPartitions().length > 0) {
874+
count++;
875+
}
876+
Assert.state(count == 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
877+
}
878+
862879
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
863880
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
864881
List<TopicPartitionOffset> result = new ArrayList<>();

0 commit comments

Comments
 (0)