Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "asyncapi-python"
version = "0.3.0rc4"
version = "0.3.0rc5"
license = { text = "Apache-2.0" }
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }]
Expand Down
141 changes: 112 additions & 29 deletions src/asyncapi_python_codegen/generators/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datamodel_code_generator.__main__ import main as datamodel_codegen

from asyncapi_python.kernel.document import Operation
from asyncapi_python_codegen.parser.types import ParseContext, navigate_json_pointer


class MessageGenerator:
Expand Down Expand Up @@ -67,35 +68,110 @@ def _collect_message_schemas(
return schemas # type: ignore[return-value]

def _load_component_schemas(self, spec_path: Path) -> dict[str, Any]:
"""Load component schemas from the AsyncAPI specification file."""
try:
with spec_path.open("r") as f:
spec = yaml.safe_load(f)
"""Load component schemas from the AsyncAPI specification file and all referenced files."""
all_schemas: dict[str, Any] = {}
visited_files: set[Path] = set()

components = spec.get("components", {})
schemas = components.get("schemas", {})
messages = components.get("messages", {})
def load_schemas_from_file(file_path: Path) -> None:
"""Recursively load schemas from a file and its references."""
abs_path = file_path.absolute()

# Combine schemas and message payloads
all_schemas = {}
# Avoid infinite loops
if abs_path in visited_files:
return
visited_files.add(abs_path)

# Add component schemas directly
for schema_name, schema_def in schemas.items():
all_schemas[schema_name] = schema_def
try:
with abs_path.open("r") as f:
spec = yaml.safe_load(f)

# Add message payloads from components (only if not already present from schemas)
for msg_name, msg_def in messages.items():
if isinstance(msg_def, dict) and "payload" in msg_def:
schema_name = self._to_pascal_case(msg_name)
# Only add if we don't already have this schema from the schemas section
components = spec.get("components", {})
schemas = components.get("schemas", {})
messages = components.get("messages", {})

# Add component schemas directly
for schema_name, schema_def in schemas.items():
if schema_name not in all_schemas:
all_schemas[schema_name] = msg_def["payload"]
# Check if this schema is itself a reference
if isinstance(schema_def, dict) and "$ref" in schema_def:
ref_value: Any = schema_def["$ref"] # type: ignore[misc]
# Resolve the reference using ParseContext utilities
if isinstance(ref_value, str):
try:
context = ParseContext(abs_path)
target_context = context.resolve_reference(
ref_value
)

# Load and navigate to the referenced schema
with target_context.filepath.open("r") as ref_file:
ref_spec = yaml.safe_load(ref_file)

if target_context.json_pointer:
resolved_schema = navigate_json_pointer(
ref_spec, target_context.json_pointer
)
else:
resolved_schema = ref_spec

all_schemas[schema_name] = resolved_schema
except Exception as e:
print(
f"Warning: Could not resolve reference {ref_value} in {abs_path}: {e}"
)
all_schemas[schema_name] = schema_def
else:
all_schemas[schema_name] = schema_def

# Add message payloads from components
for msg_name, msg_def in messages.items():
if isinstance(msg_def, dict) and "payload" in msg_def:
schema_name = self._to_pascal_case(msg_name)
if schema_name not in all_schemas:
all_schemas[schema_name] = msg_def["payload"]

# Find and process all external file references
self._find_and_process_refs(
spec, abs_path.parent, load_schemas_from_file
)

except Exception as e:
print(f"Warning: Could not load component schemas from {abs_path}: {e}")

# Start loading from the main spec file
load_schemas_from_file(spec_path)

return all_schemas # type: ignore[return-value]

def _find_and_process_refs(
self, data: Any, base_dir: Path, process_file: Any
) -> None:
"""Recursively find all $ref entries pointing to external files."""
if isinstance(data, dict):
# Check if this is a reference
if "$ref" in data:
ref_value: Any = data["$ref"] # type: ignore[misc]
if isinstance(ref_value, str) and not ref_value.startswith("#"):
# External reference - extract file path
file_part: str
if "#" in ref_value:
file_part = ref_value.split("#")[0]
else:
file_part = ref_value

if file_part:
# Resolve relative path
ref_path = (base_dir / file_part).resolve()
process_file(ref_path)

return all_schemas # type: ignore[return-value]
# Recurse into all dict values
for value in data.values(): # type: ignore[misc]
self._find_and_process_refs(value, base_dir, process_file)

except Exception as e:
print(f"Warning: Could not load component schemas from {spec_path}: {e}")
return {}
elif isinstance(data, list):
# Recurse into all list items
for item in data: # type: ignore[misc]
self._find_and_process_refs(item, base_dir, process_file)

def _resolve_references(self, schemas: dict[str, Any]) -> dict[str, Any]:
"""Recursively resolve $ref references to use #/$defs/... instead of #/components/schemas/..."""
Expand All @@ -105,17 +181,24 @@ def resolve_in_object(obj: Any) -> Any:
resolved_obj: dict[str, Any] = {}
for key, value in obj.items(): # type: ignore[misc]
if key == "$ref" and isinstance(value, str):
# Transform references from #/components/schemas/... to #/$defs/...
if value.startswith("#/components/schemas/"):
schema_name = value.split("/")[-1]
# Extract schema name from the reference
schema_name = value.split("/")[-1]

# Transform all component references to #/$defs/...
if "#/components/schemas/" in value:
# Internal or external schema reference
resolved_obj[key] = f"#/$defs/{schema_name}"
elif value.startswith("#/components/messages/"):
elif "#/components/messages/" in value:
# Handle message references - convert message name to PascalCase
msg_name = value.split("/")[-1]
schema_name = self._to_pascal_case(msg_name)
schema_name = self._to_pascal_case(schema_name)
resolved_obj[key] = f"#/$defs/{schema_name}"
else:
elif value.startswith("#"):
# Other internal references, keep as-is
resolved_obj[key] = value
else:
# External file reference (e.g., "./commons2.yaml#/components/schemas/Foo")
# Extract just the schema name and point to #/$defs
resolved_obj[key] = f"#/$defs/{schema_name}"
else:
resolved_obj[key] = resolve_in_object(value)
return resolved_obj
Expand Down
24 changes: 24 additions & 0 deletions tests/codegen/specs/deep_recursion/level1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Level 1: Main entry point
asyncapi: "3.0.0"
info:
title: Deep Recursion Test - Level 1
version: 1.0.0
description: Main file that starts the 4-level reference chain

operations:
process.data:
action: send
channel:
$ref: "level2.yaml#/channels/data_channel"

components:
schemas:
Level1Schema:
type: object
properties:
level:
type: integer
const: 1
message:
type: string
const: "from_level_1"
22 changes: 22 additions & 0 deletions tests/codegen/specs/deep_recursion/level2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Level 2: References Level 3
channels:
data_channel:
address: data.queue
title: Data Channel from Level 2
messages:
data_message:
$ref: "level3.yaml#/components/messages/DataMessage"

components:
schemas:
Level2Schema:
type: object
properties:
level:
type: integer
const: 2
message:
type: string
const: "from_level_2"
level1_ref:
$ref: "level1.yaml#/components/schemas/Level1Schema"
28 changes: 28 additions & 0 deletions tests/codegen/specs/deep_recursion/level3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Level 3: References Level 4
components:
messages:
DataMessage:
title: Data Message from Level 3
payload:
type: object
properties:
id:
type: string
level3_data:
type: string
const: "from_level_3"
deep_schema:
$ref: "level4.yaml#/components/schemas/Level4Schema"

schemas:
Level3Schema:
type: object
properties:
level:
type: integer
const: 3
message:
type: string
const: "from_level_3"
level2_ref:
$ref: "level2.yaml#/components/schemas/Level2Schema"
21 changes: 21 additions & 0 deletions tests/codegen/specs/deep_recursion/level4.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Level 4: Deepest level - no more references
components:
schemas:
Level4Schema:
type: object
properties:
level:
type: integer
const: 4
message:
type: string
const: "from_level_4_deepest"
metadata:
type: object
properties:
depth:
type: integer
const: 4
status:
type: string
enum: ["deep", "deeper", "deepest"]
42 changes: 42 additions & 0 deletions tests/codegen/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,45 @@ def test_invalid_yaml_structure():
extract_all_operations(invalid_yaml)
finally:
invalid_yaml.unlink(missing_ok=True)


def test_four_level_deep_recursion():
"""Test 4-level deep file reference chain: Level1->Level2->Level3->Level4.

This test verifies that the MessageGenerator recursively collects component schemas
from all referenced files, not just the main spec file.
"""
from src.asyncapi_python_codegen.generators.messages import MessageGenerator

spec_path = Path("tests/codegen/specs/deep_recursion/level1.yaml")

# Test that MessageGenerator collects schemas from all 4 levels
generator = MessageGenerator()
schemas = generator._load_component_schemas(spec_path)

# Without recursive file loading, we would only get Level1Schema
# With recursive loading, we should get schemas from all 4 files
assert "Level1Schema" in schemas, "Level1Schema from main file not found"
assert (
"Level2Schema" in schemas
), "Level2Schema from level2.yaml not found (recursive loading failed)"
assert (
"Level3Schema" in schemas
), "Level3Schema from level3.yaml not found (recursive loading failed)"
assert (
"Level4Schema" in schemas
), "Level4Schema from level4.yaml not found (recursive loading failed)"
assert "DataMessage" in schemas, "DataMessage from level3.yaml not found"

# Verify the deepest level schema has correct structure
level4_schema = schemas["Level4Schema"]
assert level4_schema["properties"]["level"]["const"] == 4
assert level4_schema["properties"]["message"]["const"] == "from_level_4_deepest"

# Also verify operations can be extracted (tests parser, not generator)
operations = extract_all_operations(spec_path)
assert len(operations) == 1

process_data = operations["process.data"]
assert process_data.channel.address == "data.queue"
assert process_data.channel.title == "Data Channel from Level 2"
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading