Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions api-reference/pipecat-subagents/bus.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,66 @@ runner = AgentRunner(bus=bus)
The Redis pub/sub channel name.
</ParamField>

## PgmqBus

Distributed agent bus backed by PGMQ (PostgreSQL Message Queue) for cross-process communication. Each instance creates its own queue and broadcasts messages to all peer queues sharing the same channel prefix.

<Note>Requires the pgmq extra: `uv add "pipecat-ai-subagents[pgmq]"`</Note>

```python
from pgmq.async_queue import PGMQueue
from pipecat_subagents.bus.network.pgmq import PgmqBus

pgmq = PGMQueue(
host="aws-0-us-east-1.pooler.supabase.com",
port="5432",
database="postgres",
username="postgres.<project-ref>",
password="...",
pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme")
runner = AgentRunner(bus=bus)
```

<Tip>
Prefer the session-mode pooler (e.g. port 5432 in Supabase) when available. Transaction-mode pooling (e.g. port 6543) works in practice but may log benign warnings. The connection pool must allow at least 2 concurrent connections (4+ recommended under load).
</Tip>

### Configuration

<ParamField path="pgmq" type="PGMQueue" required>
An initialized `PGMQueue` client. Must call `await pgmq.init()` before passing.
</ParamField>

<ParamField path="serializer" type="MessageSerializer | None" default="None">
The
[`MessageSerializer`](/api-reference/pipecat-subagents/serializers#messageserializer)
for encoding/decoding messages. Defaults to
[`JSONMessageSerializer`](/api-reference/pipecat-subagents/serializers#jsonmessageserializer).
</ParamField>

<ParamField path="channel" type="str" default="pipecat_bus">
Channel prefix for queue names. Sanitized to alphanumeric + underscore.
</ParamField>

<ParamField path="visibility_timeout" type="int" default="30">
Seconds a read message stays invisible before redelivery.
</ParamField>

<ParamField path="batch_size" type="int" default="10">
Maximum messages to fetch per read.
</ParamField>

<ParamField path="poll_interval_ms" type="int" default="100">
Long-poll check interval in milliseconds.
</ParamField>

<ParamField path="max_poll_seconds" type="int" default="5">
Maximum seconds the reader blocks per poll cycle.
</ParamField>

## BusBridgeProcessor

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent's pipeline to exchange frames with other agents via the `AgentBus`.
Expand Down
27 changes: 26 additions & 1 deletion subagents/fundamentals/agent-bus.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Think of it as an internal event bus -- agents publish messages and other agents

## Bus implementations

Pipecat Subagents provides two bus implementations:
Pipecat Subagents provides three bus implementations:

### AsyncQueueBus (local)

Expand Down Expand Up @@ -42,6 +42,31 @@ All processes that share the same Redis channel can exchange messages. The progr
`RedisBus` requires the `redis` extra: `uv add "pipecat-ai-subagents[redis]"`
</Info>

### PgmqBus (distributed)

An alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to peer queues.

```python
from pgmq.async_queue import PGMQueue
from pipecat_subagents.bus.network.pgmq import PgmqBus

pgmq = PGMQueue(
host="aws-0-us-east-1.pooler.supabase.com",
port="5432",
database="postgres",
username="postgres.<project-ref>",
password="...",
pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = AgentRunner(bus=bus)
```

<Info>
`PgmqBus` requires the `pgmq` extra: `uv add "pipecat-ai-subagents[pgmq]"`
</Info>

## Message types

Messages on the bus fall into three categories:
Expand Down
34 changes: 31 additions & 3 deletions subagents/learn/distributed-agents.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ Distributed agents are agents connected to the **same bus** but running in diffe
Agents](/subagents/learn/proxy-agents) instead.
</Note>

## Setting up RedisBus
## Setting up a distributed bus

Pipecat Subagents provides two distributed bus implementations: RedisBus and PgmqBus. Choose based on your infrastructure.

### RedisBus

Each process creates its own `AgentRunner` with a `RedisBus` connected to the same Redis channel:

Expand All @@ -31,6 +35,30 @@ All runners sharing the same `channel` can exchange messages. Agents discover ea

<Info>Install the Redis extra: `uv add "pipecat-ai-subagents[redis]"`</Info>

### PgmqBus

Alternatively, use `PgmqBus` backed by PostgreSQL Message Queue:

```python
from pgmq.async_queue import PGMQueue
from pipecat_subagents.bus.network.pgmq import PgmqBus
from pipecat_subagents.runner import AgentRunner

pgmq = PGMQueue(
host="aws-0-us-east-1.pooler.supabase.com",
port="5432",
database="postgres",
username="postgres.<project-ref>",
password="...",
pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = AgentRunner(bus=bus, handle_sigint=True)
```

<Info>Install the PGMQ extra: `uv add "pipecat-ai-subagents[pgmq]"`</Info>

## Example: distributed handoff

This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.
Expand Down Expand Up @@ -157,6 +185,6 @@ Runners exchange registry information automatically over the shared bus. To get

## Considerations

- **Latency**: Redis adds network overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the Redis instance.
- **Serialization**: `RedisBus` serializes messages to JSON. Custom frame types need to be registered with the serializer.
- **Latency**: Network buses add overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the bus server (Redis or PostgreSQL).
- **Serialization**: Both `RedisBus` and `PgmqBus` serialize messages to JSON. Custom frame types need to be registered with the serializer.
- **Single channel**: All agents on the same channel see all messages. Use different channels for different sessions or applications.
Loading