Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 70 additions & 45 deletions cmd/outpost-migrate-redis/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
)

const (
	// migrationLockKey is the suffix of the Redis key used as the
	// cross-process migration lock (prefixed per deployment, see lockKey).
	migrationLockKey = ".outpost:migration_lock"
)

// Migrator handles Redis migrations, tracking per-migration state in Redis
// under an optional deployment-scoped key prefix.
type Migrator struct {
	client       *redisClientWrapper
	logger       MigrationLogger
	migrations   map[string]migratorredis.Migration // All available migrations
	deploymentID string                             // optional; prefixes all migration keys when set
}

// Close cleans up resources (logger sync, redis connection, etc)
Expand Down Expand Up @@ -73,12 +74,28 @@ func NewMigrator(cfg *config.Config, logger MigrationLogger) (*Migrator, error)
}

return &Migrator{
client: client,
logger: logger,
migrations: migrationMap,
client: client,
logger: logger,
migrations: migrationMap,
deploymentID: cfg.DeploymentID,
}, nil
}

// deploymentPrefix returns the key prefix for the configured deployment
// ("<id>:"), or the empty string when no deployment ID is set.
func (m *Migrator) deploymentPrefix() string {
	if m.deploymentID != "" {
		return m.deploymentID + ":"
	}
	return ""
}

// lockKey returns the deployment-scoped Redis key for the migration lock.
func (m *Migrator) lockKey() string {
	return m.deploymentPrefix() + migrationLockKey
}

// migrationKey returns the deployment-scoped Redis key that stores the
// state hash for the named migration.
func (m *Migrator) migrationKey(name string) string {
	return m.deploymentPrefix() + "outpost:migration:" + name
}

// ListMigrations lists all available migrations
func (m *Migrator) ListMigrations() error {
// Build map of name -> description from actual migrations
Expand All @@ -98,14 +115,14 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error

// Try to set lock atomically with SetNX (only sets if not exists)
// Use 1 hour expiry in case process dies without cleanup
success, err := m.client.SetNX(ctx, migrationLockKey, lock, time.Hour).Result()
success, err := m.client.SetNX(ctx, m.lockKey(), lock, time.Hour).Result()
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}

