Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/asyncapi_python/contrib/wire/amqp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class AmqpConfig:
binding_type: AmqpBindingType = AmqpBindingType.QUEUE
queue_properties: dict[str, Any] = field(default_factory=lambda: {})
binding_arguments: dict[str, Any] = field(default_factory=lambda: {})
arguments: dict[str, Any] = field(default_factory=lambda: {})

def to_producer_args(self) -> dict[str, Any]:
"""Convert to AmqpProducer constructor arguments"""
Expand All @@ -34,6 +35,7 @@ def to_producer_args(self) -> dict[str, Any]:
"exchange_type": self.exchange_type,
"routing_key": self.routing_key,
"queue_properties": self.queue_properties,
"arguments": self.arguments,
}

def to_consumer_args(self) -> dict[str, Any]:
Expand All @@ -46,4 +48,5 @@ def to_consumer_args(self) -> dict[str, Any]:
"binding_type": self.binding_type,
"queue_properties": self.queue_properties,
"binding_arguments": self.binding_arguments,
"arguments": self.arguments,
}
14 changes: 14 additions & 0 deletions src/asyncapi_python/contrib/wire/amqp/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
binding_type: AmqpBindingType = AmqpBindingType.QUEUE,
queue_properties: dict[str, Any] | None = None,
binding_arguments: dict[str, Any] | None = None,
arguments: dict[str, Any] | None = None,
):
self._connection = connection
self._queue_name = queue_name
Expand All @@ -45,6 +46,7 @@ def __init__(
self._binding_type = binding_type
self._queue_properties = queue_properties or {}
self._binding_arguments = binding_arguments or {}
self._arguments = arguments or {}
self._channel: AbstractChannel | None = None
self._queue: AbstractQueue | None = None
self._exchange: AbstractExchange | None = None
Expand All @@ -67,6 +69,7 @@ async def start(self) -> None:
durable=self._queue_properties.get("durable", True),
exclusive=self._queue_properties.get("exclusive", False),
auto_delete=self._queue_properties.get("auto_delete", False),
arguments=self._arguments,
)

# Simple queue binding pattern (default exchange)
Expand All @@ -76,6 +79,7 @@ async def start(self) -> None:
durable=self._queue_properties.get("durable", True),
exclusive=self._queue_properties.get("exclusive", False),
auto_delete=self._queue_properties.get("auto_delete", False),
arguments=self._arguments,
)

# Routing key binding pattern (pub/sub with named exchange)
Expand All @@ -87,24 +91,28 @@ async def start(self) -> None:
name=self._exchange_name,
type=ExchangeType.DIRECT,
durable=True,
arguments=self._arguments,
)
case "topic":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.TOPIC,
durable=True,
arguments=self._arguments,
)
case "fanout":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.FANOUT,
durable=True,
arguments=self._arguments,
)
case "headers":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.HEADERS,
durable=True,
arguments=self._arguments,
)
case unknown_type:
raise ValueError(f"Unsupported exchange type: {unknown_type}")
Expand All @@ -115,6 +123,7 @@ async def start(self) -> None:
durable=self._queue_properties.get("durable", False),
exclusive=self._queue_properties.get("exclusive", True),
auto_delete=self._queue_properties.get("auto_delete", True),
arguments=self._arguments,
)

# Bind queue to exchange with routing key
Expand All @@ -129,24 +138,28 @@ async def start(self) -> None:
name=self._exchange_name,
type=ExchangeType.FANOUT,
durable=True,
arguments=self._arguments,
)
case "headers":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.HEADERS,
durable=True,
arguments=self._arguments,
)
case "topic":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.TOPIC,
durable=True,
arguments=self._arguments,
)
case "direct":
self._exchange = await self._channel.declare_exchange(
name=self._exchange_name,
type=ExchangeType.DIRECT,
durable=True,
arguments=self._arguments,
)
case unknown_type:
raise ValueError(f"Unsupported exchange type: {unknown_type}")
Expand All @@ -157,6 +170,7 @@ async def start(self) -> None:
durable=self._queue_properties.get("durable", False),
exclusive=self._queue_properties.get("exclusive", True),
auto_delete=self._queue_properties.get("auto_delete", True),
arguments=self._arguments,
)

