Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/consistency/diff/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,17 @@ func (t *TableDiffTask) fetchRows(nodeName string, r Range) ([]types.OrderedMap,
} else {
processedVal = nil
}
case pgtype.UUID:
if v.Status == pgtype.Present {
processedVal = fmt.Sprintf("%x-%x-%x-%x-%x",
v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16])
} else {
processedVal = nil
}
case [16]byte: // pgx/v5 returns UUIDs as [16]byte
// nil caught above
processedVal = fmt.Sprintf("%x-%x-%x-%x-%x",
v[0:4], v[4:6], v[6:8], v[8:10], v[10:16])
case time.Time:
processedVal = v
case string:
Expand Down
171 changes: 171 additions & 0 deletions tests/integration/table_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func TestTableDiffSimplePK(t *testing.T) {
t.Run("VariousDataTypes", func(t *testing.T) {
testTableDiff_VariousDataTypes(t, false)
})
t.Run("UUIDColumn", func(t *testing.T) {
testTableDiff_UUIDColumn(t, false)
})
t.Run("ByteaColumnSizeCheck", func(t *testing.T) {
testTableDiff_ByteaColumnSizeCheck(t, false)
})
Expand Down Expand Up @@ -98,6 +101,9 @@ func TestTableDiffCompositePK(t *testing.T) {
t.Run("VariousDataTypes", func(t *testing.T) {
testTableDiff_VariousDataTypes(t, true)
})
t.Run("UUIDColumn", func(t *testing.T) {
testTableDiff_UUIDColumn(t, true)
})
t.Run("ByteaColumnSizeCheck", func(t *testing.T) {
testTableDiff_ByteaColumnSizeCheck(t, true)
})
Expand Down Expand Up @@ -913,6 +919,171 @@ CREATE TABLE IF NOT EXISTS %s.%s (
log.Println("TestTableDiff_VariousDataTypes completed.")
}

func testTableDiff_UUIDColumn(t *testing.T, compositeKey bool) {
ctx := context.Background()
tableName := "uuid_test_table"
qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)

compositeKeyPart := ""
if compositeKey {
compositeKeyPart = ", name"
}

createTableSQL := fmt.Sprintf(`
CREATE SCHEMA IF NOT EXISTS "%s";
CREATE TABLE IF NOT EXISTS %s.%s (
id INT,
name TEXT,
col_uuid UUID,
PRIMARY KEY(id%s)
);`, testSchema, testSchema, tableName, compositeKeyPart)

for i, pool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} {
nodeName := pgCluster.ClusterNodes[i]["Name"].(string)
_, err := pool.Exec(ctx, createTableSQL)
if err != nil {
t.Fatalf("Failed to create uuid_test_table on node %s: %v", nodeName, err)
}
_, err = pool.Exec(ctx, fmt.Sprintf("TRUNCATE TABLE %s.%s CASCADE", testSchema, tableName))
if err != nil {
t.Fatalf("Failed to truncate uuid_test_table on node %s: %v", nodeName, err)
}
addToRepSetSQL := fmt.Sprintf(`SELECT spock.repset_add_table('default', '%s');`, qualifiedTableName)
_, err = pool.Exec(ctx, addToRepSetSQL)
if err != nil {
t.Fatalf("Failed to add table to replication set on %s: %v", nodeName, err)
}
}
log.Printf("Table %s created on both nodes", qualifiedTableName)

t.Cleanup(func() {
removeFromRepSetSQL := fmt.Sprintf(`SELECT spock.repset_remove_table('default', '%s');`, qualifiedTableName)
_, err := pgCluster.Node1Pool.Exec(ctx, removeFromRepSetSQL)
if err != nil {
t.Logf("cleanup: failed to remove table from replication set: %v", err)
}
for _, pool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} {
_, err := pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", qualifiedTableName))
if err != nil {
t.Logf("Failed to drop test table %s: %v", qualifiedTableName, err)
}
}
files, _ := filepath.Glob("*_diffs-*.json")
for _, f := range files {
os.Remove(f)
}
})

// Row 1: same on both nodes
row1UUID := "550e8400-e29b-41d4-a716-446655440001"
// Row 2: different UUID on each node
row2UUIDNode1 := "550e8400-e29b-41d4-a716-446655440002"
row2UUIDNode2 := "660e8400-e29b-41d4-a716-446655440002"

