Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/amqp-topic/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Generated code directories
publisher/
subscriber1/
subscriber2/

# Virtual environment
.venv/
64 changes: 64 additions & 0 deletions examples/amqp-topic/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
.PHONY: venv install generate publisher subscriber1 subscriber2 clean help

# Virtual environment
VENV_NAME := .venv
PYTHON := $(VENV_NAME)/bin/python
PIP := $(VENV_NAME)/bin/pip
CODEGEN := $(VENV_NAME)/bin/asyncapi-python-codegen

help:
@echo "Available targets:"
@echo " make venv - Create virtual environment"
@echo " make install - Install dependencies"
@echo " make generate - Generate code from AsyncAPI specs"
@echo " make publisher - Run the publisher"
@echo " make subscriber1 - Run subscriber 1"
@echo " make subscriber2 - Run subscriber 2"
@echo " make clean - Remove virtual environment and generated code"

venv:
@echo "Creating virtual environment..."
python3 -m venv $(VENV_NAME)
@echo "✅ Virtual environment created"

install: venv
@echo "Installing dependencies..."
$(PIP) install -e ../../[amqp,codegen]
@echo "✅ Dependencies installed"

generate: install
@echo "Generating publisher code..."
$(CODEGEN) spec/publisher.asyncapi.yaml publisher --force
@echo "✅ Publisher code generated"
@echo ""
@echo "Generating subscriber1 code..."
$(CODEGEN) spec/subscriber1.asyncapi.yaml subscriber1 --force
@echo "✅ Subscriber1 code generated"
@echo ""
@echo "Generating subscriber2 code..."
$(CODEGEN) spec/subscriber2.asyncapi.yaml subscriber2 --force
@echo "✅ Subscriber2 code generated"

publisher: generate
@echo "Starting publisher..."
@echo ""
$(PYTHON) main-publisher.py

subscriber1: generate
@echo "Starting subscriber 1..."
@echo ""
$(PYTHON) main-subscriber1.py

subscriber2: generate
@echo "Starting subscriber 2..."
@echo ""
$(PYTHON) main-subscriber2.py

clean:
@echo "Cleaning up..."
rm -rf $(VENV_NAME)
rm -rf publisher/
rm -rf subscriber1/
rm -rf subscriber2/
rm -rf __pycache__
@echo "✅ Cleanup complete"
138 changes: 138 additions & 0 deletions examples/amqp-topic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# AMQP Topic Exchange Example

Demonstrates **parameterized channels with wildcard subscriptions** using AMQP topic exchanges.

## Overview

Weather alert system showing:
- Publishers send to specific routing keys (concrete parameters)
- Subscribers use wildcards (`*` and `#`) for pattern matching
- Topic exchange routes messages based on routing key patterns

## Architecture

```
Topic Exchange: weather_alerts
Channel: weather.{location}.{severity}

Routing Keys:
weather.NYC.high
weather.LA.low
weather.CHI.critical
```

## Project Structure

```
examples/amqp-topic/
├── spec/
│ ├── common.asyncapi.yaml # Shared channel/message definitions
│ ├── publisher.asyncapi.yaml # Publisher app spec
│ ├── subscriber1.asyncapi.yaml # Subscriber 1 spec
│ └── subscriber2.asyncapi.yaml # Subscriber 2 spec
├── main-publisher.py # Publisher implementation
├── main-subscriber1.py # Subscriber 1 implementation
├── main-subscriber2.py # Subscriber 2 implementation
├── Makefile # Build and run commands
└── README.md
```

## Usage

### 1. Generate Code

```bash
make generate
```

This generates type-safe Python code from AsyncAPI specs:
- `publisher/` - from `spec/publisher.asyncapi.yaml`
- `subscriber1/` - from `spec/subscriber1.asyncapi.yaml`
- `subscriber2/` - from `spec/subscriber2.asyncapi.yaml`

### 2. Run Publisher

```bash
make publisher
```

Publishes weather alerts to the topic exchange.

### 3. Run Subscribers

Terminal 1:
```bash
make subscriber1
```

Terminal 2:
```bash
make subscriber2
```

## Key Features

### Parameterized Channels

Channel address: `weather.{location}.{severity}`

