Skip to content

[Bug]: Unhandled exception in KafkaIO SDF #37449

@sjvanrossum

Description

@sjvanrossum

What happened?

#34659 introduced a "fail fast" feature that appears to have broken KafkaIO's SDF.

KafkaIO.java L1924-1939 (GenerateKafkaSourceDescriptor):

                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
                if (logTopicVerification == null || !logTopicVerification) {
                  checkState(
                      partitionInfoList != null && !partitionInfoList.isEmpty(),
                      "Could not find any partitions info for topic "
                          + topic
                          + ". Please check Kafka configuration and make sure "
                          + "that provided topics exist.");
                } else {
                  LOG.warn(
                      "Could not find any partitions info for topic {}. Please check Kafka configuration "
                          + "and make sure that the provided topics exist.",
                      topic);
                }

                for (PartitionInfo p : partitionInfoList) {

WatchForKafkaTopicPartitions.java L193-200:

          List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
          checkState(
              partitionInfoList != null && !partitionInfoList.isEmpty(),
              "Could not find any partitions info for topic "
                  + topic
                  + ". Please check Kafka configuration and make sure "
                  + "that provided topics exist.");
          for (PartitionInfo partition : partitionInfoList) {

This will throw IllegalStateException from those DoFns and result in retrying instead of stopping. I think logging the condition should suffice for GenerateKafkaSourceDescriptor and WatchForKafkaTopicPartitions.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions