Skip to content
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "asyncapi-python"
version = "0.3.0rc3"
version = "0.3.0rc4"
license = { text = "Apache-2.0" }
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }]
Expand Down
48 changes: 33 additions & 15 deletions src/asyncapi_python_codegen/templates/application.py.j2
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Generated AsyncAPI application."""

from __future__ import annotations

from typing import Any
Expand All @@ -7,52 +8,69 @@ from asyncapi_python.kernel.wire import AbstractWireFactory
from asyncapi_python.kernel.codec import CodecFactory
from asyncapi_python.contrib.codec.registry import CodecRegistry
from asyncapi_python.kernel.endpoint import AbstractEndpoint
from asyncapi_python.kernel.endpoint.abc import EndpointParams

from .router import ProducerRouter, ConsumerRouter
import sys


class Application(BaseApplication):
"""{{ app_title }} - {{ app_description }}

AsyncAPI Version: {{ asyncapi_version }}
Application Version: {{ app_version }}
"""

def __init__(self, wire_factory: AbstractWireFactory[Any, Any]):

def __init__(
self,
wire_factory: AbstractWireFactory[Any, Any],
*,
endpoint_params: EndpointParams | None = None,
):
"""Initialize the AsyncAPI application.

Args:
wire_factory: Wire protocol factory for message transport
endpoint_params: Optional endpoint configuration (service_name, default_rpc_timeout, etc.)
"""
# Use CodecRegistry with current module for message serialization
current_module = sys.modules[self.__module__.rsplit('.', 1)[0]]
current_module = sys.modules[self.__module__.rsplit(".", 1)[0]]
codec_factory = CodecRegistry(current_module)

super().__init__(wire_factory=wire_factory, codec_factory=codec_factory)


# Pass endpoint_params to BaseApplication if provided
if endpoint_params is not None:
super().__init__(
wire_factory=wire_factory,
codec_factory=codec_factory,
endpoint_params=endpoint_params,
)
else:
super().__init__(wire_factory=wire_factory, codec_factory=codec_factory)

# Initialize semantic routers with factories
self.producer = ProducerRouter(wire_factory, codec_factory)
self.consumer = ConsumerRouter(wire_factory, codec_factory)

# Register all endpoints from routers
self._register_router_endpoints(self.producer)
self._register_router_endpoints(self.consumer)

def _register_router_endpoints(self, router: object) -> None:
"""Recursively register all endpoints from router tree.

Args:
router: Router object to scan for endpoints
"""
if isinstance(router, AbstractEndpoint):
# This router is an endpoint - register it directly
self._add_endpoint(router)
elif hasattr(router, '__dict__'):
elif hasattr(router, "__dict__"):
# This router aggregates others - recurse through attributes
for attr_name in dir(router):
if not attr_name.startswith('_'):
if not attr_name.startswith("_"):
attr = getattr(router, attr_name, None)
# Check if it's a router-like object (has __dict__ or is an endpoint)
if attr is not None and (isinstance(attr, AbstractEndpoint) or hasattr(attr, '__dict__')):
self._register_router_endpoints(attr)
if attr is not None and (
isinstance(attr, AbstractEndpoint) or hasattr(attr, "__dict__")
):
self._register_router_endpoints(attr)
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading