diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index 4e16ab7c..295ae8fc 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -82,7 +82,7 @@ impl KafkaConsumerState { } /// Treat recoverable `librdkafka` errors as an empty poll the same way Python treats `KafkaError._TRANSPORT`. -fn kafka_error_is_recoverable(err: &KafkaError) -> bool { +fn kafka_poll_error_is_recoverable(err: &KafkaError) -> bool { matches!( err, KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure) @@ -90,6 +90,19 @@ fn kafka_error_is_recoverable(err: &KafkaError) -> bool { ) } +/// Same set as Python `KafkaConsumer`'s `retryable_errors` used with `BasicRetryPolicy` for `commit_offsets`. +fn kafka_commit_error_is_retryable(err: &KafkaError) -> bool { + matches!( + err.rdkafka_error_code(), + Some( + RDKafkaErrorCode::RequestTimedOut + | RDKafkaErrorCode::NotCoordinator + | RDKafkaErrorCode::CoordinatorLoadInProgress + | RDKafkaErrorCode::WaitingForCoordinator + ) + ) +} + fn create_kafka_message(topics: &[Topic], msg: BorrowedMessage) -> BrokerMessage { let topic = msg.topic(); // NOTE: We avoid calling `Topic::new` here, as that uses a lock to intern the `topic` name. @@ -119,6 +132,10 @@ fn commit_impl( consumer: &BaseConsumer>, offsets: HashMap, ) -> Result<(), ConsumerError> { + // Mirror `BasicRetryPolicy` usage on `commit_offsets` in Python + const MAX_ATTEMPTS: u32 = 3; + const RETRY_DELAY: Duration = Duration::from_secs(1); + let mut partitions = TopicPartitionList::with_capacity(offsets.len()); for (partition, offset) in &offsets { partitions.add_partition_offset( @@ -128,8 +145,31 @@ fn commit_impl( )?; } - consumer.commit(&partitions, CommitMode::Sync)?; - Ok(()) + let mut attempt = 0u32; + + loop { + attempt += 1; + + match consumer.commit(&partitions, CommitMode::Sync) { + Ok(()) => return Ok(()), + + Err(err) => { + if kafka_commit_error_is_retryable(&err) && attempt < MAX_ATTEMPTS { + tracing::warn!( + error = %err, + attempt, + max_attempts = MAX_ATTEMPTS, + "Kafka offset commit failed with retryable error, retrying after {:?}", + RETRY_DELAY + ); + + std::thread::sleep(RETRY_DELAY); + } else { + return Err(err.into()); + } + } + } + } } struct OffsetCommitter<'a, C: AssignmentCallbacks> { @@ -397,7 +437,7 @@ impl ArroyoConsumer for KafkaConsumer Ok(None), - Some(Err(err)) if kafka_error_is_recoverable(&err) => { + Some(Err(err)) if kafka_poll_error_is_recoverable(&err) => { let error: &dyn std::error::Error = &err; tracing::warn!(error, "Kafka poll transport error, retrying..."); Ok(None)