25 changes: 25 additions & 0 deletions .schema/pgdog.schema.json
@@ -95,6 +95,8 @@
"resharding_copy_retry_max_attempts": 5,
"resharding_copy_retry_min_delay": 1000,
"resharding_parallel_copies": 1,
"resharding_replication_retry_max_attempts": 5,
"resharding_replication_retry_min_delay": 1000,
"rollback_timeout": 5000,
"server_lifetime": 86400000,
"shutdown_termination_timeout": null,
@@ -402,6 +404,15 @@
"maximum": 255,
"minimum": 0
},
"lock_timeout": {
"description": "This setting configures the `lock_timeout` connection parameter on all connections to Postgres for this database.\nAborts any statement that waits longer than the specified duration to acquire a lock.\nUnlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#lock_timeout",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0
},
"min_pool_size": {
"description": "Overrides the `min_pool_size` setting. The connection pool will maintain at minimum this many connections.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#min_pool_size",
"type": [
@@ -1002,6 +1013,20 @@
"default": 1,
"minimum": 0
},
"resharding_replication_retry_max_attempts": {
"description": "Maximum number of consecutive replication-subscriber errors tolerated before\nthe source error is propagated. Each failure triggers `slot.reconnect()`,\nafter which Postgres re-streams every event since the last acked commit.\n`0` retries indefinitely.\n\n_Default:_ `5`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_max_attempts",
"type": "integer",
"format": "uint",
"default": 5,
"minimum": 0
},
"resharding_replication_retry_min_delay": {
"description": "Delay in milliseconds between replication subscriber retry attempts.\n\n_Default:_ `1000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_min_delay",
"type": "integer",
"format": "uint64",
"default": 1000,
"minimum": 0
},
"rollback_timeout": {
"description": "How long to allow for `ROLLBACK` queries to run on server connections with unfinished transactions.\n\n_Default:_ `5000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#rollback_timeout",
"type": "integer",
9 changes: 9 additions & 0 deletions .schema/users.schema.json
@@ -126,6 +126,15 @@
"format": "uint64",
"minimum": 0
},
"lock_timeout": {
        "description": "Lock timeout.\n\nSets the `lock_timeout` on all server connections at connection creation.\nAborts any statement that waits longer than the specified duration to acquire a lock.\nUnlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.\nRecommended for replication destination connections to prevent cross-shard deadlocks\nfrom hanging indefinitely.\n\n**Note:** Nothing prevents the user from changing this setting at runtime,\ne.g., by running `SET lock_timeout TO 0;`.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#lock_timeout",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0
},
"min_pool_size": {
"description": "Overrides [`min_pool_size`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#min_pool_size) for this user. Opens at least this many connections on pooler startup and keeps them open despite [`idle_timeout`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#idle_timeout).\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#min_pool_size",
"type": [
77 changes: 76 additions & 1 deletion docs/REPLICATION.md
@@ -360,4 +360,79 @@ Postgres re-emits after reconnect.
`Sync` would not work: it commits when Postgres saw no error, but PgDog raises some errors
(e.g. `FullIdentityMissingOld` on a missing OLD tuple) *after* a successful `CommandComplete`.
FATAL disconnect is the only signal that rolls back regardless. Connections come from
`Pool::standalone`, so dropping them closes the socket instead of returning to a pool.

---

## Retry and reconnect

The publisher loop retries transient errors without restarting the whole resharding pipeline.

### Configuration

| Parameter | Env var | Default | Meaning |
|---|---|---|---|
| `resharding_replication_retry_max_attempts` | `PGDOG_RESHARDING_REPLICATION_RETRY_MAX_ATTEMPTS` | `5` | Maximum retry attempts; `0` = retry indefinitely |
| `resharding_replication_retry_min_delay` | `PGDOG_RESHARDING_REPLICATION_RETRY_MIN_DELAY` | `1000` ms | Fixed sleep between attempts |
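
In `pgdog.toml`, both knobs live under `[general]` (the values shown here are the shipped defaults from the config schema):

```toml
[general]
resharding_replication_retry_max_attempts = 5   # 0 = retry indefinitely
resharding_replication_retry_min_delay = 1000   # milliseconds between attempts
```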

### What is retried

Any error that passes `err.is_retryable()` — typically network-level failures on either the source
replication connection or a destination shard connection. Non-retryable errors (schema mismatches,
slot invalidation) propagate immediately and abort the task, as do retryable errors once the
attempt budget is exhausted.
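
The shape of the loop can be sketched as follows. This is a simplified model, not PgDog's actual code: `attempts_exhausted` and the simulated outcomes are illustrative, and the reconnect calls are stubbed as comments.

```rust
/// Mirrors `resharding_replication_retry_max_attempts`: `0` means
/// retry indefinitely, any other value caps consecutive failures.
fn attempts_exhausted(consecutive_errors: usize, max_attempts: usize) -> bool {
    max_attempts != 0 && consecutive_errors >= max_attempts
}

fn main() {
    let max_attempts = 5; // default
    // Simulate three transient failures followed by a successful apply.
    let outcomes: [Result<(), &str>; 4] = [Err("io"), Err("io"), Err("io"), Ok(())];

    let mut consecutive_errors = 0;
    for outcome in outcomes {
        match outcome {
            Ok(()) => {
                consecutive_errors = 0; // success resets the error counter
                break;
            }
            Err(_) => {
                consecutive_errors += 1;
                // If the budget is spent, the real loop propagates the error.
                assert!(!attempts_exhausted(consecutive_errors, max_attempts));
                // Here the real loop sleeps `min_delay`, then runs
                // slot.reconnect() and stream.reconnect().
            }
        }
    }
    assert_eq!(consecutive_errors, 0);
}
```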

### Two-step reconnect

A single retry performs two independent reconnects:

1. **`slot.reconnect()`** — drops and re-establishes the source replication connection.
`slot.lsn` is preserved; Postgres re-streams from that position via
`START_REPLICATION SLOT … LOGICAL <slot.lsn>`.

2. **`stream.reconnect()`** — tears down every destination `Server` handle.
Dropping each handle sends `Terminate`; Postgres rolls back any open implicit transaction
on that shard. It also clears `in_transaction`, `changed_tables`, and the per-session caches
(`relations`, `statements`, `keys`), then calls `connect()` to open fresh connections and
re-prepare named statements.

Step 2 is essential when a failure occurs mid-transaction. Without it, destination shards hold
an open implicit transaction containing partial DML. Postgres re-delivers the same `Begin` +
DML + `Commit` sequence on the new source connection, and those rows would be appended to the
already-dirty destination transaction — producing duplicates or constraint errors.

### LSN tracking

`StreamSubscriber` keeps two positions. `lsn` advances on `Begin` (to the future commit LSN) and is used for in-flight deduplication. `committed_lsn` advances only after `commit()` confirms all destination shards — this is what `status_update()` reports to Postgres.

The split matters for KeepAlive: if the reply used `lsn`, Postgres would record a future commit LSN as `confirmed_flush_lsn` while the transaction is still open. On reconnect that would skip the open transaction entirely. `committed_lsn` ensures re-delivery always starts from the last safely committed position.
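
A minimal model of that split (field and method names here are illustrative, not PgDog's actual API):

```rust
#[derive(Default)]
struct LsnTracker {
    /// Advances on Begin, to the transaction's future commit LSN.
    lsn: u64,
    /// Advances only after every destination shard confirms the commit.
    committed_lsn: u64,
}

impl LsnTracker {
    fn on_begin(&mut self, final_lsn: u64) {
        self.lsn = final_lsn;
    }

    fn on_commit_confirmed(&mut self, end_lsn: u64) {
        self.committed_lsn = end_lsn;
    }

    /// What a keepalive / status update must report: never the in-flight
    /// `lsn`. Reporting `lsn` would record a not-yet-committed position as
    /// `confirmed_flush_lsn`, and a reconnect would skip the open txn.
    fn status_update(&self) -> u64 {
        self.committed_lsn
    }
}

fn main() {
    let mut t = LsnTracker::default();
    t.on_commit_confirmed(100);
    t.on_begin(200);                    // transaction still open on destinations
    assert_eq!(t.status_update(), 100); // mid-txn keepalive reports 100, not 200
    t.on_commit_confirmed(200);
    assert_eq!(t.status_update(), 200); // safe to acknowledge now
}
```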

### Full retry flow

```mermaid
sequenceDiagram
participant PG as Source Postgres
participant PUB as Publisher
participant SS as StreamSubscriber
participant DS as Destination shards

PG->>PUB: Begin (final_lsn=200)
PUB->>SS: handle(Begin)
SS->>DS: Bind/Execute row DML (implicit txn open)
Note over PUB: transient error

Note over PUB: retry: sleep(delay)
PUB->>PG: slot.reconnect() → START_REPLICATION from committed_lsn=100
PUB->>SS: stream.reconnect()
SS->>DS: Terminate (implicit txn rolled back)
SS->>DS: connect() — fresh connections, named stmts re-prepared

PG->>PUB: Begin (final_lsn=200) [re-delivered]
PUB->>SS: handle(Begin)
SS->>DS: Bind/Execute row DML
PG->>PUB: Commit (end_lsn=200)
PUB->>SS: handle(Commit)
SS->>DS: Sync
DS-->>SS: ReadyForQuery
SS->>PUB: StatusUpdate(committed_lsn=200)
PUB->>PG: StandbyStatusUpdate(last_flushed=200)
```
49 changes: 47 additions & 2 deletions integration/copy_data/retry_test/run.sh
@@ -41,7 +41,7 @@ shard0_count() { shard0_psql -tAc "SELECT COUNT(*) FROM $1"; }
shard1_count() { shard1_psql -tAc "SELECT COUNT(*) FROM $1"; }

# Pass a psql helper as $1; checks whether the canary row is present on that node.
has_canary() { local fn=$1; "${fn}" -tAc "SELECT 1 FROM copy_data.settings WHERE setting_name='${CANARY}' LIMIT 1" 2>/dev/null | grep -q 1; }
has_canary() { local fn=$1 name=${2:-${CANARY}}; "${fn}" -tAc "SELECT 1 FROM copy_data.settings WHERE setting_name='${name}' LIMIT 1" 2>/dev/null | grep -q 1; }

pushd "${COMPOSE_DIR}"

@@ -199,6 +199,51 @@ if [ "${CANARY_DELIVERED}" -ne 1 ]; then
fi
echo "[retry_test] OK: canary replicated to both shards."

# Assertion 3: replication survives a destination shard outage.
RETRY_CANARY="repl_retry_$(date +%s)_$$"
echo "[retry_test] Killing shard_0 to test replication retry..."
docker compose kill shard_0

echo "[retry_test] Inserting retry canary ${RETRY_CANARY} into source..."
src_psql -c "INSERT INTO copy_data.settings (setting_name, setting_value) VALUES ('${RETRY_CANARY}', 'repl_retry');"

sleep 2

echo "[retry_test] Starting shard_0..."
docker compose start shard_0

READY_ATTEMPTS=0
until pg_isready -h 127.0.0.1 -p 15433 -U pgdog -d pgdog1 -q; do
READY_ATTEMPTS=$((READY_ATTEMPTS + 1))
if [ "${READY_ATTEMPTS}" -ge 120 ]; then
echo "[retry_test] FAIL: shard_0 not ready after $((READY_ATTEMPTS / 2))s"
exit 1
fi
sleep 0.5
done
echo "[retry_test] shard_0 is ready."

RETRY_DELIVERED=0
for _ in $(seq 1 "${REPLICATION_TIMEOUT}"); do
if has_canary shard0_psql "${RETRY_CANARY}" && has_canary shard1_psql "${RETRY_CANARY}"; then
RETRY_DELIVERED=1
break
fi
if ! kill -0 "${PGDOG_PID}" 2>/dev/null; then
echo "[retry_test] FAIL: pgdog server exited while waiting for retry canary"
exit 1
fi
sleep 1
done

if [ "${RETRY_DELIVERED}" -ne 1 ]; then
echo "[retry_test] FAIL: retry canary ${RETRY_CANARY} not replicated within ${REPLICATION_TIMEOUT}s"
echo "[retry_test] replication did not retry and recover after shard_0 outage"
admin_psql -c 'SHOW REPLICATION_SLOTS;' || true
exit 1
fi
echo "[retry_test] OK: retry canary replicated to both shards after shard_0 outage."

# Verify row counts.
# Sharded tables: sum across both destination shards must equal source.
SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity"
@@ -237,7 +282,7 @@ if [ "${FAILED}" -ne 0 ]; then
exit 1
fi

echo "[retry_test] PASS: COPY_DATA survived shard outage + pool reload; replication delivered canary."
echo "[retry_test] PASS: COPY_DATA survived shard outage + pool reload; replication delivered canary; replication retried after destination shard outage."

# Stop pgdog cleanly before tearing down compose.
kill "${PGDOG_PID}" 2>/dev/null || true
3 changes: 3 additions & 0 deletions integration/resharding/pgdog.toml
@@ -1,3 +1,6 @@
[general]
resharding_replication_retry_min_delay = 100

[[databases]]
name = "source"
host = "127.0.0.1"
1 change: 1 addition & 0 deletions integration/resharding/users.toml
@@ -9,3 +9,4 @@ database = "destination"
name = "pgdog"
password = "pgdog"
schema_admin = true
lock_timeout = 100
6 changes: 6 additions & 0 deletions pgdog-config/src/database.rs
@@ -173,6 +173,12 @@ pub struct Database
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#statement_timeout
pub statement_timeout: Option<u64>,
/// This setting configures the `lock_timeout` connection parameter on all connections to Postgres for this database.
/// Aborts any statement that waits longer than the specified duration to acquire a lock.
/// Unlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#lock_timeout
pub lock_timeout: Option<u64>,
/// Overrides the `idle_timeout` setting. Idle server connections exceeding this timeout will be closed automatically.
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#idle_timeout
30 changes: 30 additions & 0 deletions pgdog-config/src/general.rs
@@ -648,6 +648,25 @@ pub struct General
#[serde(default = "General::resharding_copy_retry_min_delay")]
pub resharding_copy_retry_min_delay: u64,

/// Maximum number of consecutive replication-subscriber errors tolerated before
/// the source error is propagated. Each failure triggers `slot.reconnect()`,
/// after which Postgres re-streams every event since the last acked commit.
/// `0` retries indefinitely.
///
/// _Default:_ `5`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_max_attempts
#[serde(default = "General::resharding_replication_retry_max_attempts")]
pub resharding_replication_retry_max_attempts: usize,

/// Delay in milliseconds between replication subscriber retry attempts.
///
/// _Default:_ `1000`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_min_delay
#[serde(default = "General::resharding_replication_retry_min_delay")]
pub resharding_replication_retry_min_delay: u64,

/// Automatically reload the schema cache used by PgDog to route queries upon detecting DDL statements.
///
/// **Note:** This setting requires PgDog Enterprise Edition to work as expected. If using the open source edition, it will only work with single-node PgDog deployments, e.g., in local development or CI.
@@ -800,6 +819,9 @@ impl Default for General {
resharding_parallel_copies: Self::resharding_parallel_copies(),
resharding_copy_retry_max_attempts: Self::resharding_copy_retry_max_attempts(),
resharding_copy_retry_min_delay: Self::resharding_copy_retry_min_delay(),
resharding_replication_retry_max_attempts:
Self::resharding_replication_retry_max_attempts(),
resharding_replication_retry_min_delay: Self::resharding_replication_retry_min_delay(),
reload_schema_on_ddl: Self::reload_schema_on_ddl(),
load_schema: Self::load_schema(),
cutover_replication_lag_threshold: Self::cutover_replication_lag_threshold(),
@@ -1045,6 +1067,14 @@ impl General {
1000
}

fn resharding_replication_retry_max_attempts() -> usize {
Self::env_or_default("PGDOG_RESHARDING_REPLICATION_RETRY_MAX_ATTEMPTS", 5)
}

fn resharding_replication_retry_min_delay() -> u64 {
Self::env_or_default("PGDOG_RESHARDING_REPLICATION_RETRY_MIN_DELAY", 1000)
}

fn default_shutdown_termination_timeout() -> Option<u64> {
Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT")
}
13 changes: 13 additions & 0 deletions pgdog-config/src/users.rs
@@ -254,6 +254,19 @@ pub struct User
///
/// https://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout
pub statement_timeout: Option<u64>,
/// Lock timeout.
///
/// Sets the `lock_timeout` on all server connections at connection creation.
/// Aborts any statement that waits longer than the specified duration to acquire a lock.
/// Unlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.
/// Recommended for replication destination connections to prevent cross-shard deadlocks
/// from hanging indefinitely.
///
    /// **Note:** Nothing prevents the user from changing this setting at runtime,
    /// e.g., by running `SET lock_timeout TO 0;`.
///
/// https://docs.pgdog.dev/configuration/users.toml/users/#lock_timeout
pub lock_timeout: Option<u64>,
/// Sets the `replication=database` parameter on user connections to Postgres. Allows this user to use replication commands.
///
/// _Default:_ `false`
3 changes: 3 additions & 0 deletions pgdog-stats/src/pool.rs
@@ -310,6 +310,8 @@ pub struct Config
pub rollback_timeout: Duration,
/// Statement timeout
pub statement_timeout: Option<Duration>,
/// Lock timeout
pub lock_timeout: Option<Duration>,
/// Replication mode.
pub replication_mode: bool,
/// Pooler mode.
@@ -360,6 +362,7 @@ impl Default for Config {
ban_timeout: Duration::from_secs(300),
rollback_timeout: Duration::from_secs(5),
statement_timeout: None,
lock_timeout: None,
replication_mode: false,
pooler_mode: PoolerMode::default(),
read_only: false,
25 changes: 25 additions & 0 deletions pgdog/docs/issues/omni-table-subscriber-deadlock.md
@@ -322,3 +322,28 @@ is neither.

Ship 1+2 short-term; pursue destination-partitioned apply long-term. Solutions 3 and 4 trade the
deadlock for worse failure modes and throughput. Solution 5 is orthogonal.


---

## Applied solution

**Solution 2 (`lock_timeout`) is in effect.** `lock_timeout` propagates from the pool config
(`database.lock_timeout` / `user.lock_timeout` in `pgdog-config`) through `Pool::standalone()`
to all destination shard connections, including replication ones. No separate replication-specific
knob is needed; configure it on the destination database or user entry.
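
For instance, a destination user entry might look like this (a hypothetical sketch mirroring the `lock_timeout = 100` set in this PR's `integration/resharding/users.toml`; the value is in milliseconds):

```toml
# users.toml — lock_timeout applies to this user's server connections
[[users]]
name = "pgdog"
database = "destination"
lock_timeout = 100  # abort any statement waiting longer than 100 ms for a lock
```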

**Retry and reconnect is in effect.** When a retryable error occurs (including a lock-timeout
abort), the publisher loop calls `slot.reconnect()` to re-establish the source stream from the
last committed LSN, then `stream.reconnect()` to close and reopen all destination connections.
This rolls back any open implicit transaction on the destinations before re-delivery, preventing
duplicate DML on retry. See `docs/REPLICATION.md` → *Retry and reconnect*.

**Solution 1 (sequential per-destination apply) is not yet applied.** The three-loop `send()`
in `stream.rs` still fans out write, flush, and read as separate passes. Multi-row omni
transactions remain susceptible to the deadlock; `lock_timeout` bounds recovery time but does
not prevent it.

**Status:** lock-timeout + retry gives bounded, recoverable deadlocks for all cases.
Single-row single-cycle deadlocks are the primary concern in practice; Solution 1 would
eliminate those structurally. Destination-partitioned apply remains the long-term fix.