
Need zstd compression support while consuming/producing #181

Open

akgoel-mo opened this issue Aug 18, 2021 · 8 comments
Labels
enhancement New feature or request
question Further information is requested

Comments

@akgoel-mo

akgoel-mo commented Aug 18, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Try consuming from a topic containing zstd-compressed data; the error trace below is encountered.

Expected behavior

Faust should be able to consume zstd-compressed data from a topic.

Actual behavior

Faust crashes with the error below:

Full traceback

Traceback (most recent call last):
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
    records = await self._getmany(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
    return await self.call_thread(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 824, in _fetch_records
    return await fetcher.fetched_records(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 1084, in fetched_records
    records = res_or_error.getall(max_records)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 135, in getall
    for msg in self._partition_records:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 202, in __next__
    return next(self._records_iterator)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 243, in _unpack_records
    for record in next_batch:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 270, in __iter__
    self._maybe_uncompress()
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 187, in _maybe_uncompress
    self._buffer = bytearray(uncompressed)
UnboundLocalError: local variable 'uncompressed' referenced before assignment
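The failure mode in the traceback is that aiokafka's `_maybe_uncompress` only assigns `uncompressed` for the codecs it recognizes (gzip, snappy, lz4), so a batch using an unknown codec such as zstd falls through every branch and the final `bytearray(uncompressed)` raises `UnboundLocalError`. A minimal sketch of that pattern (a hypothetical simplification — the codec constants, function name, and decompression bodies here are illustrative, not aiokafka's actual code):

```python
# Illustrative codec IDs (not aiokafka's real constants).
GZIP, SNAPPY, LZ4, ZSTD = 1, 2, 3, 4

def maybe_uncompress(codec, data):
    # Only the codecs the library knows about ever bind `uncompressed`.
    if codec == GZIP:
        uncompressed = data  # gzip.decompress(data) in real code
    elif codec == SNAPPY:
        uncompressed = data  # snappy.decompress(data) in real code
    elif codec == LZ4:
        uncompressed = data  # lz4 decompression in real code
    # No branch for ZSTD: `uncompressed` is never assigned...
    return bytearray(uncompressed)  # ...so this line raises UnboundLocalError

try:
    maybe_uncompress(ZSTD, b"payload")
except UnboundLocalError as exc:
    print(type(exc).__name__)  # prints: UnboundLocalError
```

The fix the issue asks for would amount to adding a zstd branch to this dispatch (plus producer-side support), rather than changing Faust itself.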

Versions

  • Python version - 3.9.5
  • Faust version - 1.10.4
  • Operating system - Ubuntu
  • Kafka version - 2.6
  • RocksDB version (if applicable)

Any plans to support this anytime soon?

@wbarnha wbarnha added enhancement New feature or request question Further information is requested labels Sep 2, 2022
@theultimate1

Is this issue being worked on?

@wbarnha
Member

wbarnha commented Oct 13, 2022

I definitely would like to work on supporting zstd compression, but I'm not sure how high the demand is, because I'm triaging other ongoing issues. If the demand is there, I'll take a crack at it.

@theultimate1

oh sweet, cuz I was thinking about adding the support.

@wbarnha
Member

wbarnha commented Oct 13, 2022

If you do add support, I'd be grateful. Let me know if you need a hand.

@theultimate1

awesome thanks

@theultimate1

aiokafka has an issue with zstd compression at the broker level.
I will probably start there, try to fix that, and then come back here to integrate the methods.

@theultimate1

So, just wanted to give you an update: zstd support does exist at the producer level on the master branch of aiokafka; they just don't have it in their public release yet. However, the issue I mentioned above still persists, i.e. the broker-level zstd setting does not work. This is because the Kafka broker does not allow consumption of zstd-compressed messages unless the fetch request version is at least 10, as per this KIP. The problem is that fetch request v10 requires support for fetch sessions, and this has to be done on the aiokafka (and by extension kafka-python) side.
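For context, once producer-side zstd lands and is plumbed through, enabling it in Faust could look like the configuration sketch below. Note the hedges: `producer_compression_type` is a real Faust setting, but its supported values at the time of this issue are gzip, snappy, and lz4 — the `"zstd"` value shown here is exactly the hypothetical addition this issue requests, and the app name and broker URL are placeholders.

```python
import faust

app = faust.App(
    "demo-app",                          # placeholder app id
    broker="kafka://localhost:9092",     # placeholder broker URL
    # Hypothetical: "zstd" is NOT an accepted value at the time of this
    # issue; gzip/snappy/lz4 are the documented options.
    producer_compression_type="zstd",
)
```

Consumer-side support is the harder half, since (as noted above) the broker rejects zstd fetches below fetch request v10, which in turn depends on fetch-session support in aiokafka.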

@wbarnha
Member

wbarnha commented Mar 20, 2023

I saw your notes in confluentinc/confluent-kafka-python#858, hopefully a fix for this will get bundled into #418 or a related PR when I have time, someday.
