[Reader] fix: deliver null-value tombstones instead of discarding them#1482
Merged
nodece merged 3 commits intoapache:masterfrom Apr 28, 2026
Merged
[Reader] fix: deliver null-value tombstones instead of discarding them#1482nodece merged 3 commits intoapache:masterfrom
nodece merged 3 commits intoapache:masterfrom
Conversation
A message published with MessageMetadata.null_value = true (the Pulsar compaction tombstone convention) was conflated with a deserialization failure in partitionConsumer.MessageReceived and silently discarded. Because lastDequeuedMsg was never advanced past the tombstone, hasMoreMessages kept returning true and Reader.Next blocked forever when a tombstone was the last message on a topic. Consumer: when the reader yields an empty payload and msgMeta or the single-message metadata has null_value set, build a normal message with payLoad == nil and take the usual dispatch path so lastDequeuedMsg advances. Real corruption still routes through discardCorruptedMessage. Producer: set MessageMetadata.null_value / SingleMessageMetadata.null_value when both Value and Payload are nil, matching the Java client so Go-produced tombstones carry the flag consumers need. Message gains an IsNullValue() bool accessor so applications can tell tombstones apart from empty payloads.
Reader] fix: deliver null-value tombstones instead of discarding them
Contributor
There was a problem hiding this comment.
Pull request overview
Fixes reader/consumer handling of Pulsar compaction tombstones by delivering null-value messages (instead of discarding them) and ensuring producers correctly mark tombstones on the wire, with a new Message.IsNullValue() accessor for applications.
Changes:
- Consumer: treat
null_valuemessages as valid even whenReadMessage()returnsinternal.ErrEOM, ensuring dispatch progresses past tombstones. - Producer: set
MessageMetadata.null_value/SingleMessageMetadata.null_valuewhen bothValueandPayloadare nil. - Public API: add
IsNullValue()to theMessageinterface and add a reader test covering tombstone delivery.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| pulsar/consumer_partition.go | Accept and dispatch tombstone messages (null-value) instead of discarding them as corrupted. |
| pulsar/producer_partition.go | Emit protocol null_value flags for nil Value + nil Payload messages (single and batched). |
| pulsar/message.go | Adds IsNullValue() to the Message interface with explanatory doc comment. |
| pulsar/impl_message.go | Implements IsNullValue() on the concrete message type and stores a new isNullValue field. |
| pulsar/reader_test.go | Adds a reader test ensuring tombstones are delivered and HasNext() terminates. |
| pulsar/negative_acks_tracker_test.go | Updates test mocks to satisfy the extended Message interface. |
| pulsar/internal/pulsartracing/message_carrier_util_test.go | Updates tracing test mock to satisfy the extended Message interface. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Quoting Copilot's feedback: When accepting a tombstone with err == internal.ErrEOM, the code keeps err non-nil and continues. It works today because err is not used later, but it’s easy to misread and could become a latent bug if future logic inspects/returns err after this block. Consider explicitly setting err = nil in the tombstone/ErrEOM acceptance path to make the intent unambiguous.
Member
|
@jcmfernandes Please fix lint. |
Contributor
Author
Done. |
Member
|
Good work @jcmfernandes |
Contributor
Author
|
Thank you very much, @nodece! And thanks for helping me get this merged. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
A message published with
MessageMetadata.null_value = true(the Pulsar compaction tombstone convention) was conflated with a deserialization failure inpartitionConsumer.MessageReceivedand silently discarded. BecauselastDequeuedMsgwas never advanced past the tombstone,hasMoreMessageskept returning true andReader.Nextblocked forever when a tombstone was the last message on a topic.Modifications
Consumer: when the reader yields an empty payload and message metadata or the single-message metadata has
null_valueset, build a normal message withpayLoad == niland take the usual dispatch path solastDequeuedMsgadvances. Real corruption still routes throughdiscardCorruptedMessage.Producer: set
MessageMetadata.null_value/SingleMessageMetadata.null_valuewhen bothValueandPayloadare nil, matching the Java client so Go-produced tombstones carry the flag consumers need.Message gains an
IsNullValue()bool accessor so applications can tell tombstones apart from empty payloads.Note: This PR was written with the assistance of AI Anthropic’s Opus 4.7.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesyes/ no)no) (addedIsNullValue()to theMessageinterface)yes/ no /don't know)yes/ no)yes/ no)Documentation
yes/ no)