Skip to content

Conversation

@TomKimber
Copy link

Add amgi-aiopika-amqp support.

Closes #32.

Copy link
Contributor

@jackburridge jackburridge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One main change is the commit history, should be in the form of Conventional Commits, they will then be picked up on merge, triggering a bump, then release

Comment on lines +29 to +31
@pytest.fixture
def queue_name() -> str:
return f"receive-{uuid4()}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@pytest.fixture
def queue_name() -> str:
return f"receive-{uuid4()}"
@pytest.fixture
def queue_name(rabbitmq_container: RabbitMqContainer) -> str:
with BlockingConnection(rabbitmq_container.get_connection_params()) as connection:
with connection.channel() as channel:
queue_name = f"receive-{uuid4()}"
channel.queue_declare(queue_name, durable=True)
return queue_name

This should probably be declaring the queue (I've used pika here)

lifespan_shutdown = await receive()
assert lifespan_shutdown == {"type": "lifespan.shutdown"}
await send(cast(Any, {"type": "lifespan.shutdown.complete"}))
serve_task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need to cancel the serve task here, I've made a suggestion in the implementation that should allow for graceful shutdown

@@ -0,0 +1,47 @@
[build-system]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build system should be uv_build


[project]
name = "amgi-aiopika-amqp"
version = "0.21.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Versions should match the current version of the overall project, they will be bumped automatically

## Installation

```
pip install amgi-aiopika-amqp==0.21.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should match the global project version, it will be bumped automatically

Comment on lines +103 to +105
await message.ack()
except Exception:
await message.nack(requeue=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a little confusion here based on the AMGI spec. The n/ack should be sent via the application, the _MessageSend class should handle an n/ack

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +89 to +90
self._connection: Optional[AbstractRobustConnection] = None
self._channel: Optional[AbstractRobustChannel] = None
Copy link
Contributor

@jackburridge jackburridge Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably doesn't need to be properties of the class, they can be instantiated in serve() and be passed around

Comment on lines +114 to +117
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if self._stop_event.is_set():
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if self._stop_event.is_set():
break
async with queue.iterator() as queue_iterator:
queue_aiter = aiter(queue_iterator)
async for message in self._stoppable.call(anext, queue_aiter):

I think Stoppable can be used here. I believe the coroutine waiting for the next message, and this is why the serve task is having to be cancelled in the tests. Stoppable should allow you to handle this cleanly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -0,0 +1,9 @@
services:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its possible to load definitions on rabbitmq start using the load_definitions option, this can fully setup the queues so they do not need to be declared elsewhere

https://www.rabbitmq.com/docs/definitions

requires = [ "hatchling" ]

[project]
name = "amgi-aiopika-amqp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
name = "amgi-aiopika-amqp"
name = "amgi-aio-pika"

The other packages are named after the core dependency. Not to be overly pedantic but once a package is published we probably want to stick with the name (Though I have changed them later on... so who really cares)

url: str = "amqp://guest:guest@localhost/",
durable: bool = True,
) -> None:
run(app, queues[0], url, durable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be possible to support multiple queues by simultaneously running multiple queue loops, look at the aiobotocore SQS implementation

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

AMQP support

3 participants