# Bind queue to exchange with binding arguments (for headers exchange)
Expand Down
23 changes: 19 additions & 4 deletions src/asyncapi_python/contrib/wire/amqp/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ def __init__(
exchange_type: str = "direct",
routing_key: str = "",
queue_properties: dict[str, Any] | None = None,
arguments: dict[str, Any] | None = None,
):
self._connection = connection
self._queue_name = queue_name
self._exchange_name = exchange_name
self._exchange_type = exchange_type
self._routing_key = routing_key
self._queue_properties = queue_properties or {}
self._arguments = arguments or {}
self._channel: AbstractChannel | None = None
self._target_exchange: AbstractExchange | None = None
self._started = False
Expand All @@ -61,27 +63,40 @@ async def start(self) -> None:
durable=self._queue_properties.get("durable", True),
exclusive=self._queue_properties.get("exclusive", False),
auto_delete=self._queue_properties.get("auto_delete", False),
arguments=self._arguments,
)

# Named exchange patterns
case (exchange_name, "direct"):
self._target_exchange = await self._channel.declare_exchange(
name=exchange_name, type=ExchangeType.DIRECT, durable=True
name=exchange_name,
type=ExchangeType.DIRECT,
durable=True,
arguments=self._arguments,
)

case (exchange_name, "topic"):
self._target_exchange = await self._channel.declare_exchange(
name=exchange_name, type=ExchangeType.TOPIC, durable=True
name=exchange_name,
type=ExchangeType.TOPIC,
durable=True,
arguments=self._arguments,
)

case (exchange_name, "fanout"):
self._target_exchange = await self._channel.declare_exchange(
name=exchange_name, type=ExchangeType.FANOUT, durable=True
name=exchange_name,
type=ExchangeType.FANOUT,
durable=True,
arguments=self._arguments,
)

case (exchange_name, "headers"):
self._target_exchange = await self._channel.declare_exchange(
name=exchange_name, type=ExchangeType.HEADERS, durable=True
name=exchange_name,
type=ExchangeType.HEADERS,
durable=True,
arguments=self._arguments,
)

case (exchange_name, unknown_type):
Expand Down
29 changes: 29 additions & 0 deletions src/asyncapi_python/contrib/wire/amqp/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def resolve_amqp_config(
"exclusive": True,
"auto_delete": True,
},
arguments={},
)

# Reply channel with explicit address - check if direct queue or topic exchange
Expand All @@ -133,6 +134,7 @@ def resolve_amqp_config(
"exclusive": True,
"auto_delete": True,
},
arguments={},
)
else:
# Topic-based reply pattern - shared exchange with filtering
Expand All @@ -143,6 +145,7 @@ def resolve_amqp_config(
routing_key=app_id, # Filter messages by app_id
binding_type=AmqpBindingType.REPLY,
queue_properties={"durable": True, "exclusive": False},
arguments={},
)

# Reply channel with binding - defer to binding resolution
Expand Down Expand Up @@ -192,6 +195,7 @@ def resolve_amqp_config(
routing_key=resolved_address,
binding_type=AmqpBindingType.QUEUE,
queue_properties={"durable": True, "exclusive": False},
arguments={},
)

# Operation name pattern (fallback)
Expand All @@ -204,6 +208,7 @@ def resolve_amqp_config(
routing_key=op_name,
binding_type=AmqpBindingType.QUEUE,
queue_properties={"durable": True, "exclusive": False},
arguments={},
)

