Support data limit when reading a batch with TopicReaderSync #431

Open: wants to merge 1 commit into base: main

3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.11.5 ##
* Added support for the max_messages and max_bytes parameters when reading a batch through a sync topic reader

## 3.11.4 ##
* Added missing returns to time converters for topic options

55 changes: 54 additions & 1 deletion ydb/_topic_reader/topic_reader_sync.py
@@ -86,6 +86,57 @@ def async_wait_message(self) -> concurrent.futures.Future:

        return self._caller.unsafe_call_with_future(self._async_reader.wait_message())

    def _make_batch_slice(
Member commented:

IMPORTANT

After applying this function, the caller loses the messages that were trimmed from the batch and will not see them again in the read session. The server does not allow skipping messages during commit. This can cause problems (illustrated in the sketch after this list):

  1. If the caller commits messages with ack, the software will hang forever (because the server will wait for the skipped messages before acknowledging the commit).
  2. If the caller commits messages without ack, then after reconnecting, all messages after the last successful commit (the first batch with trimmed messages) will be re-read. A lot of extra work is required to re-read these messages, and real progress will be very slow.
  3. If progress is saved on the user's side and messages are not committed through the SDK, they will be lost and cannot be recovered.

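A sketch of the failure in point 1, assuming `commit_with_ack` from the sync reader API; the offsets and batch layout are invented for illustration:

```python
# "reader" is a TopicReaderSync; suppose the server delivered
# two batches: offsets 0..4 and 5..9.
first = reader.receive_batch(max_messages=2)  # returns offsets 0..1; offsets 2..4
                                              # are trimmed and never re-delivered
reader.commit_with_ack(first)                 # fine: range 0..1 is contiguous

second = reader.receive_batch()               # returns offsets 5..9
reader.commit_with_ack(second)                # hangs: the server waits for offsets
                                              # 2..4, which can no longer be committed
```
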
Author replied:

OK, I see, though I'm not quite sure I fully understand the path to a solution.
What was the expected approach?

        self,
        batch: Union[PublicBatch, None],
        max_messages: typing.Union[int, None] = None,
        max_bytes: typing.Union[int, None] = None,
    ) -> Union[PublicBatch, None]:
        all_amount = float("inf")
Member commented:

Why do you need `all_amount` as a float constant?

Author replied:

The rationale is that, by default, we have no limit on the data flow.
I'm not sure the UInt64 max value would be sufficient, so I chose infinity, which happens to be a float.

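For contrast, a minimal sketch of the reviewer's suggestion: treat None itself as "no limit" and drop the float sentinel entirely (a hypothetical rewrite of the guard logic, not code from this PR):

```python
# Hypothetical None-based variant of the guards in _make_batch_slice.
if max_messages is not None:
    max_messages = max(max_messages, 1)  # keep a non-empty batch non-empty
if max_bytes is not None:
    max_bytes = max(max_bytes, 1)

is_limit_set = max_messages is not None or max_bytes is not None
if batch is None or not is_limit_set:
    return batch
# The slicing loop below would then need None-aware checks, e.g.:
# max_messages is not None and len(sliced_messages) >= max_messages
```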

        # A non-empty batch must stay non-empty regardless of the max messages value
        if max_messages is not None:
            max_messages = max(max_messages, 1)
        else:
            max_messages = all_amount

        if max_bytes is not None:
            max_bytes = max(max_bytes, 1)
        else:
            max_bytes = all_amount

        is_batch_set = batch is not None
        is_msg_limit_set = max_messages < all_amount
Member commented:

Why do you use `all_amount` here instead of checking `max_messages is not None`?

Author replied:

Because `max_messages` is set to `all_amount` (up above) when it hasn't been provided (i.e., when it's None).

        is_bytes_limit_set = max_bytes < all_amount
        is_limit_set = is_msg_limit_set or is_bytes_limit_set
        is_slice_required = is_batch_set and is_limit_set

        if not is_slice_required:
            return batch

        sliced_messages = []
        bytes_taken = 0

        for batch_message in batch.messages:
            sliced_messages.append(batch_message)
            bytes_taken += len(batch_message.data)

            is_enough_messages = len(sliced_messages) >= max_messages
            is_enough_bytes = bytes_taken >= max_bytes
            is_stop_required = is_enough_messages or is_enough_bytes

            if is_stop_required:
                break

        sliced_batch = PublicBatch(
            messages=sliced_messages,
            _partition_session=batch._partition_session,
            _bytes_size=bytes_taken,
            _codec=batch._codec,
        )

        return sliced_batch
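
To make the slicing rule concrete, here is a self-contained sketch with a simplified stand-in for the message type (the real `PublicBatch` carries more state; only the loop above is mirrored). Note that both limits are inclusive stop conditions, so a returned batch can exceed `max_bytes` by part of the last appended message:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeMessage:  # hypothetical stand-in for the SDK message type; only .data matters
    data: bytes


def slice_messages(
    messages: List[FakeMessage],
    max_messages: Optional[int],
    max_bytes: Optional[int],
) -> List[FakeMessage]:
    taken: List[FakeMessage] = []
    bytes_taken = 0
    for message in messages:
        taken.append(message)  # a non-empty input always yields at least one message
        bytes_taken += len(message.data)
        enough_messages = max_messages is not None and len(taken) >= max_messages
        enough_bytes = max_bytes is not None and bytes_taken >= max_bytes
        if enough_messages or enough_bytes:
            break
    return taken


msgs = [FakeMessage(b"x" * 40)] * 3
print(len(slice_messages(msgs, max_messages=None, max_bytes=50)))  # 2 (40 + 40 >= 50)
print(len(slice_messages(msgs, max_messages=1, max_bytes=None)))   # 1
```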

    def receive_batch(
        self,
        *,
@@ -102,11 +153,13 @@ def receive_batch(
        """
        self._check_closed()

        maybe_batch: Union[PublicBatch, None] = self._caller.safe_call_with_result(
            self._async_reader.receive_batch(),
            timeout,
        )

        return self._make_batch_slice(maybe_batch, max_messages, max_bytes)
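
A hedged usage sketch of the new parameters; the connection string, topic path, and consumer name are placeholders, and the call shape is taken from the diff above:

```python
import ydb

driver = ydb.Driver(connection_string="grpc://localhost:2136?database=/local")  # placeholder
driver.wait(fail_fast=True, timeout=5)

reader = driver.topic_client.reader("/local/my-topic", consumer="my-consumer")  # placeholders
try:
    # Ask for at most 10 messages and roughly 64 KiB in a single batch.
    batch = reader.receive_batch(max_messages=10, max_bytes=64 * 1024)
    if batch is not None:
        for message in batch.messages:
            print(message.data)
    # Before committing a sliced batch, note the review discussion above:
    # trimmed messages are lost to this read session.
finally:
    reader.close()
```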

    def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
        """
        Put commit message to internal buffer.