Need zstd compression support while consuming/producing #181

@akgoel-mo

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Consume a topic whose messages are zstd-compressed. Faust fails with the traceback shown under "Full traceback".

Expected behavior

Faust should be able to consume zstd-compressed data from a topic.

Actual behavior

Faust crashes with the following error:

Full traceback

Traceback (most recent call last):
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
    records = await self._getmany(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
    return await self.call_thread(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 824, in _fetch_records
    return await fetcher.fetched_records(
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 1084, in fetched_records
    records = res_or_error.getall(max_records)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 135, in getall
    for msg in self._partition_records:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 202, in __next__
    return next(self._records_iterator)
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 243, in _unpack_records
    for record in next_batch:
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 270, in __iter__
    self._maybe_uncompress()
  File "/Users/nikhil/Downloads/consumer_poc/lib/python3.9/site-packages/aiokafka/record/default_records.py", line 187, in _maybe_uncompress
    self._buffer = bytearray(uncompressed)
UnboundLocalError: local variable 'uncompressed' referenced before assignment
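
For anyone triaging: the final frame looks like the usual "no branch matched" pattern. The snippet below is only a minimal sketch of that failure mode, not the actual aiokafka source. The function names and the reduced set of codecs are hypothetical. The numeric codes follow the Kafka record batch attribute values (0 = none, 1 = gzip, 4 = zstd).

```python
import gzip

# Compression codes as used in Kafka record batch attributes.
NONE, GZIP, ZSTD = 0, 1, 4


def maybe_uncompress(compression_type, data):
    """Hypothetical decoder with no branch for zstd (illustration only)."""
    if compression_type == NONE:
        uncompressed = data
    elif compression_type == GZIP:
        uncompressed = gzip.decompress(data)
    # No branch assigns `uncompressed` for ZSTD, so the next line raises
    # UnboundLocalError when a zstd batch arrives.
    return bytearray(uncompressed)


def maybe_uncompress_checked(compression_type, data):
    """Same sketch, but unknown codecs fail with an explicit error."""
    if compression_type == NONE:
        return bytearray(data)
    if compression_type == GZIP:
        return bytearray(gzip.decompress(data))
    raise NotImplementedError(
        f"unsupported compression type: {compression_type}"
    )
```

Calling `maybe_uncompress(ZSTD, payload)` reproduces the same exception class as the traceback, while the checked variant at least names the unsupported codec.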

Versions

  • Python version - 3.9.5
  • Faust version - 1.10.4
  • Operating system - Ubuntu
  • Kafka version - 2.6
  • RocksDB version (if applicable)

Are there any plans to support this soon?

Labels: enhancement, question