Skip to content

[fix][io] Fix delayed acknowledgment for offset 0 in Kafka Connect Adapter#24

Open
sandeep-mst wants to merge 2 commits intoapache:masterfrom
cognitree:kafka-connect-adapter-offsets
Open

[fix][io] Fix delayed acknowledgment for offset 0 in Kafka Connect Adapter#24
sandeep-mst wants to merge 2 commits intoapache:masterfrom
cognitree:kafka-connect-adapter-offsets

Conversation

@sandeep-mst
Copy link
Copy Markdown
Contributor

@sandeep-mst sandeep-mst commented May 6, 2026

Fixes #23

Motivation

PulsarKafkaSinkTaskContext.currentOffsets() was filtering out offsets equal to 0 by only including values greater than 0. This caused the first record in a partition, whose offset can legitimately be 0, to be omitted from the offsets snapshot returned to KafkaConnectSink#flush(). As a result, the acknowledgment for that record was not flushed until a later record with a non-zero offset was processed.

This change ensures that offset 0 is treated as a valid offset and is included in the current offsets map, so the first record can be acknowledged without waiting for another record.

Modifications

The offset filter in PulsarKafkaSinkTaskContext.currentOffsets() was changed from:
if (offset > 0) {
to:
if (offset >= 0) {

This includes records with offset 0 in the returned snapshot.
A assertion was added to existing offsetTest in KafkaConnectSinkTest to verify that currentOffsets() contains that record and reports offset 0 correctly.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest#offsetTest (updated existing test)

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:
cognitree#6

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Kafka Connect Adapter does not acknowledge records with offset 0 until next record

1 participant