Correctly reset kafka offset if auto reset is enabled #18282

@FrankChen021

Description

This problem has been there for a very long time. See #11658

When OffsetOutOfRangeException is raised and auto reset is not enabled (the default), the behavior has been fixed (see #18226): all data ingested before the exception WILL be handled, and the task will FAIL. This is similar to the Kafka client behavior when 'auto.offset.reset' is set to 'none'.

However, when auto reset is enabled, the current behaviour does not work as expected.
There are several bugs:

  1. the task resets the offset using the expired offset, which is wrong; the earliest available offset should be used instead
  2. the overlord kills the task after the offset is reset, and all ingested data is lost
  3. the overlord can also reset the offset when it starts a new task. In this case, the overlord CLEARS (which is wrong) the offset stored in the metadata store and throws an exception, which delays the start of the task

As we can see, for auto reset both the task and the overlord are involved in deciding how to reset the offset, which makes it overcomplicated.

The following sequence diagram shows how the task and the overlord work.

sequenceDiagram
    participant Task
    participant Overlord
    participant Metadatastorage

    alt Reset Offset from tasks
      Task->>Overlord: 1.Reset Offset with expired offset (WRONG)
      Overlord->>Metadatastorage: 2.Read offset from metadata storage
      Overlord->>Overlord: 3.Check if offset is available
      Overlord->>Overlord: 4.Reset offset if NOT available (with current offset PLUS offset from metadata storage)
      Overlord->>Overlord: 5.Throw exception and wait for next retry if offset is available
      Overlord->>Task: Kill tasks
    end

    alt Starting a new task
      Overlord->>Overlord: 6.Start a new task
      Overlord->>Metadatastorage: 7.Read offset from metadata storage
      Overlord->>Overlord: 8.Check if offset is available
      Overlord->>Overlord: 9.Reset offset if NOT available (with current offset MINUS(which is WRONG) offset from metadata storage)
      Overlord->>Overlord: 10.Throw exception and wait for next retry to start a task(introducing delay)
    end
  1. Tasks reset the offset using 'nextOffset', which is the currently expired offset. This is wrong.

    final long nextOffset = outOfRangePartition.getValue();
    // seek to the beginning to get the least available offset
    StreamPartition<Integer> streamPartition = StreamPartition.of(
        topicPartition.topic(),
        topicPartition.partition()
    );
    final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
    if (leastAvailableOffset == null) {
      throw new ISE(
          "got null sequence number for partition[%s] when fetching from kafka!",
          topicPartition.partition()
      );
    }
    // reset the seek
    recordSupplier.seek(streamPartition, nextOffset);
    // Reset consumer offset if resetOffsetAutomatically is set to true
    // and the current message offset in the kafka partition is more than the
    // next message offset that we are trying to fetch
    if (leastAvailableOffset > nextOffset) {
      doReset = true;
      resetPartitions.put(topicPartition, nextOffset);
    }

  2. The overlord throws an exception after the reset, which delays the start of a task.

    if (taskTuningConfig.isResetOffsetAutomatically()) {
      resetInternal(
          createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
      );
      throw new StreamException(
          new ISE(
              "Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting"
              + " sequence",
              sequence,
              partition
          )

  3. The overlord clears the offset when resetting it.

    if (currentMetadata == null) {
      metadataUpdateSuccess = true;
    } else {
      final DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata);
      try {
        metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(supervisorId, newMetadata);
      }

In the above code block, the 'minus' method is called instead of the 'plus' method. The resetOffsetsInternal used by the task correctly calls the 'plus' method.
On the next turn, the overlord finds that there is no offset for this partition, so it resets the offset to the earliest or the latest one, depending on the useEarliestOffset property.

          metadata.getClass()
      );
    }
    @SuppressWarnings("unchecked")
    final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
        (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;
    final DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata);
    log.info("Current checkpointed metadata[%s], new metadata[%s] for supervisor[%s] for dataSource[%s]", currentMetadata, newMetadata, supervisorId, dataSource);
    try {
      metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(supervisorId, newMetadata);
    }

As a consequence, when OffsetOutOfRangeException is raised, in extreme cases new tasks are continuously forked but all fail.
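To illustrate why calling 'minus' instead of 'plus' clears the stored offset, here is a minimal sketch using plain maps from partition to offset. The `plus` and `minus` helpers are hypothetical stand-ins, not the Druid implementation; they assume, as the description above implies, that 'plus' overrides the stored offset of the partitions being reset while 'minus' drops those partitions altogether.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMergeSketch {
    // Hypothetical stand-in for 'plus': offsets in 'reset' override the stored ones,
    // partitions not named in 'reset' keep their stored offset.
    static Map<Integer, Long> plus(Map<Integer, Long> stored, Map<Integer, Long> reset) {
        Map<Integer, Long> merged = new HashMap<>(stored);
        merged.putAll(reset);
        return merged;
    }

    // Hypothetical stand-in for 'minus': partitions named in 'reset' are removed,
    // so their stored offset is lost.
    static Map<Integer, Long> minus(Map<Integer, Long> stored, Map<Integer, Long> reset) {
        Map<Integer, Long> remaining = new HashMap<>(stored);
        remaining.keySet().removeAll(reset.keySet());
        return remaining;
    }

    public static void main(String[] args) {
        Map<Integer, Long> stored = Map.of(0, 100L, 1, 250L);
        Map<Integer, Long> reset = Map.of(0, 500L); // partition 0 is reset to offset 500

        // With 'plus', partition 0 moves to the reset offset and partition 1 is untouched.
        System.out.println("plus:  " + plus(stored, reset));
        // With 'minus', partition 0 has no offset at all afterwards.
        System.out.println("minus: " + minus(stored, reset));
    }
}
```

Under these assumptions, the 'minus' result leaves partition 0 without any stored offset, which is the state that later triggers the fallback to the useEarliestOffset property.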

Proposed Change

To keep it simple and clear, the key point is that ONLY the overlord is responsible for resetting the offset.

  1. Remove the offset reset logic from the task. When OffsetOutOfRangeException is raised, the task fails but commits all ingested data, as fixed by Fail ingestion tasks when OffsetOutOfRangeException is raised #18226
  2. When the supervisor tries to start a new task, it detects that the last stored offset is no longer available and resets it:
    1. use the EARLIEST offset from the stream (e.g. Kafka) to reset the offset
    2. correctly merge the reset offset with the stored offsets before updating the metadata storage. This ensures that the overlord always resets the offset to the earliest one, so that less data is lost.
    3. issue an alert from the overlord that the offset has been reset
    4. after the reset, return the new offset so that the supervisor can immediately start a task from it
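The supervisor-side steps above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual patch: `SupervisorResetSketch`, `resolveStartOffset`, and the `earliestOffsetOf` lookup are hypothetical names, the stored offsets are modelled as a plain map, the alert is a stand-in print statement, and only the case where the stored offset has fallen below the earliest retained offset is handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntToLongFunction;

public class SupervisorResetSketch {
    /**
     * Returns the offset a new task should start from for the given partition.
     * If the stored offset is older than the earliest offset still in the stream
     * and auto reset is enabled, 'storedOffsets' is updated in place and the
     * earliest offset is returned instead of throwing.
     */
    static long resolveStartOffset(
        int partition,
        Map<Integer, Long> storedOffsets,
        IntToLongFunction earliestOffsetOf, // hypothetical lookup against the stream
        boolean resetOffsetAutomatically
    ) {
        Long stored = storedOffsets.get(partition);
        if (stored == null) {
            // The no-stored-offset case is out of scope for this sketch.
            throw new IllegalArgumentException("no stored offset for partition " + partition);
        }
        long earliest = earliestOffsetOf.applyAsLong(partition);
        if (stored >= earliest) {
            return stored; // stored offset is still available, nothing to reset
        }
        if (!resetOffsetAutomatically) {
            throw new IllegalStateException(
                "offset " + stored + " is no longer available for partition " + partition);
        }
        // Merge: override only this partition and keep the offsets of all others.
        storedOffsets.put(partition, earliest);
        // Stand-in for an alert emitted by the overlord.
        System.err.printf("ALERT: partition %d offset reset from %d to %d%n", partition, stored, earliest);
        // Return the reset offset so a task can start from it immediately.
        return earliest;
    }

    public static void main(String[] args) {
        Map<Integer, Long> stored = new HashMap<>(Map.of(0, 100L, 1, 250L));
        IntToLongFunction earliest = p -> p == 0 ? 500L : 200L;
        System.out.println(resolveStartOffset(0, stored, earliest, true)); // expired, reset to earliest
        System.out.println(resolveStartOffset(1, stored, earliest, true)); // still available
    }
}
```

Returning the reset offset rather than throwing is what removes the extra retry delay described in step 10 of the diagram.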
