15 changes: 12 additions & 3 deletions pgdog/docs/issues/omni-table-subscriber-deadlock.md
@@ -75,18 +75,20 @@ successive `[0.000 MB/sec]` log lines, confirms this deadlock.

## Solutions

### Solution 1: sequential per-destination apply in `send()` ✅ implemented

**What:** collapse the three loops in `send()` (`stream.rs:238-277`) into a single loop
that completes the full write→read cycle for each destination before moving to the next:

```rust
for conn in &mut conns {
    conn.send(&vec![bind.clone().into(), Execute::new().into(), Flush.into()].into()).await?;
    conn.flush().await?;
    for _ in 0..2 {
        let msg = conn.read().await?;
        // ... response handling ...
    }
}
```
@@ -99,6 +101,13 @@ until `Sync` in `commit()`), so two omni rows still produce a cross-row, cross-destination
This is the case in `repro_deadlock.sh`. **Not sufficient alone** for any workload with multi-row
omni transactions.

**Parallelism cost:** sequential apply trades per-row latency for safety: destination
round-trips are now additive rather than overlapping. Parallel sends cannot be restored
without additional mechanisms, because they break consistent lock ordering: subscriber 0
can take destination 0's lock while subscriber 1 takes destination 1's lock, and each
then blocks on the lock the other holds before either reads back, re-introducing the
cycle. The structural fix is destination-partitioned apply (see below), which assigns
disjoint destination shards to each subscriber, so parallel sends within a subscriber
are safe.
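
The disjoint assignment could be as simple as a round-robin split of shard indices; `partition_destinations` below is a hypothetical helper for illustration, not pgdog's API:

```rust
// Hypothetical sketch of destination-partitioned apply: assign each
// destination shard to exactly one subscriber (round-robin), so parallel
// sends within a subscriber never touch a shard owned by another subscriber.
fn partition_destinations(num_shards: usize, num_subscribers: usize) -> Vec<Vec<usize>> {
    let mut assignments: Vec<Vec<usize>> = vec![Vec::new(); num_subscribers];
    for shard in 0..num_shards {
        assignments[shard % num_subscribers].push(shard);
    }
    assignments
}

fn main() {
    // With 4 destination shards and 2 subscribers, the partitions are disjoint:
    let parts = partition_destinations(4, 2);
    assert_eq!(parts, vec![vec![0, 2], vec![1, 3]]);
    println!("{:?}", parts);
}
```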

---

### Solution 2: `lock_timeout` on destination connections
Expand Down
25 changes: 8 additions & 17 deletions pgdog/src/backend/replication/logical/subscriber/stream.rs
```diff
@@ -223,28 +223,19 @@ impl StreamSubscriber {
     // Dispatch a pre-built bind to the matching shard(s).
     async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
-        let mut conns: Vec<_> = self
-            .connections
-            .iter_mut()
-            .enumerate()
-            .filter(|(shard, _)| match val {
-                Shard::Direct(direct) => *shard == *direct,
-                Shard::Multi(multi) => multi.contains(shard),
+        for (shard, conn) in self.connections.iter_mut().enumerate() {
+            let routes = match val {
+                Shard::Direct(direct) => shard == *direct,
+                Shard::Multi(multi) => multi.contains(&shard),
                 _ => true,
-            })
-            .map(|(_, server)| server)
-            .collect();
+            };
+            if !routes {
+                continue;
+            }
 
-        for conn in &mut conns {
             conn.send(&vec![bind.clone().into(), Execute::new().into(), Flush.into()].into())
                 .await?;
-        }
-
-        for conn in &mut conns {
             conn.flush().await?;
-        }
-
-        for conn in &mut conns {
             // Keep server connections always synchronized.
             for _ in 0..2 {
                 let msg = conn.read().await?;
```