Skip to content

Commit e41cf99

Browse files
committed
feat: minimal working mvp
1 parent d7e5831 commit e41cf99

7 files changed

Lines changed: 162 additions & 1 deletion

File tree

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,37 @@
11
# dam
2+
23
Python Stream Processing toolkit
4+
5+
:warning: WARNING :warning:
6+
7+
_This repo is highly experimental and it's not recommended for production use._
8+
9+
_The interface may change at any time during initial development._
10+
11+
```python
12+
import asyncio
13+
from dam import App, Topic
14+
15+
loop = asyncio.get_event_loop()
16+
17+
async def hello_world(stream):
18+
async for value in stream:
19+
print(value.value)
20+
21+
22+
topics = [
23+
Topic("test-topic-worker", hello_world)
24+
]
25+
26+
streamapp = App(topics=topics)
27+
28+
while True:
29+
# This should be any other worker
30+
loop.run_until_complete(streamapp.run())
31+
```
32+
33+
## Usage
34+
35+
1. Clone repository
36+
2. Run `poetry install`
37+
3. Write apps inside `example` folder (for now)

dam/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
1+
from dam.application import App
2+
from dam.topic import Topic
3+
4+
__all__ = ["App", "Topic"]
5+
6+
17
__version__ = "0.1.0"

dam/application.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import asyncio
2+
from typing import List, Optional
3+
4+
5+
class App:
6+
def __init__(
7+
self,
8+
topics: Optional[List] = None,
9+
loop: Optional[asyncio.AbstractEventLoop] = None,
10+
):
11+
if loop is None:
12+
loop = asyncio.get_event_loop()
13+
self.loop = loop
14+
self.topics = topics
15+
16+
async def run(self):
17+
for topic in self.topics:
18+
await topic.run()

dam/stream.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
class Stream:
2+
def __init__(self, stream):
3+
self.stream = stream
4+
5+
async def __aiter__(self):
6+
async for item in self.stream:
7+
yield item
8+
9+
async def __anext__(self):
10+
raise StopAsyncIteration

dam/topic.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import asyncio
2+
from typing import Callable
3+
4+
from aiokafka import AIOKafkaConsumer
5+
6+
from dam.stream import Stream
7+
8+
9+
class Topic:
10+
"""Kafka topic"""
11+
12+
def __init__(self, name: str, func: Callable):
13+
self.name = name
14+
self.func = func
15+
16+
def create_consumer(
17+
self, name: str, loop: asyncio.AbstractEventLoop = None
18+
) -> AIOKafkaConsumer:
19+
if loop is None:
20+
loop = asyncio.get_event_loop()
21+
22+
return AIOKafkaConsumer(
23+
name,
24+
loop=loop,
25+
bootstrap_servers="0.0.0.0:29092",
26+
group_id="test-consumer-group",
27+
auto_offset_reset="earliest",
28+
)
29+
30+
async def run(self):
31+
consumer = self.create_consumer(self.name)
32+
await consumer.start()
33+
34+
try:
35+
# Consume messages
36+
await self.func(Stream(consumer))
37+
finally:
38+
# Will leave consumer group; perform autocommit if enabled.
39+
await consumer.stop()
40+
41+
42+
# async def consume_from_kafka(loop):
43+
# logger.debug("Task Consuming from kafka initiated...")
44+
# consumer = AIOKafkaConsumer(
45+
# TOPIC,
46+
# loop=loop,
47+
# bootstrap_servers=BOOTSTRAP_SERVERS,
48+
# group_id=GROUP_ID,
49+
# auto_offset_reset=AUTO_OFFSET_RESET,
50+
# )
51+
52+
# # Get cluster layout and join group `my-group`await consumer.start()
53+
# await consumer.start()
54+
55+
# try:
56+
# # Consume messages
57+
# async for msg in consumer:
58+
# print(f"Task Consuming from kafka")
59+
# print(
60+
# f"Topic: {msg.topic}, Partition: {msg.partition}, Offset: {msg.offset},"
61+
# f" Key: {msg.key}, Value: {msg.value}, Timestamp: {msg.timestamp}\n"
62+
# )
63+
# finally:
64+
# # Will leave consumer group; perform autocommit if enabled.
65+
# await consumer.stop()

example/simple_consumer.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
from dam import App, Topic
3+
4+
loop = asyncio.get_event_loop()
5+
6+
7+
async def hello_world(stream):
8+
async for value in stream:
9+
print(value.value)
10+
11+
12+
async def hello_world2(stream):
13+
async for value in stream:
14+
print("other topic", value.value)
15+
16+
17+
topics = [
18+
Topic("test-topic-worker", hello_world),
19+
Topic("fifo", hello_world2)
20+
]
21+
22+
streamapp = App(topics=topics)
23+
24+
while True:
25+
# This should be any other worker
26+
loop.run_until_complete(streamapp.run())

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[tool.poetry]
22
name = "dam"
33
version = "0.1.0"
4-
description = "Python Stream Processing toolkit"
4+
description = "Python Stream Processing minimalistic toolkit"
55
authors = [
66
"Marcos Schroh <schrohm@gmail.com>",
77
"Santiago Fraire Willemoës <santiwilly@gmail.com>"
@@ -11,6 +11,7 @@ keywords = ["stream", "processing", "streaming", "async"]
1111

1212
[tool.poetry.dependencies]
1313
python = "^3.7"
14+
aiokafka = "^0.5.2"
1415

1516
[tool.poetry.dev-dependencies]
1617
pytest = "^5.3.5"

0 commit comments

Comments
 (0)