# No match - reject creation
Expand Down Expand Up @@ -245,20 +250,24 @@ def resolve_queue_binding(
# Extract queue properties
queue_config = getattr(binding, "queue", None)
queue_properties = {"durable": True, "exclusive": False} # Defaults
arguments: dict[str, Any] = {}
if queue_config:
if hasattr(queue_config, "durable"):
queue_properties["durable"] = queue_config.durable
if hasattr(queue_config, "exclusive"):
queue_properties["exclusive"] = queue_config.exclusive
if hasattr(queue_config, "auto_delete"):
queue_properties["auto_delete"] = queue_config.auto_delete
if hasattr(queue_config, "arguments") and queue_config.arguments:
arguments = queue_config.arguments

return AmqpConfig(
queue_name=queue_name,
exchange_name="", # Queue bindings use default exchange
routing_key=queue_name, # For default exchange, routing_key = queue_name
binding_type=AmqpBindingType.QUEUE,
queue_properties=queue_properties,
arguments=arguments,
)


Expand Down Expand Up @@ -303,6 +312,15 @@ def resolve_routing_key_binding(
if exchange_config and hasattr(exchange_config, "type"):
exchange_type = exchange_config.type

# Extract exchange arguments
arguments: dict[str, Any] = {}
if (
exchange_config
and hasattr(exchange_config, "arguments")
and exchange_config.arguments
):
arguments = exchange_config.arguments

# Determine routing key - this is where wildcards are allowed
match (getattr(binding, "routingKey", None), channel.address, operation_name):
case (routing_key, _, _) if routing_key:
Expand All @@ -327,6 +345,7 @@ def resolve_routing_key_binding(
routing_key=resolved_routing_key,
binding_type=AmqpBindingType.ROUTING_KEY,
queue_properties={"durable": False, "exclusive": True, "auto_delete": True},
arguments=arguments,
)


Expand Down Expand Up @@ -366,6 +385,15 @@ def resolve_exchange_binding(
if exchange_config and hasattr(exchange_config, "type"):
exchange_type = exchange_config.type

# Extract exchange arguments
arguments: dict[str, Any] = {}
if (
exchange_config
and hasattr(exchange_config, "arguments")
and exchange_config.arguments
):
arguments = exchange_config.arguments

# Extract binding arguments for headers exchange from dataclass
binding_args: dict[str, Any] = {}
# Note: bindingKeys is not part of AmqpChannelBinding spec
Expand All @@ -379,4 +407,5 @@ def resolve_exchange_binding(
binding_type=AmqpBindingType.EXCHANGE,
queue_properties={"durable": False, "exclusive": True, "auto_delete": True},
binding_arguments=binding_args,
arguments=arguments,
)
8 changes: 6 additions & 2 deletions src/asyncapi_python/kernel/document/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ class AmqpExchange:
durable: Optional[bool] = None
auto_delete: Optional[bool] = None
vhost: Optional[str] = None
arguments: Optional[Dict[str, Any]] = None

def __repr__(self) -> str:
"""Custom repr to handle enum properly for code generation."""
from asyncapi_python.kernel.document.bindings import AmqpExchangeType

_ = AmqpExchangeType # Explicitly reference the import
return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})"
return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})"


@dataclass
Expand All @@ -44,10 +45,11 @@ class AmqpQueue:
exclusive: Optional[bool] = None
auto_delete: Optional[bool] = None
vhost: Optional[str] = None
arguments: Optional[Dict[str, Any]] = None

def __repr__(self) -> str:
"""Custom repr for code generation."""
return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})"
return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})"


@dataclass
Expand Down Expand Up @@ -159,6 +161,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi
exclusive=queue_config.get("exclusive"),
auto_delete=queue_config.get("auto_delete"),
vhost=queue_config.get("vhost"),
arguments=queue_config.get("arguments"),
)
elif binding_type == "routingKey" and "exchange" in binding_dict:
exchange_config = binding_dict["exchange"]
Expand All @@ -176,6 +179,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi
durable=exchange_config.get("durable"),
auto_delete=exchange_config.get("auto_delete"),
vhost=exchange_config.get("vhost"),
arguments=exchange_config.get("arguments"),
)

return binding
Loading