Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/asyncapi_python_codegen/validation/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ def location_must_be_payload(ctx: ValidationContext) -> list[ValidationIssue]:

@rule("core")
def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIssue]:
"""Validate location path exists in message payload schemas."""
"""Validate location path exists in ALL message payload schemas.

Parameters with location fields must reference paths that exist in every
message in the channel, not just some of them. This prevents runtime errors
when processing messages that lack the required field.
"""
issues = []

for channel_key, channel_def in ctx.get_channels().items():
Expand All @@ -273,22 +278,23 @@ def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIss
path = location.replace("$message.payload#/", "")
parts = [p for p in path.split("/") if p]

# Check if path exists in ANY message schema
path_found = False
for msg_def in messages.values():
# Check if path exists in ALL message schemas
missing_in_messages = []
for msg_name, msg_def in messages.items():
if not isinstance(msg_def, dict):
continue
if _path_exists_in_schema(msg_def.get("payload"), parts):
path_found = True
break
if not _path_exists_in_schema(msg_def.get("payload"), parts):
missing_in_messages.append(msg_name)

if not path_found and messages:
if missing_in_messages:
issues.append(
ValidationIssue(
severity=Severity.ERROR,
message=f"Parameter '{param_name}' location path '{path}' not found in message schemas",
message=f"Parameter '{param_name}' location path '{path}' not found in all message schemas. "
f"Missing in: {', '.join(missing_in_messages)}",
path=f"$.channels.{channel_key}.parameters.{param_name}.location",
rule="location-path-exists-in-schema",
suggestion=f"Add '{path}' field to all message payloads in this channel",
)
)

Expand Down
171 changes: 171 additions & 0 deletions tests/codegen/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,177 @@ def test_parameter_with_location_warns_not_implemented(tmp_path: Path):
assert "myOp" in operations


def test_location_path_must_exist_in_all_messages(tmp_path: Path):
"""Test that parameter location path must exist in ALL messages, not just some."""
spec_file = tmp_path / "location_missing_in_some.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
alerts:
address: alerts.{location}
parameters:
location:
location: $message.payload#/location
bindings:
amqp:
is: routingKey
exchange:
name: alerts_exchange
type: topic
messages:
alert1:
payload:
type: object
properties:
location:
type: string
message:
type: string
alert2:
payload:
type: object
properties:
message:
type: string
operations:
sendAlert:
action: send
channel:
$ref: '#/channels/alerts'
"""
)

with pytest.raises(ValidationError) as exc_info:
extract_all_operations(spec_file)

# Should fail because 'location' field is missing in alert2
assert any(
"not found in all message schemas" in error.message
and "alert2" in error.message
for error in exc_info.value.errors
)


def test_location_path_exists_in_all_messages_passes(tmp_path: Path):
"""Test that validation passes when location exists in all messages."""
spec_file = tmp_path / "location_in_all.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
alerts:
address: alerts.{location}
parameters:
location:
location: $message.payload#/location
bindings:
amqp:
is: routingKey
exchange:
name: alerts_exchange
type: topic
messages:
alert1:
payload:
type: object
properties:
location:
type: string
message:
type: string
alert2:
payload:
type: object
properties:
location:
type: string
severity:
type: string
operations:
sendAlert:
action: send
channel:
$ref: '#/channels/alerts'
"""
)

# Should succeed - location exists in both messages
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "sendAlert" in operations


def test_location_path_with_single_message(tmp_path: Path):
"""Test that validation works correctly with single message."""
spec_file = tmp_path / "location_single_message.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
users:
address: users.{userId}
parameters:
userId:
location: $message.payload#/userId
bindings:
amqp:
is: queue
messages:
userEvent:
payload:
type: object
properties:
userId:
type: string
name:
type: string
operations:
publishUser:
action: send
channel:
$ref: '#/channels/users'
"""
)

# Should succeed - location exists in the single message
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "publishUser" in operations


def test_location_path_with_no_messages(tmp_path: Path):
"""Test that validation skips channels with no messages."""
spec_file = tmp_path / "location_no_messages.yaml"
spec_file.write_text(
"""
asyncapi: 3.0.0
channels:
emptyChannel:
address: empty.{param}
parameters:
param:
location: $message.payload#/param
bindings:
amqp:
is: queue
operations:
emptyOp:
action: send
channel:
$ref: '#/channels/emptyChannel'
messages:
- payload:
type: object
properties:
param:
type: string
"""
)

# Should succeed - validation skips channels with no messages
operations = extract_all_operations(spec_file, fail_on_error=True)
assert "emptyOp" in operations


def test_undefined_placeholders_in_address(tmp_path: Path):
"""Test that undefined placeholders in address raise error."""
spec_file = tmp_path / "undefined_params.yaml"
Expand Down
Loading