-
Notifications
You must be signed in to change notification settings - Fork 0
Add amgi-aiopika-amqp support. #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
jackburridge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One main change is the commit history, should be in the form of Conventional Commits, they will then be picked up on merge, triggering a bump, then release
| @pytest.fixture | ||
| def queue_name() -> str: | ||
| return f"receive-{uuid4()}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @pytest.fixture | |
| def queue_name() -> str: | |
| return f"receive-{uuid4()}" | |
| @pytest.fixture | |
| def queue_name(rabbitmq_container: RabbitMqContainer) -> str: | |
| with BlockingConnection(rabbitmq_container.get_connection_params()) as connection: | |
| with connection.channel() as channel: | |
| queue_name = f"receive-{uuid4()}" | |
| channel.queue_declare(queue_name, durable=True) | |
| return queue_name |
This should probably be declaring the queue (I've used pika here)
| lifespan_shutdown = await receive() | ||
| assert lifespan_shutdown == {"type": "lifespan.shutdown"} | ||
| await send(cast(Any, {"type": "lifespan.shutdown.complete"})) | ||
| serve_task.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You shouldn't need to cancel the serve task here, I've made a suggestion in the implementation that should allow for graceful shutdown
| @@ -0,0 +1,47 @@ | |||
| [build-system] | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build system should be uv_build
|
|
||
| [project] | ||
| name = "amgi-aiopika-amqp" | ||
| version = "0.21.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Versions should match the current version of the overall project, they will be bumped automatically
| ## Installation | ||
|
|
||
| ``` | ||
| pip install amgi-aiopika-amqp==0.21.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should match the global project version, it will be bumped automatically
| await message.ack() | ||
| except Exception: | ||
| await message.nack(requeue=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a little confusion here based on the AMGI spec. The n/ack should be sent via the application, the _MessageSend class should handle an n/ack
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NATS has something very similar https://github.com/asyncfast/amgi/pull/58/files#diff-e1977146c23e034f9016f9c7c850ac8588911997af82e1583465bfa9e5dd703cR32-R54
| self._connection: Optional[AbstractRobustConnection] = None | ||
| self._channel: Optional[AbstractRobustChannel] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably doesn't need to be properties of the class, they can be instantiated in serve() and be passed around
| async with queue.iterator() as queue_iter: | ||
| async for message in queue_iter: | ||
| if self._stop_event.is_set(): | ||
| break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| async with queue.iterator() as queue_iter: | |
| async for message in queue_iter: | |
| if self._stop_event.is_set(): | |
| break | |
| async with queue.iterator() as queue_iterator: | |
| queue_aiter = aiter(queue_iterator) | |
| async for message in self._stoppable.call(anext, queue_aiter): |
I think Stoppable can be used here. I believe the coroutine waiting for the next message, and this is why the serve task is having to be cancelled in the tests. Stoppable should allow you to handle this cleanly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @@ -0,0 +1,9 @@ | |||
| services: | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its possible to load definitions on rabbitmq start using the load_definitions option, this can fully setup the queues so they do not need to be declared elsewhere
| requires = [ "hatchling" ] | ||
|
|
||
| [project] | ||
| name = "amgi-aiopika-amqp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| name = "amgi-aiopika-amqp" | |
| name = "amgi-aio-pika" |
The other packages are named after the core dependency. Not to be overly pedantic but once a package is published we probably want to stick with the name (Though I have changed them later on... so who really cares)
| url: str = "amqp://guest:guest@localhost/", | ||
| durable: bool = True, | ||
| ) -> None: | ||
| run(app, queues[0], url, durable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be possible to support multiple queues by simultaneously running multiple queue loops, look at the aiobotocore SQS implementation
| await asyncio.gather( |
Add amgi-aiopika-amqp support.
Closes #32.