-
Notifications
You must be signed in to change notification settings - Fork 201
Open
Labels
enhancementNew feature or requestNew feature or requestquestionFurther information is requestedFurther information is requested
Description
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
Try consuming a topic containing data in zstd compression and the below error trace is encountered.
Expected behavior
Faust should be able to consume zstd compression data from a topic
Actual behavior
Faust crashed with the error as below -
Full traceback
Traceback (most recent call last):
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
await task
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
await self._drainer
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
async for tp, message in ait:
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
records, active_partitions = await self._wait_next_records(timeout)
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
records = await self._getmany(
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
return await self._thread.getmany(active_partitions, timeout)
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
return await self.call_thread(
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
result = await promise
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
return await res
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 824, in _fetch_records
return await fetcher.fetched_records(
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 1084, in fetched_records
records = res_or_error.getall(max_records)
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 135, in getall
for msg in self._partition_records:
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 202, in __next__
return next(self._records_iterator)
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 243, in _unpack_records
for record in next_batch:
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 270, in __iter__
self._maybe_uncompress()
File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 187, in _maybe_uncompress
self._buffer = bytearray(uncompressed)
UnboundLocalError: local variable 'uncompressed' referenced before assignmentVersions
- Python version - 3.9.5
- Faust version - 1.10.4
- Operating system - ubuntu
- Kafka version - 2.6
- RocksDB version (if applicable)
Any plans to support this anytime soon?
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or requestquestionFurther information is requestedFurther information is requested