36 changes: 36 additions & 0 deletions .github/workflows/codspeed.yml
@@ -0,0 +1,36 @@
name: CodSpeed

on:
push:
branches:
- "main"
- "master"
pull_request:
workflow_dispatch:

permissions:
contents: read
id-token: write

jobs:
benchmarks:
name: Run benchmarks
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[dev]"

- name: Run benchmarks
uses: CodSpeedHQ/action@v4
with:
mode: simulation
run: pytest benchmarks/ --codspeed
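
The suite can also be exercised locally: `pip install pytest-codspeed`, then `pytest benchmarks/ --codspeed` (without the flag, the marked tests still run as ordinary pytest tests). As a minimal sketch of the kind of benchmark this workflow collects — the function name and workload below are illustrative, not part of this PR:

```python
import pytest


@pytest.mark.benchmark
def test_sum_range(benchmark):
    # pytest-codspeed measures only the callable passed to the `benchmark`
    # fixture; setup outside the call is excluded from the measurement.
    benchmark(lambda: sum(range(1_000)))
```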
13 changes: 8 additions & 5 deletions README.md
@@ -24,6 +24,9 @@
<a href="https://codecov.io/gh/vadikko2/python-cqrs">
<img src="https://img.shields.io/codecov/c/github/vadikko2/python-cqrs?logo=codecov&logoColor=white" alt="Coverage">
</a>
<a href="https://codspeed.io/vadikko2/python-cqrs?utm_source=badge">
<img src="https://img.shields.io/endpoint?url=https://codspeed.io/badge.json" alt="CodSpeed">
</a>
<a href="https://mkdocs.python-cqrs.dev/">
<img src="https://img.shields.io/badge/docs-mkdocs-blue?logo=readthedocs" alt="Documentation">
</a>
@@ -928,16 +931,16 @@ async def process_files_stream(
mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory),
) -> fastapi.responses.StreamingResponse:
async def generate_sse():
yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"
yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\\n\\n"

async for result in mediator.stream(command):
sse_data = {
"type": "progress",
"data": result.to_dict(),
}
yield f"data: {json.dumps(sse_data)}\n\n"
yield f"data: {json.dumps(sse_data)}\\n\\n"

yield f"data: {json.dumps({'type': 'complete'})}\n\n"
yield f"data: {json.dumps({'type': 'complete'})}\\n\\n"

return fastapi.responses.StreamingResponse(
generate_sse(),
@@ -950,8 +953,8 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap

## Protobuf messaging

- The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\
- Protocol buffers are Googles language-neutral, platform-neutral, extensible mechanism for serializing structured data –
+ The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\\
+ Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data –
think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use
special generated source code to easily write and read your structured data to and from a variety of data streams and
using a variety of languages.
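
As an illustration of that workflow (not part of this PR), a message class generated from a `.proto` file round-trips through protobuf's compact binary encoding; the `user_events_pb2` module and `UserJoined` message below are hypothetical:

```python
# Hypothetical: assumes `protoc --python_out=. user_events.proto` generated
# user_events_pb2 with a UserJoined message (string user_id, meeting_id).
from user_events_pb2 import UserJoined

msg = UserJoined(user_id="user_1", meeting_id="meeting_1")
wire = msg.SerializeToString()  # serialize to the compact binary wire format

parsed = UserJoined()
parsed.ParseFromString(wire)  # read it back from the byte stream
assert parsed.meeting_id == "meeting_1"
```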
Empty file added benchmarks/__init__.py
75 changes: 75 additions & 0 deletions benchmarks/test_benchmark_event_handling.py
@@ -0,0 +1,75 @@
"""Benchmarks for event handling performance."""

import asyncio
import dataclasses
import typing

import cqrs
import di
import pytest
from cqrs.events import bootstrap


@dataclasses.dataclass(frozen=True)
class UserJoinedEvent(cqrs.DCEvent):
user_id: str
meeting_id: str


class UserJoinedEventHandler(cqrs.EventHandler[UserJoinedEvent]):
def __init__(self):
self.processed_events: typing.List[UserJoinedEvent] = []

async def handle(self, event: UserJoinedEvent) -> None:
self.processed_events.append(event)


def events_mapper(mapper: cqrs.EventMap) -> None:
mapper.bind(UserJoinedEvent, UserJoinedEventHandler)


@pytest.fixture
def event_mediator():
return bootstrap.bootstrap(
di_container=di.Container(),
events_mapper=events_mapper,
)


@pytest.mark.benchmark
def test_benchmark_event_processing(benchmark, event_mediator):
"""Benchmark event processing performance."""
event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1")

async def run():
await event_mediator.send(event)

    benchmark(lambda: asyncio.run(run()))  # run the coroutine to completion; a bare run() would only create it


@pytest.mark.benchmark
def test_benchmark_multiple_events(benchmark, event_mediator):
"""Benchmark processing multiple events in sequence."""
events = [
UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") for i in range(10)
]

async def run():
for evt in events:
await event_mediator.send(evt)

    benchmark(lambda: asyncio.run(run()))  # run the coroutine to completion; a bare run() would only create it


@pytest.mark.benchmark
def test_benchmark_notification_event(benchmark):
"""Benchmark notification event creation and serialization."""

def run():
event = cqrs.NotificationEvent[UserJoinedEvent](
event_name="UserJoined",
topic="test_topic",
payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"),
)
return event.to_dict()

benchmark(run)
104 changes: 104 additions & 0 deletions benchmarks/test_benchmark_request_handling.py
@@ -0,0 +1,104 @@
"""Benchmarks for request handling performance."""

import asyncio
import dataclasses
import typing
from collections import defaultdict

import cqrs
import di
import pytest
from cqrs.requests import bootstrap

STORAGE = defaultdict[str, typing.List[str]](lambda: [])


@dataclasses.dataclass
class JoinMeetingCommand(cqrs.DCRequest):
user_id: str
meeting_id: str


@dataclasses.dataclass
class ReadMeetingQuery(cqrs.DCRequest):
meeting_id: str


@dataclasses.dataclass
class ReadMeetingQueryResult(cqrs.DCResponse):
users: list[str]


class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
@property
def events(self):
return []

async def handle(self, request: JoinMeetingCommand) -> None:
STORAGE[request.meeting_id].append(request.user_id)


class ReadMeetingQueryHandler(
cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult],
):
@property
def events(self):
return []

async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
return ReadMeetingQueryResult(users=STORAGE[request.meeting_id])


def command_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)


def query_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(ReadMeetingQuery, ReadMeetingQueryHandler)


@pytest.fixture
def mediator():
return bootstrap.bootstrap(
di_container=di.Container(),
queries_mapper=query_mapper,
commands_mapper=command_mapper,
)


@pytest.mark.benchmark
def test_benchmark_command_handling(benchmark, mediator):
"""Benchmark command handling performance."""
command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1")

async def run():
await mediator.send(command)

    benchmark(lambda: asyncio.run(run()))  # run the coroutine to completion; a bare run() would only create it


@pytest.mark.benchmark
def test_benchmark_query_handling(benchmark, mediator):
"""Benchmark query handling performance."""
# Setup: Add some data first
STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"]
query = ReadMeetingQuery(meeting_id="meeting_1")

async def run():
return await mediator.send(query)

    benchmark(lambda: asyncio.run(run()))  # run the coroutine to completion; a bare run() would only create it


@pytest.mark.benchmark
def test_benchmark_multiple_commands(benchmark, mediator):
"""Benchmark handling multiple commands in sequence."""
commands = [
JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2")
for i in range(10)
]

async def run():
for cmd in commands:
await mediator.send(cmd)

    benchmark(lambda: asyncio.run(run()))  # run the coroutine to completion; a bare run() would only create it
87 changes: 87 additions & 0 deletions benchmarks/test_benchmark_serialization.py
@@ -0,0 +1,87 @@
"""Benchmarks for serialization and deserialization performance."""

import dataclasses

import cqrs
import pytest


@dataclasses.dataclass
class SampleRequest(cqrs.DCRequest):
field1: str
field2: int
field3: list[str]
field4: dict[str, int]


@dataclasses.dataclass
class SampleResponse(cqrs.DCResponse):
result: str
data: dict[str, str]


@pytest.mark.benchmark
def test_benchmark_request_to_dict(benchmark):
"""Benchmark request serialization to dictionary."""
request = SampleRequest(
field1="test_value",
field2=42,
field3=["a", "b", "c"],
field4={"key1": 1, "key2": 2},
)

benchmark(lambda: request.to_dict())


@pytest.mark.benchmark
def test_benchmark_request_from_dict(benchmark):
"""Benchmark request deserialization from dictionary."""
data = {
"field1": "test_value",
"field2": 42,
"field3": ["a", "b", "c"],
"field4": {"key1": 1, "key2": 2},
}

benchmark(lambda: SampleRequest.from_dict(**data))


@pytest.mark.benchmark
def test_benchmark_response_to_dict(benchmark):
"""Benchmark response serialization to dictionary."""
response = SampleResponse(
result="success",
data={"key1": "value1", "key2": "value2"},
)

benchmark(lambda: response.to_dict())


@pytest.mark.benchmark
def test_benchmark_response_from_dict(benchmark):
"""Benchmark response deserialization from dictionary."""
data = {
"result": "success",
"data": {"key1": "value1", "key2": "value2"},
}

benchmark(lambda: SampleResponse.from_dict(**data))


@pytest.mark.benchmark
def test_benchmark_complex_nested_structure(benchmark):
"""Benchmark serialization of complex nested structures."""

class NestedRequest(cqrs.Request):
level1: dict[str, list[dict[str, str]]]
level2: list[dict[str, int]]

request = NestedRequest(
level1={
"group1": [{"name": "item1", "value": "val1"}] * 5,
"group2": [{"name": "item2", "value": "val2"}] * 5,
},
level2=[{"counter": i} for i in range(10)],
)

benchmark(lambda: request.to_dict())
1 change: 1 addition & 0 deletions pyproject.toml
@@ -42,6 +42,7 @@ dev = [
"ruff==0.6.2",
"vermin>=1.6.0",
"pytest-cov>=4.0.0",
"pytest-codspeed==4.2.0",
# Tests
"aio-pika==9.3.0", # from rabbit
"aiokafka==0.10.0", # from kafka