insertSQL := fmt.Sprintf("INSERT INTO %s.%s (id, name, col_uuid) VALUES ($1, $2, $3)", testSchema, tableName)

insertRow := func(pool *pgxpool.Pool, id int, name string, uuid string) {
tx, err := pool.Begin(ctx)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
defer tx.Rollback(ctx)

_, err = tx.Exec(ctx, "SELECT spock.repair_mode(true)")
if err != nil {
t.Fatalf("Failed to enable spock repair mode: %v", err)
}
_, err = tx.Exec(ctx, insertSQL, id, name, uuid)
if err != nil {
t.Fatalf("Failed to insert row id %d: %v", id, err)
}
_, err = tx.Exec(ctx, "SELECT spock.repair_mode(false)")
if err != nil {
t.Fatalf("Failed to disable spock repair mode: %v", err)
}
if err = tx.Commit(ctx); err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
}

// Insert same row on both nodes
insertRow(pgCluster.Node1Pool, 1, "same", row1UUID)
insertRow(pgCluster.Node2Pool, 1, "same", row1UUID)

// Insert row with different UUID on each node
insertRow(pgCluster.Node1Pool, 2, "different", row2UUIDNode1)
insertRow(pgCluster.Node2Pool, 2, "different", row2UUIDNode2)

log.Printf("Data loaded into %s with UUID variations", qualifiedTableName)

nodesToCompare := []string{serviceN1, serviceN2}
tdTask := newTestTableDiffTask(t, qualifiedTableName, nodesToCompare)

err := tdTask.RunChecks(false)
if err != nil {
t.Fatalf("table-diff validations and checks failed: %v", err)
}

if err := tdTask.ExecuteTask(); err != nil {
t.Fatalf("ExecuteTask failed for UUID table: %v", err)
}

pairKey := serviceN1 + "/" + serviceN2
if strings.Compare(serviceN1, serviceN2) > 0 {
pairKey = serviceN2 + "/" + serviceN1
}

nodeDiffs, ok := tdTask.DiffResult.NodeDiffs[pairKey]
if !ok {
t.Fatalf("Expected diffs for pair %s, but none found. Result: %+v", pairKey, tdTask.DiffResult)
}

// Should have 1 diff (row 2 with different UUIDs)
if len(nodeDiffs.Rows[serviceN1]) != 1 {
t.Errorf("Expected 1 row in diffs for %s, got %d. Rows: %+v",
serviceN1, len(nodeDiffs.Rows[serviceN1]), nodeDiffs.Rows[serviceN1])
}
if len(nodeDiffs.Rows[serviceN2]) != 1 {
t.Errorf("Expected 1 row in diffs for %s, got %d. Rows: %+v",
serviceN2, len(nodeDiffs.Rows[serviceN2]), nodeDiffs.Rows[serviceN2])
}

// Verify the UUID is formatted correctly (as a string in standard UUID format)
for _, row := range nodeDiffs.Rows[serviceN1] {
uuid, ok := row.Get("col_uuid")
if !ok {
t.Errorf("col_uuid not found in diff row")
continue
}
uuidStr, ok := uuid.(string)
if !ok {
t.Errorf("Expected col_uuid to be string, got %T: %v", uuid, uuid)
continue
}
if uuidStr != row2UUIDNode1 {
t.Errorf("Expected UUID %s, got %s", row2UUIDNode1, uuidStr)
}
}

for _, row := range nodeDiffs.Rows[serviceN2] {
uuid, ok := row.Get("col_uuid")
if !ok {
t.Errorf("col_uuid not found in diff row")
continue
}
uuidStr, ok := uuid.(string)
if !ok {
t.Errorf("Expected col_uuid to be string, got %T: %v", uuid, uuid)
continue
}
if uuidStr != row2UUIDNode2 {
t.Errorf("Expected UUID %s, got %s", row2UUIDNode2, uuidStr)
}
}

log.Println("TestTableDiff_UUIDColumn completed.")
}

func testTableDiff_TableFiltering(t *testing.T) {
ctx := context.Background()
tableName := "customers"
Expand Down
Loading