feat: add ackable message support with OnSuccess and Manual ack modes#109
feat: add ackable message support with OnSuccess and Manual ack modes#109silvioramalho wants to merge 9 commits intomasterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds opt-in Kafka acknowledgement modes to prevent offsets from being committed before message processing completes, closing the “silent loss on restart” window present with eager auto-commit.
Changes:
- Introduces
KafkaAckMode(Eager,OnSuccess,Manual) andKafkaAckableMessage<T>for explicit post-processing acknowledgement. - Updates
KafkaReceiverQueue<T>to support ackable dequeue operations and manual/controlled commits using a per-partitionPartitionCommitTracker. - Adds test coverage for ack modes and contiguous-per-partition commit tracking behavior.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Take.Elephant.Kafka/KafkaAckMode.cs | Adds acknowledgement mode enum and associated docs. |
| src/Take.Elephant.Kafka/KafkaAckableMessage.cs | Adds ackable message envelope with idempotent AcknowledgeAsync. |
| src/Take.Elephant.Kafka/IKafkaAckableReceiverQueue.cs | Adds receiver interface for ackable dequeue APIs. |
| src/Take.Elephant.Kafka/PartitionCommitTracker.cs | Adds per-partition contiguous-ack tracking to compute safe commit offsets. |
| src/Take.Elephant.Kafka/KafkaReceiverQueue.cs | Implements ack modes, disables auto-commit for non-eager, exposes ackable dequeues, and commits offsets after ack. |
| src/Take.Elephant.Kafka/KafkaHeadersConverter.cs | Adds an internal alias used when constructing ackable messages. |
| src/Take.Elephant.Tests/Kafka/PartitionCommitTrackerFacts.cs | Adds unit tests for contiguous advancement / gap-holding commit tracking. |
| src/Take.Elephant.Tests/Kafka/KafkaReceiverQueueAckModeFacts.cs | Adds unit tests verifying ack-mode behavior and idempotent ack semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…t discontinuities Agent-Logs-Url: https://github.com/takenet/elephant/sessions/dd241f8a-83b8-43af-ba16-57aaa6c7ea57 Co-authored-by: silvioramalho <20154605+silvioramalho@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try | ||
| { | ||
| var tpo = new TopicPartitionOffset(tp, new Offset(commitOffset.Value)); | ||
| _consumer.StoreOffset(tpo); | ||
| _consumer.Commit(new[] { tpo }); | ||
| } | ||
| catch (KafkaException) | ||
| { | ||
| // The partition was revoked or rebalanced before the commit | ||
| // could be persisted. Kafka will redeliver these messages to | ||
| // the new owner — safe to discard this exception. | ||
| } |
There was a problem hiding this comment.
The ack delegate only catches KafkaException around StoreOffset/Commit. If AcknowledgeAsync is called after CloseAsync/Dispose (or during shutdown), these calls can also throw ObjectDisposedException/InvalidOperationException and would currently bubble up to the caller, potentially failing the pipeline even though shutdown is in progress. Consider broadening the catch to include disposal/invalid-state exceptions (or checking _closed) and treating them similarly to the rebalance case.
| /// <summary> | ||
| /// Records that <paramref name="offset"/> was consumed. Must be called before | ||
| /// the message is dispatched downstream. Bootstraps the HWM on the first call | ||
| /// so that contiguity is relative to this run's starting offset. | ||
| /// </summary> | ||
| public void Track(long offset) | ||
| { | ||
| lock (_lock) | ||
| { | ||
| if (_lastCommitted == long.MinValue) | ||
| _lastCommitted = offset - 1; | ||
| } |
There was a problem hiding this comment.
Track() only bootstraps _lastCommitted on the first call and doesn’t handle offset discontinuities within the same assignment. If consumption ever jumps forward (e.g., due to a seek, or any scenario where the next delivered offset is not lastCommitted+1), Acknowledge() will wait forever for the missing offsets and commits will be stuck. Consider tracking the last seen consumed offset and resetting _lastCommitted/_acked when a discontinuity is detected (or exposing an explicit Reset(startOffset) that KafkaReceiverQueue can call when the starting position changes).
| public KafkaReceiverQueue( | ||
| ConsumerConfig consumerConfig, | ||
| string topic, | ||
| ISerializer<T> serializer, | ||
| IDeserializer<string> deserializer = null) | ||
| : this( | ||
| new ConsumerBuilder<Ignore, string>(consumerConfig) | ||
| .SetValueDeserializer(deserializer ?? new StringDeserializer()) | ||
| .Build(), | ||
| serializer, | ||
| topic) | ||
| IDeserializer<string> deserializer = null, | ||
| KafkaAckMode ackMode = KafkaAckMode.Eager) | ||
| { |
There was a problem hiding this comment.
Ack mode is only configurable via the ConsumerConfig constructor overload; the (bootstrapServers, topic, groupId, ...) convenience constructor still hard-codes the old signature and can’t pass a non-Eager ackMode through. To keep the API consistent, consider adding an optional KafkaAckMode parameter to the convenience constructor and forwarding it to this overload.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public async Task<KafkaAckableMessage<T>> DequeueAckableOrDefaultAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| if (_ackMode == KafkaAckMode.Eager) | ||
| throw new InvalidOperationException( | ||
| $"DequeueAckableOrDefaultAsync requires KafkaAckMode.OnSuccess or Manual. Current mode: {_ackMode}."); | ||
|
|
||
| if (_ackableChannel.Reader.TryRead(out var entry)) | ||
| return BuildAckableMessage(entry); | ||
|
|
||
| return null; | ||
| } |
There was a problem hiding this comment.
DequeueAckableOrDefaultAsync does not start the consumer task (unlike DequeueOrDefaultAsync / DequeueWithHeadersOrDefaultAsync). If callers rely on the queue to auto-start on first dequeue, this method will keep returning null because no background Consume loop is running. It is also marked async but has no awaits. Consider calling StartConsumerTaskIfNotAsync(cancellationToken) before TryRead (or removing async and returning a completed Task).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// <summary> | ||
| /// Offset is stored only after the caller explicitly invokes | ||
| /// <see cref="KafkaAckableMessage{T}.AcknowledgeAsync"/> following | ||
| /// successful processing. If the POD restarts before ack, the message | ||
| /// will be redelivered. | ||
| /// </summary> |
There was a problem hiding this comment.
In the OnSuccess summary the doc says “Offset is stored” after AcknowledgeAsync, but the implementation commits offsets directly (auto offset store is disabled and Commit(...) is used). Consider changing this wording to “committed” (and keep “stored” only when referring to Confluent’s local offset store) to avoid confusion about the delivery guarantees.
| public KafkaReceiverQueue(IConsumer<Ignore, string> consumer, ISerializer<T> serializer, string topic, | ||
| KafkaAckMode ackMode = KafkaAckMode.Eager) | ||
| { |
There was a problem hiding this comment.
The injected-IConsumer constructor used to exist without an ackMode parameter; changing its signature (even with a default value) is also a binary breaking change for downstream assemblies compiled against the previous ctor. Consider restoring the original 3-parameter constructor as an overload that calls this one with KafkaAckMode.Eager.
Motivation
In the current Kafka consumer implementation (
KafkaAckMode.Eager), offsets are auto-committed by the Confluent client at the momentconsumer.Consume()is called — before the business processing completes. This means that if a pod restarts while a message is still being processed, the offset has already been committed and the message is silently lost.This PR introduces opt-in acknowledgement modes that allow upstream pipelines (e.g.,
iris-shared) to commit offsets only after confirmed processing, eliminating this silent loss window.What Changed
New types (
Take.Elephant.Kafka)KafkaAckMode.csEager(default, legacy),OnSuccess,ManualKafkaAckableMessage<T>.csAcknowledgeAsync()delegate. Idempotent (double-ack is a no-op).IKafkaAckableReceiverQueue<T>.csIKafkaReceiverQueue<T>withDequeueAckableAsyncandDequeueAckableOrDefaultAsyncPartitionCommitTracker.csModified (
KafkaReceiverQueue<T>)IKafkaAckableReceiverQueue<T>KafkaAckMode ackMode = KafkaAckMode.Eager(fully backward-compatible)ackMode != Eager: setsEnableAutoCommit=falseandEnableAutoOffsetStore=false; uses_ackableChannel(capacity 1) andPartitionCommitTrackerinternallyDequeueAsync/DequeueOrDefaultAsync/DequeueWithHeadersAsyncremain unchanged forEagerconsumersKafkaHeadersConverter.ToReadOnlyDictionaryadded as internal alias for ackable message constructionTests (
Take.Elephant.Tests)KafkaReceiverQueueAckModeFacts.csPartitionCommitTrackerFacts.csAck Mode Semantics
PartitionCommitTracker — no offset gap rule
Backward Compatibility
KafkaAckMode.Eagerpreserves all existing behavior with zero code changes required in consumers.KafkaReceiverQueue(ConsumerConfig, topic, serializer)signature unchanged (newackModeparameter is optional with defaultEager).KafkaReceiverQueue(IConsumer, serializer, topic)signature unchanged (same).Risks
OnSuccess/Manualmust be idempotent.AcknowledgeAsyncis never calledPartitionCommitTrackerholds back commits until contiguous. Safe but may delay commit window.IKafkaReceiverQueue<T>implementorsKafkaReceiverQueuenow implements the widerIKafkaAckableReceiverQueue<T>. Cast to narrower interface still works.Testing