if !success {
// Lock already exists, get details for error message
lockData, err := m.client.Get(ctx, migrationLockKey).Result()
lockData, err := m.client.Get(ctx, m.lockKey()).Result()
if err != nil {
return fmt.Errorf("migration is already running (could not get lock details: %w)", err)
}
Expand All @@ -119,7 +136,7 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error

// releaseLock releases the migration lock
func (m *Migrator) releaseLock(ctx context.Context) error {
err := m.client.Del(ctx, migrationLockKey).Err()
err := m.client.Del(ctx, m.lockKey()).Err()
if err != nil {
return fmt.Errorf("failed to release lock: %w", err)
}
Expand All @@ -130,7 +147,7 @@ func (m *Migrator) releaseLock(ctx context.Context) error {
// Unlock forcefully clears the migration lock (for stuck situations)
func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error {
// Check if lock exists
lockData, err := m.client.Get(ctx, migrationLockKey).Result()
lockData, err := m.client.Get(ctx, m.lockKey()).Result()
if err != nil && err.Error() == "redis: nil" {
m.logger.LogLockStatus("", false)
return nil
Expand All @@ -155,7 +172,7 @@ func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error {
}
}

err = m.client.Del(ctx, migrationLockKey).Err()
err = m.client.Del(ctx, m.lockKey()).Err()
if err != nil {
return fmt.Errorf("failed to clear lock: %w", err)
}
Expand Down Expand Up @@ -221,7 +238,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {

// Mark all migrations as applied
for name := range m.migrations {
if err := setMigrationAsApplied(ctx, m.client, name); err != nil {
if err := m.setMigrationAsApplied(ctx, name); err != nil {
return fmt.Errorf("failed to mark migration %s as applied: %w", name, err)
}
}
Expand All @@ -238,7 +255,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {
// Get pending migrations count (not satisfied = not applied and not marked as not_applicable)
pendingCount := 0
for name := range m.migrations {
if !isSatisfied(ctx, m.client, name) {
if !m.isSatisfied(ctx, name) {
pendingCount++
}
}
Expand All @@ -255,35 +272,48 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {

// checkIfFreshInstallation checks if Redis is a fresh installation
func (m *Migrator) checkIfFreshInstallation(ctx context.Context) (bool, error) {
prefix := m.deploymentPrefix()

// Check for any "outpost:*" keys (current format)
outpostKeys, err := m.client.Keys(ctx, "outpost:*").Result()
// Use SCAN instead of KEYS to avoid blocking Redis on large keyspaces
hasKeys, err := m.hasAnyKeys(ctx, prefix+"outpost:*")
if err != nil {
return false, fmt.Errorf("failed to check outpost keys: %w", err)
}
if len(outpostKeys) > 0 {
if hasKeys {
return false, nil // Has current data
}

// Check for any "tenant:*" keys (old format)
tenantKeys, err := m.client.Keys(ctx, "tenant:*").Result()
// Check for any "tenant:*" keys (old format, or deployment-scoped tenant keys)
hasKeys, err = m.hasAnyKeys(ctx, prefix+"tenant:*")
if err != nil {
return false, fmt.Errorf("failed to check tenant keys: %w", err)
}
if len(tenantKeys) > 0 {
if hasKeys {
return false, nil // Has old data
}

// No keys found - it's a fresh installation
return true, nil
}

// hasAnyKeys uses SCAN to check if at least one key matches the pattern
// without blocking Redis like KEYS does.
//
// SCAN must be iterated to completion: a single call may return an empty
// batch with a non-zero cursor even when matching keys exist, so a lone
// Scan(ctx, 0, pattern, 1) can report a false negative. Loop until either
// a key is found or the cursor wraps back to 0.
func (m *Migrator) hasAnyKeys(ctx context.Context, pattern string) (bool, error) {
	var cursor uint64
	for {
		keys, next, err := m.client.Scan(ctx, cursor, pattern, 100).Result()
		if err != nil {
			return false, err
		}
		if len(keys) > 0 {
			return true, nil
		}
		if next == 0 {
			// Full iteration finished with no match.
			return false, nil
		}
		cursor = next
	}
}

// Plan shows what changes would be made without applying them
func (m *Migrator) Plan(ctx context.Context) error {
// First show current status
var satisfiedCount, pendingCount int

for name := range m.migrations {
if isSatisfied(ctx, m.client, name) {
if m.isSatisfied(ctx, name) {
satisfiedCount++
} else {
pendingCount++
Expand Down Expand Up @@ -453,7 +483,7 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat
if !ok {
return fmt.Errorf("migration not found: %s", migrationName)
}
alreadyApplied = isApplied(ctx, m.client, migrationName)
alreadyApplied = m.isApplied(ctx, migrationName)

if alreadyApplied && !rerun {
m.logger.LogInfo(fmt.Sprintf("migration %s already applied (use --rerun to re-run)", migrationName))
Expand Down Expand Up @@ -511,12 +541,12 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat
duration := time.Since(startTime)

// Mark migration as applied (or update applied_at if re-running)
if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil {
if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil {
return fmt.Errorf("failed to mark migration as applied: %w", err)
}

// Record run history
if err := recordMigrationRun(ctx, m.client, mig.Name(), state, rerun, duration); err != nil {
if err := m.recordMigrationRun(ctx, mig.Name(), state, rerun, duration); err != nil {
m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err))
// Don't fail the migration for history recording errors
}
Expand Down Expand Up @@ -552,7 +582,7 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error {
applicable, reason := mig.IsApplicable(ctx)
if !applicable {
m.logger.LogInfo(fmt.Sprintf(" %s: Skipped (%s)", mig.Name(), reason))
if err := setMigrationNotApplicable(ctx, m.client, mig.Name(), reason); err != nil {
if err := m.setMigrationNotApplicable(ctx, mig.Name(), reason); err != nil {
return fmt.Errorf("failed to mark %s as not applicable: %w", mig.Name(), err)
}
skipped++
Expand Down Expand Up @@ -616,13 +646,13 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error {
duration := time.Since(startTime)

// Mark as applied
if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil {
if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil {
m.releaseLock(ctx)
return fmt.Errorf("failed to mark %s as applied: %w", mig.Name(), err)
}

// Record run history
if err := recordMigrationRun(ctx, m.client, mig.Name(), state, false, duration); err != nil {
if err := m.recordMigrationRun(ctx, mig.Name(), state, false, duration); err != nil {
m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err))
}

Expand Down Expand Up @@ -674,27 +704,25 @@ func (m *Migrator) getPendingMigrations(ctx context.Context) []migratorredis.Mig

var pending []migratorredis.Migration
for _, entry := range sorted {
if !isSatisfied(ctx, m.client, entry.name) {
if !m.isSatisfied(ctx, entry.name) {
pending = append(pending, entry.migration)
}
}
return pending
}

// isSatisfied checks if a migration has been satisfied (applied or not applicable)
func isSatisfied(ctx context.Context, client *redisClientWrapper, name string) bool {
key := fmt.Sprintf("outpost:migration:%s", name)
val, err := client.HGet(ctx, key, "status").Result()
func (m *Migrator) isSatisfied(ctx context.Context, name string) bool {
val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result()
if err != nil {
return false
}
return val == "applied" || val == "not_applicable"
}

// isApplied checks if a migration has been applied (not just satisfied)
func isApplied(ctx context.Context, client *redisClientWrapper, name string) bool {
key := fmt.Sprintf("outpost:migration:%s", name)
val, err := client.HGet(ctx, key, "status").Result()
func (m *Migrator) isApplied(ctx context.Context, name string) bool {
val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result()
if err != nil {
return false
}
Expand All @@ -718,7 +746,7 @@ func (m *Migrator) getNextMigration(ctx context.Context) (migratorredis.Migratio

// Find first unsatisfied
for _, entry := range sorted {
if !isSatisfied(ctx, m.client, entry.name) {
if !m.isSatisfied(ctx, entry.name) {
return entry.migration, nil
}
}
Expand All @@ -743,7 +771,7 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M

// Find last applied
for _, entry := range sorted {
if isApplied(ctx, m.client, entry.name) {
if m.isApplied(ctx, entry.name) {
return entry.migration, nil
}
}
Expand All @@ -752,36 +780,33 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M
}

// setMigrationAsApplied marks a migration as applied
func setMigrationAsApplied(ctx context.Context, client *redisClientWrapper, name string) error {
key := fmt.Sprintf("outpost:migration:%s", name)
func (m *Migrator) setMigrationAsApplied(ctx context.Context, name string) error {
now := time.Now().Unix()

// Use Redis hash to store migration state
return client.HSet(ctx, key,
return m.client.HSet(ctx, m.migrationKey(name),
"status", "applied",
"applied_at", fmt.Sprintf("%d", now),
).Err()
}

// setMigrationNotApplicable marks a migration as not applicable
func setMigrationNotApplicable(ctx context.Context, client *redisClientWrapper, name, reason string) error {
key := fmt.Sprintf("outpost:migration:%s", name)
func (m *Migrator) setMigrationNotApplicable(ctx context.Context, name, reason string) error {
now := time.Now().Unix()

return client.HSet(ctx, key,
return m.client.HSet(ctx, m.migrationKey(name),
"status", "not_applicable",
"checked_at", fmt.Sprintf("%d", now),
"reason", reason,
).Err()
}

// recordMigrationRun records a migration run in the history
// Key format: outpost:migration:{name}:run:{timestamp}
func recordMigrationRun(ctx context.Context, client *redisClientWrapper, name string, state *migratorredis.State, rerun bool, duration time.Duration) error {
func (m *Migrator) recordMigrationRun(ctx context.Context, name string, state *migratorredis.State, rerun bool, duration time.Duration) error {
now := time.Now().Unix()
key := fmt.Sprintf("outpost:migration:%s:run:%d", name, now)
key := fmt.Sprintf("%soutpost:migration:%s:run:%d", m.deploymentPrefix(), name, now)

return client.HSet(ctx, key,
return m.client.HSet(ctx, key,
"processed", state.Progress.ProcessedItems,
"skipped", state.Progress.SkippedItems,
"failed", state.Progress.FailedItems,
Expand Down
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error {
}

func (a *App) initializeTelemetry(ctx context.Context) error {
installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig())
installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig(), a.config.DeploymentID)
if err != nil {
return err
}
Expand Down
24 changes: 17 additions & 7 deletions internal/app/installation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,34 @@ package app

import (
"context"
"fmt"
"time"

"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/telemetry"
)

const (
	// installationIDKey is the Redis key (optionally deployment-prefixed,
	// see installationKey) that stores the installation ID used for telemetry.
	installationIDKey = "outpost:installation_id"
)

func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig) (string, error) {
// installationKey returns the Redis key holding the installation ID,
// scoped under the deployment ID when one is configured.
func installationKey(deploymentID string) string {
	if deploymentID != "" {
		return deploymentID + ":" + installationIDKey
	}
	return installationIDKey
}

func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig, deploymentID string) (string, error) {
if telemetryConfig.Disabled {
return "", nil
}

key := installationKey(deploymentID)

// First attempt: try to get existing installation ID
installationID, err := redisClient.HGet(ctx, outpostrcKey, installationKey).Result()
installationID, err := redisClient.Get(ctx, key).Result()
if err == nil {
return installationID, nil
}
Expand All @@ -31,9 +41,9 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo
// Installation ID doesn't exist, create one atomically
newInstallationID := idgen.Installation()

// Use HSETNX to atomically set the installation ID only if it doesn't exist
// Use SetNX to atomically set the installation ID only if it doesn't exist
// This prevents race conditions when multiple Outpost instances start simultaneously
wasSet, err := redisClient.HSetNX(ctx, outpostrcKey, installationKey, newInstallationID).Result()
wasSet, err := redisClient.SetNX(ctx, key, newInstallationID, time.Duration(0)).Result()
if err != nil {
return "", err
}
Expand All @@ -44,7 +54,7 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo

// Another instance set the installation ID while we were generating ours
// Fetch the installation ID that was actually set
installationID, err = redisClient.HGet(ctx, outpostrcKey, installationKey).Result()
installationID, err = redisClient.Get(ctx, key).Result()
if err != nil {
return "", err
}
Expand Down
Loading
Loading