Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions e2e/add_node_data_safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
// causes the apply worker to start from the beginning of WAL, producing
// duplicate-key errors or silently overwriting rows.
//
// Covers: Change 1 — EnsureReplicationOriginExists + AdvanceReplicationOrigin
// wired into ReplicationSlotAdvanceFromCTSResource.
// Covers: ReplicationOriginAdvanceResource which ensures the replication
// origin exists and is advanced to the same LSN as the replication slot.
// This resource depends on ReplicationSlotAdvanceFromCTSResource which runs
// on the provider's host, while origin advancement runs on the subscriber's
// host (since cross-host connections are not allowed).
func TestAddNodeOriginAdvanced(t *testing.T) {
t.Parallel()

Expand All @@ -30,10 +33,11 @@ func TestAddNodeOriginAdvanced(t *testing.T) {
dbName = "origin_adv_db"
)

ctx, cancel := context.WithTimeout(t.Context(), 7*time.Minute)
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel()

hostIDs := fixture.HostIDs()
t.Log("Step 1: Creating 2-node database fixture")
db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{
Spec: &controlplane.DatabaseSpec{
DatabaseName: dbName,
Expand All @@ -51,9 +55,11 @@ func TestAddNodeOriginAdvanced(t *testing.T) {
},
},
})
t.Logf("Database created: %s", db.ID)

// Write rows on n2 so its WAL position is meaningfully ahead of the slot's
// consistent_point. This gives the origin advancement a non-trivial LSN.
t.Log("Step 2: Writing 100 test rows to n2")
n2Opts := ConnectionOptions{
Matcher: And(WithNode("n2"), WithRole("primary")),
Username: username,
Expand All @@ -68,18 +74,27 @@ func TestAddNodeOriginAdvanced(t *testing.T) {
require.NoError(t, err)
}
})
t.Log("Test data written successfully")

// Add n3 with n1 as source.
t.Log("Step 3: Adding n3 node with n1 as source")
db.Spec.Nodes = append(db.Spec.Nodes, &controlplane.DatabaseNodeSpec{
Name: "n3",
HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[2])},
SourceNode: pointerTo("n1"),
})
t.Log("Starting database update to add n3 (this may take several minutes)")
require.NoError(t, db.Update(ctx, UpdateOptions{Spec: db.Spec}))
t.Log("Database update completed, n3 node added successfully")

t.Log("Step 3b: Waiting for replication to complete across all nodes")
db.WaitForReplication(ctx, t, username, password)
t.Log("Replication complete")

// The replication slot spk_<db>_n2_sub_n2_n3 lives on n2.
// The origin with the same name lives on n3 (subscriber side).
slotName := e2eReplicationSlotName(dbName, "n2", "n3")
t.Logf("Step 4: Checking replication origin on n3 (slot name: %s)", slotName)

n3Opts := ConnectionOptions{
Matcher: And(WithNode("n3"), WithRole("primary")),
Expand All @@ -98,12 +113,14 @@ func TestAddNodeOriginAdvanced(t *testing.T) {
)`, slotName,
).Scan(&lsn)
require.NoError(t, err)
t.Logf("Replication origin LSN: %s (expected: not 0/0)", lsn)

assert.NotEqual(t, "0/0", lsn,
"replication origin %q on n3 should be advanced past 0/0 (got %s); "+
"a zeroed origin risks the apply worker replaying historical WAL",
slotName, lsn)
})
t.Log("Test completed successfully")
}

// e2eReplicationSlotName mirrors postgres.ReplicationSlotName without
Expand Down