
Support data limit when reading a batch with TopicReaderSync #431

Open

wants to merge 1 commit into base: main
Conversation

@JasonRammoray JasonRammoray commented Jun 9, 2024

Allow a client to control the amount of data it receives when reading a batch through TopicReaderSync.

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

TopicReaderSync.receive_batch ignores the max_messages and max_bytes parameters, which means a client has no control over the amount of received data.

Issue Number: 365

What is the new behavior?

TopicReaderSync.receive_batch now takes max_messages and max_bytes into account.

Other information

Decisions made:

  1. Enforce at least one message and at least one byte per batch.
  2. Since a batch could theoretically be empty, don't use _commit_get_partition_session; instead, copy _partition_session from the original batch to the new (sliced) batch.
  3. Make no slice if neither max_messages nor max_bytes was provided.
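The three decisions above can be sketched as a small, self-contained function. This is an illustrative reconstruction, not the PR's actual code: the Batch class and make_batch_slice name are hypothetical stand-ins for the SDK's PublicBatch and _make_batch_slice.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Batch:
    # Hypothetical stand-in for the SDK's PublicBatch.
    messages: List[bytes]
    _partition_session: object = None


def make_batch_slice(batch: Batch,
                     max_messages: Optional[int] = None,
                     max_bytes: Optional[int] = None) -> Batch:
    """Sketch of the slicing decisions listed above (names are illustrative)."""
    # Decision 3: no slice if neither limit was provided.
    if max_messages is None and max_bytes is None:
        return batch
    msg_limit = max(1, max_messages) if max_messages is not None else float("inf")
    byte_limit = max(1, max_bytes) if max_bytes is not None else float("inf")
    sliced, total = [], 0
    for msg in batch.messages:
        if len(sliced) >= msg_limit:
            break
        # Decision 1: always keep at least one message, even if it alone
        # exceeds the byte limit.
        if sliced and total + len(msg) > byte_limit:
            break
        sliced.append(msg)
        total += len(msg)
    # Decision 2: copy the partition session into the new (sliced) batch.
    return Batch(messages=sliced, _partition_session=batch._partition_session)
```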

@JasonRammoray JasonRammoray changed the title Support max_{messages,bytes} parameters, when reading a batch through… Support data limit when reading a batch with TopicReaderSync Jun 9, 2024
max_messages: typing.Union[int, None] = None,
max_bytes: typing.Union[int, None] = None,
) -> Union[PublicBatch, None]:
all_amount = float("inf")
Member

Why do you need all_amount as a float constant?

Author

The rationale is that, by default, there is no limit on the data flow.
I'm not sure the UInt64 max value is sufficient, so I chose infinity, which happens to be a float.

max_bytes = all_amount

is_batch_set = batch is not None
is_msg_limit_set = max_messages < all_amount
Member

Why do you need all_amount instead of checking max_messages is not None?

Author

Because max_messages is set to all_amount (up above) if it hasn't been provided (i.e. it's None).
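The defaulting pattern being discussed can be shown in isolation. This is a minimal sketch of the idea (the normalize_limits name is hypothetical): unset limits become infinity, so downstream comparisons need no None checks.

```python
from typing import Optional, Tuple


def normalize_limits(max_messages: Optional[int] = None,
                     max_bytes: Optional[int] = None) -> Tuple[float, float, bool, bool]:
    # Unset limits default to infinity, so counting comparisons like
    # `count < max_messages` work without None checks.
    all_amount = float("inf")
    if max_messages is None:
        max_messages = all_amount
    if max_bytes is None:
        max_bytes = all_amount
    # A limit was explicitly provided iff it is below infinity.
    is_msg_limit_set = max_messages < all_amount
    is_bytes_limit_set = max_bytes < all_amount
    return max_messages, max_bytes, is_msg_limit_set, is_bytes_limit_set
```

The reviewer's point stands, though: checking `max_messages is not None` up front would express the same thing without the float sentinel.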

@@ -86,6 +86,57 @@ def async_wait_message(self) -> concurrent.futures.Future:

return self._caller.unsafe_call_with_future(self._async_reader.wait_message())

def _make_batch_slice(
Member

IMPORTANT

After applying this function, the caller will lose the messages that were trimmed from the batch and will not see them again in the read session. The server does not allow skipping messages during commit. This can cause problems:

  1. If the caller commits messages with ack, the software will hang forever (because the server will wait for the skipped messages before acking the commit).
  2. If the caller commits messages without ack, then after reconnecting, all messages after the last successful commit (the first batch with trimmed messages) will be re-read. A lot of extra work is required to re-read these messages, and real progress will be very slow.
  3. If progress is saved on the user's side and messages are not committed through the SDK, they will be lost and cannot be recovered.
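The first failure mode above can be illustrated with a toy model (this is an assumption-laden simplification, not the server's actual protocol): the server acks a commit only when the committed range is contiguous over offsets actually read in the session, so a trimmed gap blocks the ack forever.

```python
def ack_commit(last_committed: int, commit_up_to: int, seen_offsets: set) -> bool:
    # Toy contiguity rule: the commit is acked only if every offset in
    # (last_committed, commit_up_to] was delivered to the reader in this session.
    return all(off in seen_offsets
               for off in range(last_committed + 1, commit_up_to + 1))


# Suppose the slice dropped offsets 3 and 4 from a batch:
seen = {0, 1, 2, 5}
ack_commit(-1, 2, seen)   # contiguous prefix, the commit is acked
ack_commit(-1, 5, seen)   # offsets 3-4 were never read, the ack never comes
```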

Author

Ok, I see, though I'm not quite sure I fully understand the path to a solution.
What was the expected approach to take?
