Summary
When using the recommended multi-queue pattern (one worker per queue via `with_queues()`), it is not possible to enqueue a task to a different queue from within a running task.
Setup
```python
broker = AioPikaBroker(url, task_queues=[queue_default, queue_mail])

@broker.task("default_task")
async def default_task():
    await mail_task.kiq()  # wants to enqueue to queue_mail

@broker.task("mail_task", queue_name="queue_mail")
async def mail_task(): ...

def get_worker_default():
    return broker.with_queues(queue_default)

def get_worker_mail():
    return broker.with_queues(queue_mail)
```

Workers started with:

```shell
taskiq worker ... -fsd myapp:get_worker_default
taskiq worker ... -fsd myapp:get_worker_mail
```
Problem
`with_queues()` mutates `_task_queues` on the original broker instance (`self._task_queues = list(queues); return self`). After the worker factory is called at startup, the shared broker object is permanently narrowed to a single queue.
Any `.kiq()` call from inside `default_task` then hits the `len(_task_queues) == 1` shortcut in `kick()` and silently routes `mail_task` to `queue_default` instead of `queue_mail`.
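The mis-routing can be reproduced with a minimal stand-in for the broker. This is a deliberately simplified sketch of the behaviour described above, not the actual taskiq source:

```python
# Stand-in broker: with_queues() mutates the shared instance, and kick()
# short-circuits routing when only one queue is configured.
class Broker:
    def __init__(self, task_queues):
        self._task_queues = list(task_queues)  # shared by all tasks

    def with_queues(self, *queues):
        self._task_queues = list(queues)  # mutates the shared instance
        return self

    def kick(self, queue_name):
        # Single-queue shortcut: with only one queue configured,
        # routing ignores the task's declared queue entirely.
        if len(self._task_queues) == 1:
            return self._task_queues[0]
        return queue_name


broker = Broker(["queue_default", "queue_mail"])
broker.with_queues("queue_default")   # worker factory runs at startup
print(broker.kick("queue_mail"))      # mis-routed to "queue_default"
```

After the factory runs, the narrowing is permanent for every subsequent publish through the same broker object.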
Why copy.copy() doesn't fix it
A shallow copy before calling `with_queues()` avoids mutating the original, but introduces a different problem:

```python
def get_worker_default():
    return copy.copy(broker).with_queues(queue_default)
```

Taskiq calls `startup()` on the returned (narrowed) copy, which establishes the AMQP `write_channel` on the copy. The original broker, the instance held by every `@broker.task` decorator, never has `startup()` called in the worker process. Its `write_channel` stays `None`, so every `.kiq()` call raises `NoStartupError`.
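A minimal sketch of why the shallow copy fails. The class is a stand-in for the real broker, and `startup()` and the channel are simplified placeholders:

```python
import copy

# Stand-in broker illustrating the shallow-copy failure mode; the real
# taskiq broker and its AMQP wiring are more involved.
class Broker:
    def __init__(self, task_queues):
        self._task_queues = list(task_queues)
        self.write_channel = None  # established only by startup()

    def with_queues(self, *queues):
        self._task_queues = list(queues)  # rebinds on the copy only
        return self

    def startup(self):
        self.write_channel = object()  # stands in for the AMQP channel

    def kiq(self):
        if self.write_channel is None:
            raise RuntimeError("NoStartupError: broker was never started")


broker = Broker(["queue_default", "queue_mail"])
worker = copy.copy(broker).with_queues("queue_default")
worker.startup()  # taskiq starts up the *copy* only

# The original keeps its full queue list, but never gets a channel:
try:
    broker.kiq()
except RuntimeError as exc:
    print(exc)  # NoStartupError: broker was never started
```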
Root Cause
`_task_queues` serves two distinct purposes:
- Consuming: which queues this worker process binds to and reads from.
- Publishing validation: the routing-key lookup in `kick()`.

`with_queues()` was designed for purpose 1, but because both share the same field, narrowing for listening silently breaks publishing.
Expected Behaviour
A task running in a dedicated-queue worker should be able to enqueue tasks to any of the broker's configured queues, regardless of which subset the worker is listening to.
Suggested Fix
Separate the two concerns internally. For example:
- Add a `_listening_queues: list[Queue] | None` field (defaults to `None` = listen to all). `with_queues()` sets only `_listening_queues`.
- Queue binding/consumption during `startup()` uses `_listening_queues` (falling back to `_task_queues`).
- `kick()` always validates against the full `_task_queues`.
This would make `with_queues()` a true "listen-only narrowing" operation without any side effect on publishing.
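A rough sketch of the proposed separation. Field and method names mirror the suggestion above but are illustrative, not actual taskiq code:

```python
# Sketch of the suggested fix: separate listening from publishing.
class Broker:
    def __init__(self, task_queues):
        self._task_queues = list(task_queues)  # full set, used by kick()
        self._listening_queues = None          # None = listen to all

    def with_queues(self, *queues):
        self._listening_queues = list(queues)  # narrows consuming only
        return self

    def queues_to_consume(self):
        # Used during startup() for queue binding/consumption.
        return self._listening_queues or self._task_queues

    def kick(self, queue_name):
        # Publishing always validates against the full configuration.
        if queue_name not in self._task_queues:
            raise ValueError(f"unknown queue: {queue_name}")
        return queue_name


broker = Broker(["queue_default", "queue_mail"])
broker.with_queues("queue_default")
assert broker.queues_to_consume() == ["queue_default"]
assert broker.kick("queue_mail") == "queue_mail"  # publishing unaffected
```

With this split, a narrowed worker still consumes from only its own queue, while any task it runs can publish to every configured queue.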