Parameters are extracted from message payload:
```python
WeatherAlert(
location="NYC", # → {location}
severity="high", # → {severity}
...
)
# Creates routing key: weather.NYC.high
```

### Wildcard Subscriptions

Subscribers can use AMQP wildcards for pattern matching:
- `*` - Matches exactly one word
- `#` - Matches zero or more words

**This Example**:
- **Subscriber 1**: `weather.NYC.*` - All NYC alerts (any severity)
- Uses `parameters={"location": "NYC"}`
- Receives: NYC-HIGH
- **Subscriber 2**: `weather.*.critical` - Critical alerts (any location)
- Uses `parameters={"severity": "critical"}`
- Receives: CHI-CRITICAL

**Other Possible Patterns**:
- `weather.LA.*` - All LA alerts
- `weather.*.high` - High severity alerts from any location
- `weather.*.*` - ALL weather alerts (empty parameters)

### Parameter Validation

The runtime enforces:
- ✅ All required parameters must be provided
- ✅ Exact match required (strict validation)
- ✅ Queue bindings reject wildcards (concrete values only)
- ✅ Routing key bindings accept wildcards (pattern matching)

## Development

### Clean Up

```bash
make clean
```

Removes virtual environment and generated code.

### Help

```bash
make help
```

Shows available Makefile targets.

## Learn More

- [AsyncAPI Specification](https://www.asyncapi.com/)
- [AMQP Topic Exchanges](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)
- [AsyncAPI Python Documentation](https://github.com/yourorg/asyncapi-python)
106 changes: 106 additions & 0 deletions examples/amqp-topic/main-publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Weather Alert Publisher

Publishes weather alerts to an AMQP topic exchange with dynamic routing keys.
The routing key is built from the message payload fields (location and severity).

Example usage:
python main-publisher.py
"""

import asyncio
from datetime import datetime, timezone
from os import environ

from asyncapi_python.contrib.wire.amqp import AmqpWire
from publisher import Application
from publisher.messages.json import WeatherAlert, Severity

# AMQP connection URI (can be overridden via environment variable)
AMQP_URI = environ.get("AMQP_URI", "amqp://guest:guest@localhost")

# Initialize application with AMQP wire
app = Application(AmqpWire(AMQP_URI))


async def main() -> None:
"""Main publisher routine"""
print("🌤️ Weather Alert Publisher")
print("=" * 50)
print(f"Connecting to: {AMQP_URI}")

# Start the application
await app.start()
print("✅ Connected to AMQP broker")
print()

# Sample weather alerts to publish
alerts = [
WeatherAlert(
location="NYC",
severity=Severity.HIGH,
temperature=95,
description="Heat wave warning in effect. Stay hydrated!",
timestamp=datetime.now(timezone.utc),
),
WeatherAlert(
location="LA",
severity=Severity.LOW,
temperature=72,
description="Sunny and pleasant weather expected.",
timestamp=datetime.now(timezone.utc),
),
WeatherAlert(
location="CHI",
severity=Severity.CRITICAL,
temperature=5,
description="Severe winter storm approaching. Travel not recommended.",
timestamp=datetime.now(timezone.utc),
),
WeatherAlert(
location="MIA",
severity=Severity.MEDIUM,
temperature=88,
description="Scattered thunderstorms expected this afternoon.",
timestamp=datetime.now(timezone.utc),
),
WeatherAlert(
location="SEA",
severity=Severity.LOW,
temperature=65,
description="Light rain throughout the day.",
timestamp=datetime.now(timezone.utc),
),
]

# Publish each alert
print("📡 Publishing weather alerts...")
print()

for alert in alerts:
# The routing key will be dynamically built as: weather.{location}.{severity}
# For example: weather.NYC.high, weather.LA.low, etc.
await app.producer.publish_weather_alert(alert)

print(f"✉️ Published alert:")
print(f" Routing Key: weather.{alert.location}.{alert.severity.value}")
print(f" Location: {alert.location}")
print(f" Severity: {alert.severity.value}")
print(f" Temperature: {alert.temperature}°F")
print(f" Description: {alert.description}")
print()

# Small delay between messages for visibility
await asyncio.sleep(0.5)

print(f"✅ Published {len(alerts)} weather alerts")
print()

# Stop the application
await app.stop()
print("👋 Disconnected from AMQP broker")


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading