Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ e2e_args=-tags=e2e_test -count=1 -timeout=45m \
$(if $(filter 1,$(E2E_DEBUG)),-debug) \
$(if $(E2E_DEBUG_DIR),-debug-dir $(E2E_DEBUG_DIR))

cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \
cluster_test_args=-tags=cluster_test -count=1 -timeout=15m \
$(if $(CLUSTER_TEST_PARALLEL),-parallel $(CLUSTER_TEST_PARALLEL)) \
$(if $(CLUSTER_TEST_RUN),-run $(CLUSTER_TEST_RUN)) \
-args \
Expand Down
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20260504-150235.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Added a feature to enable manual Postgres minor version updates in systemd clusters. The Control Plane will now update its copy of the database spec when it detects changes to an instance's Postgres or Spock version.
time: 2026-05-04T15:02:35.045407-04:00
3 changes: 3 additions & 0 deletions changes/unreleased/Changed-20260504-152348.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Changed
body: Changed the instance monitoring system to query the Postgres and Spock versions for replica instances and report them in the databases API.
time: 2026-05-04T15:23:48.324604-04:00
292 changes: 292 additions & 0 deletions clustertest/external_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
//go:build cluster_test

package clustertest

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane"
"github.com/pgEdge/control-plane/client"
)

func TestExternalUpgrade(t *testing.T) {
// Tests that the control plane updates its records when the user upgrades
// a database outside of our API.
t.Parallel()
ctx := t.Context()

const (
startPostgresVersion string = "18.2"
upgradePostgresVersion string = "18.3"
upgradeImage string = "ghcr.io/pgedge/pgedge-postgres:18.3-spock5.0.6-standard-1"
spockVersion string = "5"
sleepDuration time.Duration = 5 * time.Second
)

// Helper functions
assertSpecVersions := func(t *testing.T, spec *controlplane.DatabaseSpec, expectedSpecVersion string, expectedNodeVersions map[string]string) {
t.Helper()

actualNodeVersions := make(map[string]string, len(spec.Nodes))
for _, node := range spec.Nodes {
var version string
if node.PostgresVersion != nil {
version = *node.PostgresVersion
}
actualNodeVersions[node.Name] = version
}
var actualSpecVersion string
if spec.PostgresVersion != nil {
actualSpecVersion = *spec.PostgresVersion
}
require.Equal(t, expectedSpecVersion, actualSpecVersion)
require.Equal(t, expectedNodeVersions, actualNodeVersions)
}
assertInstanceVersions := func(t *testing.T, instances []*controlplane.Instance, expectedNodeHostVersions map[string]map[string]string) {
t.Helper()

actualNodeHostVersions := map[string]map[string]string{}
for _, instance := range instances {
require.Equal(t, client.InstanceStateAvailable, instance.State)

if _, ok := actualNodeHostVersions[instance.NodeName]; !ok {
actualNodeHostVersions[instance.NodeName] = map[string]string{}
}
var version string
if instance.Postgres.Version != nil {
version = *instance.Postgres.Version
}
actualNodeHostVersions[instance.NodeName][instance.HostID] = version
}
require.Equal(t, expectedNodeHostVersions, actualNodeHostVersions)
}
upgradeService := func(t *testing.T, databaseID, nodeName, hostID string) {
t.Helper()

tLogf(t, "upgrading %s %s instance", nodeName, hostID)

ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()

serviceName := dockerCmd(t, ctx,
"service",
"ls",
fmt.Sprintf("--filter=label=pgedge.database.id=%s", databaseID),
fmt.Sprintf("--filter=label=pgedge.node.name=%s", nodeName),
fmt.Sprintf("--filter=label=pgedge.host.id=%s", hostID),
"--format={{.Name}}",
)
require.NotEmpty(t, serviceName)
dockerCmd(t, ctx,
"service",
"update",
fmt.Sprintf("--image=%s", upgradeImage),
// disabling healthchecks to speed up startup time
"--no-healthcheck",
serviceName,
)
}

env := map[string]string{
"PGEDGE_DATABASES_MONITOR_INTERVAL_SECONDS": "3",
}
cluster := NewCluster(t, ClusterConfig{
Hosts: []HostConfig{
{ID: "host-1", ExtraEnv: env},
{ID: "host-2", ExtraEnv: env},
{ID: "host-3", ExtraEnv: env},
},
})
cluster.Init(t)

spec := &controlplane.DatabaseSpec{
DatabaseName: "test_upgrade",
PostgresVersion: pointerTo(startPostgresVersion),
SpockVersion: pointerTo(spockVersion),
Nodes: []*controlplane.DatabaseNodeSpec{
{
Name: "n1",
HostIds: []controlplane.Identifier{"host-1", "host-2"},
},
{
Name: "n2",
HostIds: []controlplane.Identifier{"host-3"},
},
},
}

tLog(t, "creating database")

createResp, err := cluster.Client().CreateDatabase(ctx, &controlplane.CreateDatabaseRequest{
Spec: spec,
})
require.NoError(t, err)

databaseID := createResp.Database.ID

t.Cleanup(func() {
// Use a new context for cleanup operations since t.Context is canceled.
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()

if testConfig.skipCleanup {
tLogf(t, "skipping cleanup for database '%s'", databaseID)
return
}

tLogf(t, "cleaning up database '%s'", databaseID)

resp, err := cluster.Client().DeleteDatabase(ctx, &controlplane.DeleteDatabasePayload{
DatabaseID: databaseID,
})
if err != nil {
tLogf(t, "failed to delete database '%s': %v", databaseID, err)
return
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

tLog(t, "waiting for database deletion to complete")

err = waitForTaskComplete(ctx, cluster.Client(), databaseID, resp.Task.TaskID, time.Minute)
if err != nil {
tLogf(t, "failed while waiting for database deletion '%s'", databaseID)
return
}
})

tLog(t, "waiting for database creation to complete")

err = waitForTaskComplete(ctx, cluster.Client(), databaseID, createResp.Task.TaskID, 3*time.Minute)
require.NoError(t, err)

tLog(t, "sleeping to allow instance monitor interval to complete")

time.Sleep(sleepDuration)

tLogf(t, "asserting that all instances and spec versions are %s", startPostgresVersion)

db, err := cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{
DatabaseID: databaseID,
})
require.NoError(t, err)

assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{
"n1": "",
"n2": "",
})
assertInstanceVersions(t, db.Instances, map[string]map[string]string{
"n1": map[string]string{
"host-1": startPostgresVersion,
"host-2": startPostgresVersion,
},
"n2": map[string]string{
"host-3": startPostgresVersion,
},
})

tLog(t, "getting database docker service names")

upgradeService(t, string(databaseID), "n1", "host-2")
upgradeService(t, string(databaseID), "n2", "host-3")

tLog(t, "sleeping to allow instance monitor interval and version reconciliation to complete")

time.Sleep(sleepDuration)

tLogf(t, "asserting that n2 is %s in the spec and that the n1-host-2 and n2-host-3 instances are %s", upgradePostgresVersion, upgradePostgresVersion)

db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{
DatabaseID: databaseID,
})
require.NoError(t, err)

assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{
"n1": "",
"n2": upgradePostgresVersion,
})
assertInstanceVersions(t, db.Instances, map[string]map[string]string{
"n1": map[string]string{
"host-1": startPostgresVersion,
"host-2": upgradePostgresVersion,
},
"n2": map[string]string{
"host-3": upgradePostgresVersion,
},
})

upgradeService(t, string(databaseID), "n1", "host-1")

tLog(t, "sleeping to allow monitor interval and version reconciliation to complete")

time.Sleep(sleepDuration)

tLogf(t, "asserting the top-level version is %s and that all instances are %s", upgradePostgresVersion, upgradePostgresVersion)

db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{
DatabaseID: databaseID,
})
require.NoError(t, err)

assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{
"n1": "",
"n2": "",
})
assertInstanceVersions(t, db.Instances, map[string]map[string]string{
"n1": map[string]string{
"host-1": upgradePostgresVersion,
"host-2": upgradePostgresVersion,
},
"n2": map[string]string{
"host-3": upgradePostgresVersion,
},
})

