Skip to content

Cannot enqueue to another queue from within a dedicated-queue worker #51

@NicolasFerec

Description

@NicolasFerec

Summary

When using the recommended multi-queue pattern (one worker per queue via with_queues()), it is not possible to enqueue a task to a different queue from within a running task.

Setup

broker = AioPikaBroker(url, task_queues=[queue_default, queue_mail])

@broker.task("default_task")
async def default_task():
    await mail_task.kiq()  # wants to enqueue to queue_mail

@broker.task("mail_task", queue_name="queue_mail")
async def mail_task(): ...

def get_worker_default():
    return broker.with_queues(queue_default)

def get_worker_mail():
    return broker.with_queues(queue_mail)

Workers started with:

taskiq worker ... -fsd myapp:get_worker_default
taskiq worker ... -fsd myapp:get_worker_mail

Problem

with_queues() mutates _task_queues on the original broker instance (self._task_queues = list(queues); return self). After the worker factory is called at startup, the shared broker object is permanently narrowed to a single queue.

Any .kiq() call from inside default_task then hits the len(_task_queues) == 1 shortcut in kick() and silently routes mail_task to queue_default instead of queue_mail.

Why copy.copy() doesn't fix it

A shallow copy before calling with_queues() avoids mutating the original, but introduces a different problem:

def get_worker_default():
    return copy.copy(broker).with_queues(queue_default)

Taskiq calls startup() on the returned (narrowed) copy, which establishes the AMQP write_channel on the copy. The original broker — the instance held by every @broker.task decorator — never has startup() called in the worker process. Its write_channel stays None, so every .kiq() call raises NoStartupError.

Root Cause

_task_queues serves two distinct purposes:

  1. Consuming — which queues this worker process binds to and reads from.
  2. Publishing validation — the routing key lookup in kick().

with_queues() was designed for purpose 1, but because both share the same field, narrowing for listening silently breaks publishing.

Expected Behaviour

A task running in a dedicated-queue worker should be able to enqueue tasks to any of the broker's configured queues, regardless of which subset the worker is listening to.

Suggested Fix

Separate the two concerns internally. For example:

  • Add a _listening_queues: list[Queue] | None field (defaults to None = listen to all).
  • with_queues() sets only _listening_queues.
  • Queue binding/consumption during startup() uses _listening_queues (falling back to _task_queues).
  • kick() always validates against the full _task_queues.

This would make with_queues() a true "listen-only narrowing" operation without any side-effect on publishing.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions