Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/asyncapi_python/contrib/codec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@

from .registry import CodecRegistry


__all__ = ["CodecRegistry"]
2 changes: 1 addition & 1 deletion src/asyncapi_python/contrib/codec/json.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from typing import Type, ClassVar
from types import ModuleType
from typing import ClassVar, Type

from pydantic import BaseModel, ValidationError

Expand Down
6 changes: 4 additions & 2 deletions src/asyncapi_python/contrib/codec/registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import ClassVar, Any
from types import ModuleType
from asyncapi_python.kernel.codec import CodecFactory, Codec
from typing import Any, ClassVar

from asyncapi_python.kernel.codec import Codec, CodecFactory
from asyncapi_python.kernel.document.message import Message

from .json import JsonCodecFactory


Expand Down
4 changes: 2 additions & 2 deletions src/asyncapi_python/contrib/wire/amqp/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
try:
from aio_pika import ExchangeType # type: ignore[import-not-found]
from aio_pika.abc import ( # type: ignore[import-not-found]
AbstractConnection,
AbstractChannel,
AbstractQueue,
AbstractConnection,
AbstractExchange,
AbstractQueue,
)
except ImportError as e:
raise ImportError(
Expand Down
28 changes: 18 additions & 10 deletions src/asyncapi_python/contrib/wire/amqp/factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""AMQP wire factory implementation"""

import secrets
from typing import Optional, Callable, Any, cast
from typing import Any, Callable, Optional, cast

from typing_extensions import Unpack

try:
Expand All @@ -13,11 +14,11 @@
) from e

from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams
from asyncapi_python.kernel.wire.typing import Producer, Consumer
from asyncapi_python.kernel.wire.typing import Consumer, Producer

from .message import AmqpWireMessage, AmqpIncomingMessage
from .producer import AmqpProducer
from .consumer import AmqpConsumer
from .message import AmqpIncomingMessage, AmqpWireMessage
from .producer import AmqpProducer
from .resolver import resolve_amqp_config


Expand All @@ -31,7 +32,6 @@ class AmqpWire(AbstractWireFactory[AmqpWireMessage, AmqpIncomingMessage]):
def __init__(
self,
connection_url: str,
service_name: str = "app",
robust: bool = False,
reconnect_interval: float = 1.0,
max_reconnect_interval: float = 60.0,
Expand All @@ -45,7 +45,6 @@ def __init__(

Args:
connection_url: AMQP connection URL
service_name: Service name prefix for app_id
robust: Enable robust connection with auto-reconnect (default: False)
reconnect_interval: Initial reconnect interval in seconds (for robust mode)
max_reconnect_interval: Maximum reconnect interval in seconds (for robust mode)
Expand All @@ -55,9 +54,10 @@ def __init__(
on_connection_lost: Callback when connection is lost (for non-robust mode)
"""
self._connection_url = connection_url
# Generate app_id with service name plus 8 random hex characters
# Generate fallback app_id with random hex characters
# Note: For RPC, app_id should be provided via EndpointParams from application level
random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars
self._app_id = f"{service_name}-{random_hex}"
self._app_id = f"wire-{random_hex}"
self._connection: AbstractConnection | None = None
self._robust = robust
self._reconnect_interval = reconnect_interval
Expand Down Expand Up @@ -135,8 +135,12 @@ async def create_consumer(
# Generate operation name from available information
operation_name = self._generate_operation_name(kwargs)

# Use provided app_id if available, otherwise use instance app_id
# This allows application-level control over queue naming
app_id = kwargs.get("app_id", self._app_id)

# Resolve AMQP configuration using pattern matching
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
config = resolve_amqp_config(kwargs, operation_name, app_id)

connection = await self._get_connection()

Expand All @@ -154,8 +158,12 @@ async def create_producer(
# Generate operation name from available information
operation_name = self._generate_operation_name(kwargs)

# Use provided app_id if available, otherwise use instance app_id
# This allows application-level control over queue naming
app_id = kwargs.get("app_id", self._app_id)

# Resolve AMQP configuration using pattern matching
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
config = resolve_amqp_config(kwargs, operation_name, app_id)

connection = await self._get_connection()

Expand Down
40 changes: 34 additions & 6 deletions src/asyncapi_python/contrib/wire/amqp/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from typing import Any

try:
from aio_pika import Message as AmqpMessage, ExchangeType # type: ignore[import-not-found]
from aio_pika import ExchangeType
from aio_pika import Message as AmqpMessage # type: ignore[import-not-found]
from aio_pika.abc import ( # type: ignore[import-not-found]
AbstractConnection,
AbstractChannel,
AbstractConnection,
AbstractExchange,
)
except ImportError as e:
Expand Down Expand Up @@ -100,11 +101,38 @@ async def stop(self) -> None:

self._started = False

async def send_batch(self, messages: list[AmqpWireMessage]) -> None:
"""Send a batch of messages using the configured exchange"""
async def send_batch(
self, messages: list[AmqpWireMessage], *, address_override: str | None = None
) -> None:
"""Send a batch of messages using the configured exchange

Args:
messages: Messages to send
address_override: Optional dynamic routing key/queue to override static config.
If provided, overrides self._routing_key for this send operation.
If None, uses static routing_key from configuration/bindings.
"""
if not self._started or not self._channel or not self._target_exchange:
raise RuntimeError("Producer not started")

# Determine effective routing key: override takes precedence over static config
effective_routing_key = (
address_override if address_override is not None else self._routing_key
)

# Validate we have a destination
# Fail ONLY if both are truly missing:
# - address_override is None (not provided by caller)
# - AND self._routing_key is "" (no static config was derived from channel/bindings/operation)
# Note: empty string IS valid when explicitly configured (fanout exchanges, default exchange)
if address_override is None and not self._routing_key:
raise ValueError(
f"Cannot send: no routing destination available. "
f"RPC replies require reply_to from the request, or the channel must "
f"have address/bindings/operation-name to derive destination. "
f"(address_override={address_override}, routing_key={self._routing_key!r})"
)

for message in messages:
amqp_message = AmqpMessage(
body=message.payload,
Expand All @@ -113,8 +141,8 @@ async def send_batch(self, messages: list[AmqpWireMessage]) -> None:
reply_to=message.reply_to,
)

# Publish to the configured target exchange (not always default)
# Publish to the configured target exchange with dynamic or static routing key
await self._target_exchange.publish(
amqp_message,
routing_key=self._routing_key,
routing_key=effective_routing_key,
)
42 changes: 29 additions & 13 deletions src/asyncapi_python/contrib/wire/amqp/resolver.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Binding resolution with comprehensive pattern matching"""

from typing import Any
from asyncapi_python.kernel.wire import EndpointParams
from asyncapi_python.kernel.document.channel import Channel

from asyncapi_python.kernel.document.bindings import AmqpChannelBinding
from asyncapi_python.kernel.document.channel import Channel
from asyncapi_python.kernel.wire import EndpointParams

from .config import AmqpConfig, AmqpBindingType
from .utils import validate_parameters_strict, substitute_parameters
from .config import AmqpBindingType, AmqpConfig
from .utils import substitute_parameters, validate_parameters_strict


def resolve_amqp_config(
Expand Down Expand Up @@ -57,17 +58,32 @@ def resolve_amqp_config(
},
)

# Reply channel with explicit address - shared channel with filtering
# Reply channel with explicit address - check if direct queue or topic exchange
case (True, _, address, _) if address:
resolved_address = substitute_parameters(address, param_values)
return AmqpConfig(
queue_name=f"reply-{app_id}", # App-specific reply queue
exchange_name=resolved_address, # Shared exchange for replies
exchange_type="topic", # Enable pattern matching for filtering
routing_key=app_id, # Filter messages by app_id
binding_type=AmqpBindingType.REPLY,
queue_properties={"durable": True, "exclusive": False},
)
# If address starts with "reply-", treat it as a direct queue name (RPC pattern)
if resolved_address.startswith("reply-"):
return AmqpConfig(
queue_name=resolved_address, # Use address as queue name
exchange_name="", # Default exchange for direct routing
routing_key=resolved_address, # Route directly to queue
binding_type=AmqpBindingType.REPLY,
queue_properties={
"durable": False,
"exclusive": True,
"auto_delete": True,
},
)
else:
# Topic-based reply pattern - shared exchange with filtering
return AmqpConfig(
queue_name=f"reply-{app_id}", # App-specific reply queue
exchange_name=resolved_address, # Shared exchange for replies
exchange_type="topic", # Enable pattern matching for filtering
routing_key=app_id, # Filter messages by app_id
binding_type=AmqpBindingType.REPLY,
queue_properties={"durable": True, "exclusive": False},
)

# Reply channel with binding - defer to binding resolution
case (True, binding, _, _) if binding and binding.type == "queue":
Expand Down
1 change: 1 addition & 0 deletions src/asyncapi_python/contrib/wire/amqp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# TODO: This thing should be general wire utils, not tied to specific wire

import re

from asyncapi_python.kernel.document.channel import Channel


Expand Down
30 changes: 26 additions & 4 deletions src/asyncapi_python/contrib/wire/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator

from typing_extensions import Unpack

from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams
from asyncapi_python.kernel.wire.typing import Producer, Consumer
from asyncapi_python.kernel.wire.typing import Consumer, Producer


@dataclass
Expand Down Expand Up @@ -142,13 +143,34 @@ async def stop(self) -> None:
"""Stop the producer"""
self._started = False

async def send_batch(self, messages: list[InMemoryMessage]) -> None:
"""Send a batch of messages to the channel"""
async def send_batch(
self, messages: list[InMemoryMessage], *, address_override: str | None = None
) -> None:
"""Send a batch of messages to the channel

Args:
messages: Messages to send
address_override: Optional dynamic channel name to override static config.
If provided, overrides self._channel_name for this send operation.
If None, uses static channel_name from configuration.
"""
if not self._started:
raise RuntimeError("Producer not started")

# Determine effective channel: override takes precedence over static config
effective_channel = (
address_override if address_override is not None else self._channel_name
)

# Validate we have a destination
if not effective_channel:
raise ValueError(
f"Cannot send: no channel specified. "
f"address_override={address_override}, channel_name={self._channel_name}"
)

for message in messages:
await _bus.publish(self._channel_name, message)
await _bus.publish(effective_channel, message)


class InMemoryConsumer(Consumer[InMemoryIncomingMessage]):
Expand Down
8 changes: 5 additions & 3 deletions src/asyncapi_python/kernel/application.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import asyncio
from typing import TypedDict, Any
from typing_extensions import Unpack, Required, NotRequired
from typing import Any, TypedDict

from typing_extensions import NotRequired, Required, Unpack

from asyncapi_python.kernel.document.operation import Operation
from asyncapi_python.kernel.wire import AbstractWireFactory

from .codec import CodecFactory
from .endpoint import AbstractEndpoint, EndpointFactory
from .endpoint.abc import EndpointParams
from .codec import CodecFactory


class BaseApplication:
Expand Down
1 change: 1 addition & 0 deletions src/asyncapi_python/kernel/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Generic, Protocol

from asyncapi_python.kernel.document.message import Message

from .typing import T_DecodedPayload, T_EncodedPayload


Expand Down
14 changes: 7 additions & 7 deletions src/asyncapi_python/kernel/document/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from .bindings import (
AmqpChannelBinding,
AmqpExchange,
AmqpExchangeType,
AmqpOperationBinding,
AmqpQueue,
)
from .channel import AddressParameter, Channel, ChannelBindings
from .common import ExternalDocs, Server, Tag
from .message import (
Expand All @@ -15,13 +22,6 @@
OperationTrait,
SecurityScheme,
)
from .bindings import (
AmqpChannelBinding,
AmqpOperationBinding,
AmqpExchange,
AmqpQueue,
AmqpExchangeType,
)

__all__ = [
# channel
Expand Down
2 changes: 1 addition & 1 deletion src/asyncapi_python/kernel/document/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, Literal, Optional
from enum import Enum
from typing import Any, Dict, Literal, Optional


class AmqpExchangeType(str, Enum):
Expand Down
5 changes: 3 additions & 2 deletions src/asyncapi_python/kernel/document/channel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dataclasses import dataclass
from typing import Any
from .message import Message
from .common import *

from .bindings import AmqpChannelBinding
from .common import *
from .message import Message

__all__ = ["AddressParameter", "ChannelBindings", "Channel"]

Expand Down
4 changes: 3 additions & 1 deletion src/asyncapi_python/kernel/document/message.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from .common import *

from .bindings import AmqpMessageBinding
from .common import *

__all__ = [
"CorrelationId",
Expand Down
Loading
Loading