Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "asyncapi-python"
version = "0.3.0rc6"
version = "0.3.0rc8"
license = { text = "Apache-2.0" }
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }]
Expand Down
19 changes: 3 additions & 16 deletions src/asyncapi_python/contrib/wire/amqp/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
from asyncapi_python.kernel.document.bindings import AmqpChannelBinding
from asyncapi_python.kernel.document.channel import Channel
from asyncapi_python.kernel.wire import EndpointParams
from asyncapi_python.kernel.wire.utils import (
substitute_parameters,
validate_parameters_strict,
)
from asyncapi_python.kernel.wire.utils import substitute_parameters

from .config import AmqpBindingType, AmqpConfig

Expand Down Expand Up @@ -186,8 +183,7 @@ def resolve_amqp_config(

# Channel address pattern (with parameter substitution)
case (False, None, address, _) if address:
# Strict validation for implicit queue binding
validate_parameters_strict(channel, param_values)
# Validate no wildcards for implicit queue binding
_validate_no_wildcards_in_queue(param_values)
resolved_address = substitute_parameters(address, param_values)
return AmqpConfig(
Expand All @@ -200,8 +196,7 @@ def resolve_amqp_config(

# Operation name pattern (fallback)
case (False, None, None, op_name) if op_name:
# Strict validation for implicit queue binding
validate_parameters_strict(channel, param_values)
# Validate no wildcards for implicit queue binding
_validate_no_wildcards_in_queue(param_values)
return AmqpConfig(
queue_name=op_name,
Expand All @@ -228,13 +223,9 @@ def resolve_queue_binding(
"""Resolve AMQP queue binding configuration

Queue bindings require:
- All channel parameters must be provided (strict validation)
- No wildcards allowed in parameter values
"""

# Strict validation: all parameters required, exact match
validate_parameters_strict(channel, param_values)

# Validate no wildcards in queue binding parameters
_validate_no_wildcards_in_queue(param_values)

Expand Down Expand Up @@ -280,14 +271,10 @@ def resolve_routing_key_binding(
"""Resolve AMQP routing key binding configuration for pub/sub patterns

For routing key bindings:
- All channel-defined parameters must be provided (strict validation)
- Parameter values can explicitly contain wildcards ('*' or '#')
- Wildcards are allowed for topic exchange pattern matching
"""

# Strict validation: all parameters required, exact match
validate_parameters_strict(channel, param_values)

# Determine exchange name and type
# For exchange name, we need concrete values (no wildcards)
# If param_values has placeholders, use them; otherwise use literal exchange name
Expand Down
6 changes: 6 additions & 0 deletions src/asyncapi_python/kernel/endpoint/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing_extensions import Unpack

from asyncapi_python.kernel.wire import Consumer, Producer
from asyncapi_python.kernel.wire.utils import validate_parameters_strict

from ..exceptions import Reject
from ..typing import (
Expand Down Expand Up @@ -63,6 +64,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None:
if not self._reply_codecs:
raise RuntimeError("RPC server operation has no reply messages defined")

# Validate subscription parameters before creating consumer
validate_parameters_strict(
self._operation.channel, self._subscription_parameters
)

# Create consumer for receiving requests
self._consumer = await self._wire.create_consumer(
channel=self._operation.channel,
Expand Down
7 changes: 7 additions & 0 deletions src/asyncapi_python/kernel/endpoint/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing_extensions import Unpack

from asyncapi_python.kernel.wire import Consumer
from asyncapi_python.kernel.wire.utils import validate_parameters_strict

from ..exceptions import Reject
from ..typing import BatchConfig, BatchConsumer, Handler, IncomingMessage, T_Input
Expand Down Expand Up @@ -46,6 +47,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None:
f"Use @{self._operation.key} decorator to register a handler function."
)

# Validate subscription parameters before creating consumer
validate_parameters_strict(
self._operation.channel, self._subscription_parameters
)

# Create consumer from wire factory
self._consumer = await self._wire.create_consumer(
channel=self._operation.channel,
Expand Down Expand Up @@ -331,3 +337,4 @@ async def process_batch():
# If processing remaining batch fails, just nack all and continue
for _, wire_message in batch:
await wire_message.nack()
await wire_message.nack()
24 changes: 15 additions & 9 deletions src/asyncapi_python_codegen/validation/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ def location_must_be_payload(ctx: ValidationContext) -> list[ValidationIssue]:

@rule("core")
def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIssue]:
"""Validate location path exists in message payload schemas."""
"""Validate location path exists in ALL message payload schemas.

Parameters with location fields must reference paths that exist in every
message in the channel, not just some of them. This prevents runtime errors
when processing messages that lack the required field.
"""
issues = []

for channel_key, channel_def in ctx.get_channels().items():
Expand All @@ -273,22 +278,23 @@ def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIss
path = location.replace("$message.payload#/", "")
parts = [p for p in path.split("/") if p]

# Check if path exists in ANY message schema
path_found = False
for msg_def in messages.values():
# Check if path exists in ALL message schemas
missing_in_messages = []
for msg_name, msg_def in messages.items():
if not isinstance(msg_def, dict):
continue
if _path_exists_in_schema(msg_def.get("payload"), parts):
path_found = True
break
if not _path_exists_in_schema(msg_def.get("payload"), parts):
missing_in_messages.append(msg_name)

if not path_found and messages:
if missing_in_messages:
issues.append(
ValidationIssue(
severity=Severity.ERROR,
message=f"Parameter '{param_name}' location path '{path}' not found in message schemas",
message=f"Parameter '{param_name}' location path '{path}' not found in all message schemas. "
f"Missing in: {', '.join(missing_in_messages)}",
path=f"$.channels.{channel_key}.parameters.{param_name}.location",
rule="location-path-exists-in-schema",
suggestion=f"Add '{path}' field to all message payloads in this channel",
)
)

Expand Down
171 changes: 171 additions & 0 deletions tests/codegen/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,177 @@ def test_parameter_with_location_warns_not_implemented(tmp_path: Path):
assert "myOp" in operations


def test_location_path_must_exist_in_all_messages(tmp_path: Path):
"""Test that parameter location path must exist in ALL messages, not just some."""
spec_file = tmp_path / "location_missing_in_some.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
alerts:
address: alerts.{location}
parameters:
location:
location: $message.payload#/location
bindings:
amqp:
is: routingKey
exchange:
name: alerts_exchange
type: topic
messages:
alert1:
payload:
type: object
properties:
location:
type: string
message:
type: string
alert2:
payload:
type: object
properties:
message:
type: string
operations:
sendAlert:
action: send
channel:
$ref: '#/channels/alerts'
"""
)

with pytest.raises(ValidationError) as exc_info:
extract_all_operations(spec_file)

# Should fail because 'location' field is missing in alert2
assert any(
"not found in all message schemas" in error.message
and "alert2" in error.message
for error in exc_info.value.errors
)


def test_location_path_exists_in_all_messages_passes(tmp_path: Path):
"""Test that validation passes when location exists in all messages."""
spec_file = tmp_path / "location_in_all.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
alerts:
address: alerts.{location}
parameters:
location:
location: $message.payload#/location
bindings:
amqp:
is: routingKey
exchange:
name: alerts_exchange
type: topic
messages:
alert1:
payload:
type: object
properties:
location:
type: string
message:
type: string
alert2:
payload:
type: object
properties:
location:
type: string
severity:
type: string
operations:
sendAlert:
action: send
channel:
$ref: '#/channels/alerts'
"""
)

# Should succeed - location exists in both messages
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "sendAlert" in operations


def test_location_path_with_single_message(tmp_path: Path):
"""Test that validation works correctly with single message."""
spec_file = tmp_path / "location_single_message.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
users:
address: users.{userId}
parameters:
userId:
location: $message.payload#/userId
bindings:
amqp:
is: queue
messages:
userEvent:
payload:
type: object
properties:
userId:
type: string
name:
type: string
operations:
publishUser:
action: send
channel:
$ref: '#/channels/users'
"""
)

# Should succeed - location exists in the single message
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "publishUser" in operations


def test_location_path_with_no_messages(tmp_path: Path):
"""Test that validation skips channels with no messages."""
spec_file = tmp_path / "location_no_messages.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
emptyChannel:
address: empty.{param}
parameters:
param:
location: $message.payload#/param
bindings:
amqp:
is: queue
operations:
emptyOp:
action: send
channel:
$ref: '#/channels/emptyChannel'
messages:
- payload:
type: object
properties:
param:
type: string
"""
)

# Should succeed - validation skips channels with no messages
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "emptyOp" in operations


def test_undefined_placeholders_in_address(tmp_path: Path):
"""Test that undefined placeholders in address raise error."""
spec_file = tmp_path / "undefined_params.yaml"
Expand Down
Loading
Loading