11import asyncio
2- from typing import Callable
2+ from typing import Callable , Dict
33
4- from aiokafka import AIOKafkaConsumer
4+ from aiokafka import AIOKafkaConsumer , errors
55
66from dam .stream import Stream
77
88
99class Topic :
1010 """Kafka topic"""
1111
12- def __init__ (self , name : str , func : Callable ):
12+ def __init__ (self , name : str , func : Callable , ** kwargs ):
1313 self .name = name
1414 self .func = func
15+ self .consumer_kwargs : Dict = kwargs
16+
17+ def _configure (self , conf : Dict ):
18+ self .conf = conf
1519
1620 def create_consumer (
1721 self , name : str , loop : asyncio .AbstractEventLoop = None
@@ -22,13 +26,14 @@ def create_consumer(
2226 return AIOKafkaConsumer (
2327 name ,
2428 loop = loop ,
25- bootstrap_servers = "0.0.0.0:29092" ,
26- group_id = "test-consumer-group" ,
2729 auto_offset_reset = "earliest" ,
30+ ** self .conf ,
31+ ** self .consumer_kwargs
2832 )
2933
3034 async def run (self ):
3135 consumer = self .create_consumer (self .name )
36+
3237 await consumer .start ()
3338
3439 try :
@@ -37,29 +42,3 @@ async def run(self):
3742 finally :
3843 # Will leave consumer group; perform autocommit if enabled.
3944 await consumer .stop ()
40-
41-
42- # async def consume_from_kafka(loop):
43- # logger.debug("Task Consuming from kafka initiated...")
44- # consumer = AIOKafkaConsumer(
45- # TOPIC,
46- # loop=loop,
47- # bootstrap_servers=BOOTSTRAP_SERVERS,
48- # group_id=GROUP_ID,
49- # auto_offset_reset=AUTO_OFFSET_RESET,
50- # )
51-
52- # # Get cluster layout and join group `my-group`await consumer.start()
53- # await consumer.start()
54-
55- # try:
56- # # Consume messages
57- # async for msg in consumer:
58- # print(f"Task Consuming from kafka")
59- # print(
60- # f"Topic: {msg.topic}, Partition: {msg.partition}, Offset: {msg.offset},"
61- # f" Key: {msg.key}, Value: {msg.value}, Timestamp: {msg.timestamp}\n"
62- # )
63- # finally:
64- # # Will leave consumer group; perform autocommit if enabled.
65- # await consumer.stop()
0 commit comments