diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 4922fa830..4133b01cd 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -719,7 +719,7 @@ def ack(self, message: Message) -> bool: if self.app.topics.acks_enabled_for(message.topic): committed = self._committed_offset[tp] try: - if committed is None or offset > committed: + if committed is None or offset >= committed: acked_index = self._acked_index[tp] if offset not in acked_index: self._unacked_messages.discard(message) @@ -1005,7 +1005,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: acked[:len(batch) - 1] = [] self._acked_index[tp].difference_update(batch) # return the highest commit offset - return batch[-1] + return batch[-1] + 1 return None async def on_task_error(self, exc: BaseException) -> None: @@ -1059,7 +1059,7 @@ async def _drain_messages( offset = message.offset r_offset = get_read_offset(tp) - if r_offset is None or offset > r_offset: + if r_offset is None or offset >= r_offset: gap = offset - (r_offset or 0) # We have a gap in income messages if gap > 1 and r_offset: diff --git a/t/unit/transport/test_consumer.py b/t/unit/transport/test_consumer.py index 869f517e7..ef0736469 100644 --- a/t/unit/transport/test_consumer.py +++ b/t/unit/transport/test_consumer.py @@ -823,7 +823,8 @@ def test_filter_committable_offsets(self, *, consumer): TP2: 30, } assert consumer._filter_committable_offsets({TP1, TP2}) == { - TP2: 36, + TP1: 5, + TP2: 37, } @pytest.mark.asyncio @@ -959,10 +960,10 @@ def test_should_commit(self, tp, offset, committed, should, *, consumer): @pytest.mark.parametrize('tp,acked,expected_offset', [ (TP1, [], None), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 10), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 8), - (TP1, [1, 2, 3, 4, 6, 7, 8, 10], 4), - (TP1, [1, 3, 4, 6, 7, 8, 10], 1), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 11), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 9), + (TP1, [1, 2, 3, 4, 6, 7, 8, 10], 5), + (TP1, [1, 3, 4, 6, 7, 8, 10], 2), ]) def test_new_offset(self, tp, acked, expected_offset, *, consumer): consumer._acked[tp] = acked @@ -970,10 +971,10 @@ def test_new_offset(self, tp, acked, expected_offset, *, consumer): @pytest.mark.parametrize('tp,acked,gaps,expected_offset', [ (TP1, [], [], None), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 10), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 10), - (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 8), - (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 10), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11), + (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9), + (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11), ]) def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):