Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .claude/skills/bash/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
name: bash
description: How to use the shell successfully
allowed-tools: Bash
---

Execute shell commands inside their own bash shell, like so:

```bash
bash -c 'command'
```

This makes sure the shell is not polluted by any outside state.
11 changes: 11 additions & 0 deletions .claude/skills/rust/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ for i in 0..1000 {
}
```

### Import instead of using absolute paths

```rust
// BAD
let v = tokio::net::TcpStream::connect("localhost:8080");

// GOOD
use tokio::net::TcpStream;
let v = TcpStream::connect("localhost:8080");
```

### Profile Before Optimizing

```bash
Expand Down
47 changes: 46 additions & 1 deletion integration/copy_data/dev.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/bin/bash
set -e
trap 'kill 0' SIGINT SIGTERM
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog"
PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN}
Expand All @@ -11,11 +10,57 @@ export PGHOST=127.0.0.1
export PGPORT=5432
export PGPASSWORD=pgdog

# Kill the background replication process (if one was started) and reap it,
# so the script never leaves an orphaned data-sync process behind on exit.
cleanup() {
if [ -n "${REPL_PID}" ]; then
# Best-effort: the process may already be gone, so ignore failures.
kill ${REPL_PID} 2>/dev/null || true
wait ${REPL_PID} 2>/dev/null || true
fi
}
# Run cleanup on every exit path (success, error under set -e, or signal).
trap cleanup EXIT

pushd ${SCRIPT_DIR}

psql -f init.sql