tLog(t, "performing a no-op update")

// We still expect to see some resource updates in the logs because the
// version number shows up in a few resources states. This does trigger a
// patroni reload in Swarm databases, which eats up time, but no actual
// changes should occur.

updateResp, err := cluster.Client().UpdateDatabase(ctx, &controlplane.UpdateDatabasePayload{
DatabaseID: databaseID,
Request: &controlplane.UpdateDatabaseRequest{
Spec: db.Spec,
},
})
require.NoError(t, err)

tLog(t, "waiting for database update to complete")

err = waitForTaskComplete(ctx, cluster.Client(), databaseID, updateResp.Task.TaskID, 3*time.Minute)
require.NoError(t, err)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
tLog(t, "sleeping to allow instance monitor interval to complete")

time.Sleep(sleepDuration)

tLog(t, "asserting that top-level versions have not changed")

db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{
DatabaseID: databaseID,
})
require.NoError(t, err)

assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{
"n1": "",
"n2": "",
})
assertInstanceVersions(t, db.Instances, map[string]map[string]string{
"n1": map[string]string{
"host-1": upgradePostgresVersion,
"host-2": upgradePostgresVersion,
},
"n2": map[string]string{
"host-3": upgradePostgresVersion,
},
})
}
2 changes: 1 addition & 1 deletion clustertest/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func printContainerLogs(ctx context.Context, t testing.TB, hostID string, contai
tLog(t, "container is nil")
return
}
logs, err := containerLogs(t.Context(), t, container)
logs, err := containerLogs(ctx, t, container)
if err != nil {
tLogf(t, "failed to extract container logs: %s", err)
} else {
Expand Down
60 changes: 31 additions & 29 deletions clustertest/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -159,42 +160,43 @@ func pointerTo[T any](v T) *T {
return &v
}

func dockerCmd(t testing.TB, ctx context.Context, args ...string) string {
t.Helper()

tLogf(t, "executing command: docker %s", strings.Join(args, " "))

var w strings.Builder
cmd := exec.CommandContext(ctx, "docker", args...)
cmd.Stdout = &w
cmd.Stderr = &w
err := cmd.Run()
out := w.String()
require.NoError(t, err, "docker command failed: %s", out)

return strings.TrimSpace(out)
}

// waitForTaskComplete polls a database task until it completes, fails, or times out.
func waitForTaskComplete(ctx context.Context, c client.Client, dbID api.Identifier, taskID string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for task %s to complete", taskID)
case <-ticker.C:
task, err := c.GetDatabaseTask(ctx, &api.GetDatabaseTaskPayload{
DatabaseID: dbID,
TaskID: taskID,
})
if err != nil {
return fmt.Errorf("failed to get task: %w", err)
}

switch task.Status {
case client.TaskStatusCompleted:
return nil
case client.TaskStatusFailed:
errMsg := "unknown error"
if task.Error != nil {
errMsg = *task.Error
}
return fmt.Errorf("task failed: %s", errMsg)
case client.TaskStatusCanceled:
return fmt.Errorf("task was canceled")
// "pending", "running", "canceling" - continue waiting
}
task, err := c.WaitForDatabaseTask(ctx, &api.GetDatabaseTaskPayload{
DatabaseID: dbID,
TaskID: taskID,
})
if err != nil {
return fmt.Errorf("failed to wait for task: %w", err)
}
if task.Status != client.TaskStatusCompleted {
var taskError string
if task.Error != nil {
taskError = *task.Error
}
return fmt.Errorf("task status is '%s' instead of 'completed', error=%s", task.Status, taskError)
}

return nil
}

// waitForDatabaseAvailable polls a database until it reaches available state or times out.
Expand Down
Loading