Fail ingestion tasks when OffsetOutOfRangeException is raised#18226
Fail ingestion tasks when OffsetOutOfRangeException is raised#18226FrankChen021 wants to merge 5 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR improves Kafka ingestion tasks by failing immediately on OffsetOutOfRangeException when auto-reset is disabled, ensuring messages before the expired offset are persisted and published.
- Introduces
TaskStatusExceptionto propagate task state and error messages. - Updates
SeekableStreamIndexTaskRunnerto catchTaskStatusExceptionand report task failure. - Modifies
KafkaIndexTaskRunnerto throwTaskStatusException.failon offset-out-of-range when auto-reset is off.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskStatusException.java | New exception class encapsulating a TaskState and message |
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java | Catches TaskStatusException around getRecords and adjusts onFailure to return a failure status |
| extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java | Throws TaskStatusException.fail on OffsetOutOfRangeException when automatic reset is disabled |
Comments suppressed due to low confidence (3)
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskStatusException.java:25
- [nitpick] Add a class-level Javadoc explaining the purpose of this exception and how it should be used to signal task status changes.
public class TaskStatusException extends Exception
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:2092
- [nitpick] This
@throwsdescription is unclear and grammatically awkward. Consider rephrasing to:Throws TaskStatusException when an OffsetOutOfRangeException occurs (with messages persisted), and clarifying other exception paths.
* @throws TaskStatusException all ingested message will be persisted and published.
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java:94
- No unit or integration tests appear to cover the new failure path when auto-reset is disabled and
OffsetOutOfRangeExceptionis thrown. Consider adding a test to verify immediate task failure and message persistence.
return recordSupplier.poll(task.getIOConfig().getPollTimeout());
...fka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java
Show resolved
Hide resolved
...fka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java
Show resolved
Hide resolved
|
merging master in should help with CI as the QTest timeouts are now less likely |
|
This pull request has been marked as stale due to 60 days of inactivity. |
|
@FrankChen021 looks like there's some unit test failures that are related to your changes. |
|
This pull request has been marked as stale due to 60 days of inactivity. |
|
This pull request/issue has been closed due to lack of activity. If you think that |
GWphua
left a comment
There was a problem hiding this comment.
LGTM, one comment about the exception message.
Can merge after CI passes.
| if (!task.getTuningConfig().isResetOffsetAutomatically()) { | ||
| throw TaskStatusException.fail( | ||
| e.getMessage() | ||
| + "\nThis may happen when given offsets have been deleted at the Kafka server due to the retention configuration. " | ||
| + "\nYou can use supervisor's reset API to set the offset to a valid position." | ||
| ); | ||
| } |
There was a problem hiding this comment.
Since this block fails with isResetOffsetAutomatically, I think we can also include a message to tell users that we can set a config to reset offset automatically, instead of needing to manually reset everytime.
Something like:
... use supervisor's reset API to set the offset to a valid position, or turn on resetOffsetAutomatically
|
Hi @FrankChen021 |
Fixes #14344
Description
When
OffsetOutOfRangeExceptionis raised, previously, it waits until task completes. This brings problems for operators that it may take long time to observe such exception is raised.Now the exception handling is improved if auto reset is not enabled:
Release note
Tasks that encounters
OffsetOutOfRangeExceptionexception will fail immediatelyFollow these steps to test:
kafka-producer-perf-test.shto generate events to the test topic in very large throughputafter a while when kafka server deletes offset, we can see from task log, task pushed segments and fail at last
On the task view, it's state is displayed as FAIIL

The status of task records the out of range exception
This PR has: