Skip to content

Commit

Permalink
Faust commits the wrong offset in case of a gap in acks #312 (#313)
Browse files Browse the repository at this point in the history
* Faust commits the wrong offset in case of a gap in acks #312

Faust commits the wrong offset in case of a gap in acks #312

Co-Authored-By: ekerstens <[email protected]>

* kljlk

jhkjhj

* Revert "kljlk"

This reverts commit 8b487fc.

* update fix #313

Co-Authored-By: ekerstens <[email protected]>

* update test case

Co-Authored-By: ekerstens <[email protected]>
Co-Authored-By: Leo <[email protected]>

Co-authored-by: Leo Huang <[email protected]>
Co-authored-by: ekerstens <[email protected]>
Co-authored-by: Leo <[email protected]>
  • Loading branch information
4 people authored Jun 2, 2022
1 parent 26ff8fc commit 00adae7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
12 changes: 12 additions & 0 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,18 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked.extend(gaps)
gap_for_tp[:gap_index] = []
acked.sort()

# We iterate over it until we handle gap in the head of acked queue
# then return the previous committed offset.
# For example if acked[tp] is:
# 34 35 36 37
# ^-- gap
# self._committed_offset[tp] is 31
# the return value will be None (the same as 31)
if self._committed_offset[tp]:
if min(acked) - self._committed_offset[tp] > 0:
return None

# Note: acked is always kept sorted.
# find first list of consecutive numbers
batch = next(consecutive_numbers(acked))
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,13 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
(TP1, [3, 4], [], None),
(TP1, [3, 4], [2], None),
(TP1, [3, 4], [1, 2], 5),
],
)
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
consumer._committed_offset[tp] = 1
consumer._acked[tp] = acked
consumer._gap[tp] = gaps
assert consumer._new_offset(tp) == expected_offset
Expand Down

0 comments on commit 00adae7

Please sign in to comment.