${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog
${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover

# Start replication in the background.
${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog &
REPL_PID=$!

# Give replication a moment to connect.
sleep 2

# Check that the replication process is still alive.
# Check that the replication process is still alive. If it died early,
# report it and exit with a nonzero status.
#
# NOTE: a bare `wait ${REPL_PID}; exit $?` is wrong here for two reasons:
# 1. Under `set -e`, a nonzero `wait` aborts the script before `exit $?`
#    ever runs (harmless, but the exit line was dead code).
# 2. If the process exited with status 0, `exit $?` would exit 0 even
#    though exiting early is an error — masking the failure.
if ! kill -0 ${REPL_PID} 2>/dev/null; then
echo "ERROR: replication process exited early"
set +e
wait ${REPL_PID}
EARLY_EXIT=$?
set -e
# Don't let the EXIT trap try to reap it a second time.
REPL_PID=""
# Force a nonzero exit even if the process itself exited cleanly.
if [ ${EARLY_EXIT} -eq 0 ]; then
EARLY_EXIT=1
fi
exit ${EARLY_EXIT}
fi

# Run pgbench against the source database — writes land on the source and
# get replicated to the destination shards via logical replication.
pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \
-t 1000 -c 3 --protocol extended \
-f "${SCRIPT_DIR}/pgbench.sql" -P 1

# Let replication catch up.
sleep 3

# Stop replication and capture its exit code.
kill ${REPL_PID} 2>/dev/null || true
set +e
wait ${REPL_PID}
REPL_EXIT=$?
set -e
REPL_PID=""

# 0, 130 (SIGINT), 143 (SIGTERM) are all normal shutdown codes.
if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 ]; then
echo "ERROR: replication process exited with code ${REPL_EXIT}"
exit ${REPL_EXIT}
fi

popd
40 changes: 40 additions & 0 deletions integration/copy_data/pgbench.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- pgbench custom script: exercises INSERT / SELECT / UPDATE / DELETE against
-- the copy_data schema so replication traffic covers all operation types.
-- The \set lines are pgbench meta-commands, evaluated once per transaction.
\set tenant_id random(1, 20)
-- Multiplying by 1021 spreads user ids apart — presumably to vary shard
-- placement across runs; TODO confirm against the sharding config.
\set user_id (1021 * random(10000, 100000))
\set order_id random(100001, 999999999)
\set log_id random(100001, 999999999)
-- Division by 100.0 yields a fractional amount (e.g. cents -> dollars).
\set order_amount random(1, 50000) / 100.0

-- Upsert a user for this tenant.
INSERT INTO copy_data.users (id, tenant_id, email, settings)
VALUES (:user_id, :tenant_id, 'bench_' || :user_id || '@example.com', '{"theme":"dark"}')
ON CONFLICT (id, tenant_id) DO UPDATE SET settings = EXCLUDED.settings;

-- Read the user back.
SELECT id, tenant_id, email, created_at FROM copy_data.users
WHERE id = :user_id AND tenant_id = :tenant_id;

-- Insert an order with an explicit large id.
INSERT INTO copy_data.orders (id, user_id, tenant_id, amount)
VALUES (:order_id, :user_id, :tenant_id, :order_amount)
ON CONFLICT (id) DO NOTHING;

-- Read recent orders for this tenant.
SELECT id, user_id, amount, created_at FROM copy_data.orders
WHERE tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 5;

-- Update the user settings.
UPDATE copy_data.users SET settings = jsonb_set(settings, '{last_bench}', to_jsonb(now()::text))
WHERE id = :user_id AND tenant_id = :tenant_id;

-- Log an action with an explicit large id.
INSERT INTO copy_data.log_actions (id, tenant_id, action)
VALUES (:log_id, :tenant_id, 'bench')
ON CONFLICT (id) DO NOTHING;

-- Clean up everything we created, so repeated runs start from the same state.
-- Orders and log rows are deleted by their explicit ids; the user row is
-- matched on the bench-generated email to avoid deleting a pre-existing user
-- that happens to share the same (id, tenant_id).
DELETE FROM copy_data.orders WHERE id = :order_id;

DELETE FROM copy_data.log_actions WHERE id = :log_id;

DELETE FROM copy_data.users
WHERE id = :user_id AND tenant_id = :tenant_id AND email = 'bench_' || :user_id || '@example.com';
12 changes: 5 additions & 7 deletions pgdog/src/backend/pool/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,12 @@ impl Address {
},
password: if server_auth.rds_iam() {
String::new()
} else if let Some(password) = database.password.clone() {
password
} else if let Some(password) = user.server_password.clone() {
password
} else {
if let Some(password) = database.password.clone() {
password
} else if let Some(password) = user.server_password.clone() {
password
} else {
user.password().to_string()
}
user.password().to_string()
},
server_auth,
server_iam_region: user.server_iam_region.clone(),
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/replication/logical/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum Error {
#[error("out of sync during relation prepare, got {0}")]
RelationOutOfSync(char),

#[error("out of sync during row write, got {0}")]
SendOutOfSync(char),

#[error("missing data")]
MissingData,

Expand Down
25 changes: 12 additions & 13 deletions pgdog/src/backend/replication/logical/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ impl TableCopy {
state.bytes_per_sec = state.bytes / elapsed as usize;
}

data_sync_progress(&self, &state);
data_sync_progress(self, &state);
}
}

pub(crate) fn error(&self, error: &LogicalError) {
data_sync_error(&self, error);
data_sync_error(self, error);
}

pub(crate) fn update_sql(&self, sql: &str) {
Expand All @@ -83,7 +83,7 @@ impl TableCopy {

impl Drop for TableCopy {
fn drop(&mut self) {
data_sync_done(&self);
data_sync_done(self);
COPIES.copies.remove(self);
}
}
Expand Down Expand Up @@ -300,16 +300,15 @@ impl SchemaStatement {
}

pub(crate) fn running(&mut self) {
if let Some(entry) =
SchemaStatements::get()
.stmts
.remove(&self.task)
.and_then(|mut entry| {
entry.running = true;
entry.statement.started_at = Some(SystemTime::now());

Some(entry)
})
if let Some(entry) = SchemaStatements::get()
.stmts
.remove(&self.task)
.map(|mut entry| {
entry.running = true;
entry.statement.started_at = Some(SystemTime::now());

entry
})
{
self.task = entry.clone();
schema_sync_task(&self.task);
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/replication/logical/subscriber/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ pub mod copy;
pub mod parallel_connection;
pub mod stream;

#[cfg(test)]
mod tests;

pub use context::StreamContext;
pub use copy::CopySubscriber;
pub use parallel_connection::ParallelConnection;
Expand Down
Loading
Loading