Skip to content

Commit

Permalink
fix: increase total events also when using sync testing (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh authored Nov 22, 2023
1 parent 71da1fe commit 45c4473
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 58 deletions.
13 changes: 8 additions & 5 deletions kstreams/test_utils/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ class Topic:

async def put(self, event: ConsumerRecord) -> None:
await self.queue.put(event)

# keep track of the amount of events per topic partition
self.total_partition_events[event.partition] += 1
self.total_events += 1
self._inc_amount(event)

async def get(self) -> ConsumerRecord:
return await self.queue.get()
Expand All @@ -33,7 +30,13 @@ def get_nowait(self) -> ConsumerRecord:
return self.queue.get_nowait()

def put_nowait(self, *, event: ConsumerRecord) -> None:
return self.queue.put_nowait(event)
self.queue.put_nowait(event)
self._inc_amount(event)

def _inc_amount(self, event: ConsumerRecord) -> None:
# keep track of the amount of events per topic partition
self.total_partition_events[event.partition] += 1
self.total_events += 1

def task_done(self) -> None:
self.queue.task_done()
Expand Down
103 changes: 50 additions & 53 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import Mock, call, patch
from unittest.mock import Mock, call

import pytest

Expand Down Expand Up @@ -332,56 +332,53 @@ async def test_streams_consume_events_with_initial_offsets(stream_engine: Stream
tp2,
)

with patch("kstreams.test_utils.test_clients.TestConsumer.seek") as client_seek:
async with client:
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event2, partition=1)

async def func_stream(consumer: Stream):
async for cr in consumer:
process(cr.value)

stream: Stream = Stream(
topics=topic,
consumer_class=TestConsumer,
name="my-stream",
func=func_stream,
initial_offsets=[
# initial topic offset is -1
TopicPartitionOffset(topic=topic, partition=0, offset=1),
TopicPartitionOffset(topic=topic, partition=1, offset=0),
TopicPartitionOffset(topic=topic, partition=2, offset=10),
],
)
stream_engine.add_stream(stream)
await stream.start()

# simulate partitions assigned on rebalance
await stream.rebalance_listener.on_partitions_assigned(assigned=assignments)

assert stream.consumer.assignment() == [tp0, tp1, tp2]

assert stream.consumer.last_stable_offset(tp0) == 2
assert stream.consumer.highwater(tp0) == 3
assert await stream.consumer.position(tp0) == 3

assert stream.consumer.last_stable_offset(tp1) == 0
assert stream.consumer.highwater(tp1) == 1
assert await stream.consumer.position(tp1) == 1

# the position will be 0 as the offset 10 does not exist
assert stream.consumer.last_stable_offset(tp2) == -1
assert stream.consumer.highwater(tp2) == 0
assert await stream.consumer.position(tp2) == 0

client_seek.assert_has_calls(
[
call(partition=tp0, offset=1),
call(partition=tp1, offset=0),
call(partition=tp2, offset=10),
],
any_order=True,
)
async with client:
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event1, partition=0)
await client.send(topic, value=event2, partition=1)

assert TopicManager.get(name=topic).size() == 4

async def func_stream(consumer: Stream):
async for cr in consumer:
process(cr.value)

stream: Stream = Stream(
topics=topic,
consumer_class=TestConsumer,
name="my-stream",
func=func_stream,
initial_offsets=[
# initial topic offset is -1
TopicPartitionOffset(topic=topic, partition=0, offset=1),
TopicPartitionOffset(topic=topic, partition=1, offset=0),
TopicPartitionOffset(topic=topic, partition=2, offset=10),
],
)
stream_engine.add_stream(stream)
await stream.start()

# simulate partitions assigned on rebalance
await stream.rebalance_listener.on_partitions_assigned(assigned=assignments)

assert stream.consumer.assignment() == [tp0, tp1, tp2]

assert stream.consumer.last_stable_offset(tp0) == 2
assert stream.consumer.highwater(tp0) == 3
assert await stream.consumer.position(tp0) == 3

assert stream.consumer.last_stable_offset(tp1) == 0
assert stream.consumer.highwater(tp1) == 1
assert await stream.consumer.position(tp1) == 1

# the position will be 0 as the offset 10 does not exist
assert stream.consumer.last_stable_offset(tp2) == -1
assert stream.consumer.highwater(tp2) == 0
assert await stream.consumer.position(tp2) == 0

# We moved to offset 1 on partition 0, then there are
# 3 events in the Queue rather than 4
assert TopicManager.get(name=topic).size() == 3

process.assert_has_calls([call(event1), call(event1), call(event2)], any_order=True)

0 comments on commit 45c4473

Please sign in to comment.