Skip to content

Commit a38bc6d

Browse files
alexluong and claude committed
fix: scope Redis control plane keys by deployment ID (#680)
Control plane keys (installation ID, migration lock, migration status) were not prefixed with DEPLOYMENT_ID, causing collisions when multiple deployments share the same Redis instance.

Key renames:
- outpostrc (hash) → outpost:installation_id (string)
- .outpost:migration:lock → .outpost:migration_lock
- outpost:migration:{name} → unchanged (now prefixed when scoped)

All keys are prefixed with {deploymentID}: when DEPLOYMENT_ID is set, matching the existing pattern used by tenant store, alert store, etc.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 690f77b commit a38bc6d

10 files changed

Lines changed: 743 additions & 79 deletions

File tree

cmd/outpost-migrate-redis/migration.go

Lines changed: 57 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -14,14 +14,15 @@ import (
1414
)
1515

1616
const (
17-
migrationLockKey = ".outpost:migration:lock"
17+
migrationLockKey = ".outpost:migration_lock"
1818
)
1919

2020
// Migrator handles Redis migrations
2121
type Migrator struct {
22-
client *redisClientWrapper
23-
logger MigrationLogger
24-
migrations map[string]migratorredis.Migration // All available migrations
22+
client *redisClientWrapper
23+
logger MigrationLogger
24+
migrations map[string]migratorredis.Migration // All available migrations
25+
deploymentID string
2526
}
2627

2728
// Close cleans up resources (logger sync, redis connection, etc)
@@ -73,12 +74,28 @@ func NewMigrator(cfg *config.Config, logger MigrationLogger) (*Migrator, error)
7374
}
7475

7576
return &Migrator{
76-
client: client,
77-
logger: logger,
78-
migrations: migrationMap,
77+
client: client,
78+
logger: logger,
79+
migrations: migrationMap,
80+
deploymentID: cfg.DeploymentID,
7981
}, nil
8082
}
8183

84+
func (m *Migrator) deploymentPrefix() string {
85+
if m.deploymentID == "" {
86+
return ""
87+
}
88+
return m.deploymentID + ":"
89+
}
90+
91+
func (m *Migrator) lockKey() string {
92+
return m.deploymentPrefix() + migrationLockKey
93+
}
94+
95+
func (m *Migrator) migrationKey(name string) string {
96+
return fmt.Sprintf("%soutpost:migration:%s", m.deploymentPrefix(), name)
97+
}
98+
8299
// ListMigrations lists all available migrations
83100
func (m *Migrator) ListMigrations() error {
84101
// Build map of name -> description from actual migrations
@@ -98,14 +115,14 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error
98115

99116
// Try to set lock atomically with SetNX (only sets if not exists)
100117
// Use 1 hour expiry in case process dies without cleanup
101-
success, err := m.client.SetNX(ctx, migrationLockKey, lock, time.Hour).Result()
118+
success, err := m.client.SetNX(ctx, m.lockKey(), lock, time.Hour).Result()
102119
if err != nil {
103120
return fmt.Errorf("failed to acquire lock: %w", err)
104121
}
105122

106123
if !success {
107124
// Lock already exists, get details for error message
108-
lockData, err := m.client.Get(ctx, migrationLockKey).Result()
125+
lockData, err := m.client.Get(ctx, m.lockKey()).Result()
109126
if err != nil {
110127
return fmt.Errorf("migration is already running (could not get lock details: %w)", err)
111128
}
@@ -119,7 +136,7 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error
119136

120137
// releaseLock releases the migration lock
121138
func (m *Migrator) releaseLock(ctx context.Context) error {
122-
err := m.client.Del(ctx, migrationLockKey).Err()
139+
err := m.client.Del(ctx, m.lockKey()).Err()
123140
if err != nil {
124141
return fmt.Errorf("failed to release lock: %w", err)
125142
}
@@ -130,7 +147,7 @@ func (m *Migrator) releaseLock(ctx context.Context) error {
130147
// Unlock forcefully clears the migration lock (for stuck situations)
131148
func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error {
132149
// Check if lock exists
133-
lockData, err := m.client.Get(ctx, migrationLockKey).Result()
150+
lockData, err := m.client.Get(ctx, m.lockKey()).Result()
134151
if err != nil && err.Error() == "redis: nil" {
135152
m.logger.LogLockStatus("", false)
136153
return nil
@@ -155,7 +172,7 @@ func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error {
155172
}
156173
}
157174

158-
err = m.client.Del(ctx, migrationLockKey).Err()
175+
err = m.client.Del(ctx, m.lockKey()).Err()
159176
if err != nil {
160177
return fmt.Errorf("failed to clear lock: %w", err)
161178
}
@@ -221,7 +238,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {
221238

222239
// Mark all migrations as applied
223240
for name := range m.migrations {
224-
if err := setMigrationAsApplied(ctx, m.client, name); err != nil {
241+
if err := m.setMigrationAsApplied(ctx, name); err != nil {
225242
return fmt.Errorf("failed to mark migration %s as applied: %w", name, err)
226243
}
227244
}
@@ -238,7 +255,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {
238255
// Get pending migrations count (not satisfied = not applied and not marked as not_applicable)
239256
pendingCount := 0
240257
for name := range m.migrations {
241-
if !isSatisfied(ctx, m.client, name) {
258+
if !m.isSatisfied(ctx, name) {
242259
pendingCount++
243260
}
244261
}
@@ -255,17 +272,19 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error {
255272

256273
// checkIfFreshInstallation checks if Redis is a fresh installation
257274
func (m *Migrator) checkIfFreshInstallation(ctx context.Context) (bool, error) {
275+
prefix := m.deploymentPrefix()
276+
258277
// Check for any "outpost:*" keys (current format)
259-
outpostKeys, err := m.client.Keys(ctx, "outpost:*").Result()
278+
outpostKeys, err := m.client.Keys(ctx, prefix+"outpost:*").Result()
260279
if err != nil {
261280
return false, fmt.Errorf("failed to check outpost keys: %w", err)
262281
}
263282
if len(outpostKeys) > 0 {
264283
return false, nil // Has current data
265284
}
266285

267-
// Check for any "tenant:*" keys (old format)
268-
tenantKeys, err := m.client.Keys(ctx, "tenant:*").Result()
286+
// Check for any "tenant:*" keys (old format, or deployment-scoped tenant keys)
287+
tenantKeys, err := m.client.Keys(ctx, prefix+"tenant:*").Result()
269288
if err != nil {
270289
return false, fmt.Errorf("failed to check tenant keys: %w", err)
271290
}
@@ -283,7 +302,7 @@ func (m *Migrator) Plan(ctx context.Context) error {
283302
var satisfiedCount, pendingCount int
284303

285304
for name := range m.migrations {
286-
if isSatisfied(ctx, m.client, name) {
305+
if m.isSatisfied(ctx, name) {
287306
satisfiedCount++
288307
} else {
289308
pendingCount++
@@ -453,7 +472,7 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat
453472
if !ok {
454473
return fmt.Errorf("migration not found: %s", migrationName)
455474
}
456-
alreadyApplied = isApplied(ctx, m.client, migrationName)
475+
alreadyApplied = m.isApplied(ctx, migrationName)
457476

458477
if alreadyApplied && !rerun {
459478
m.logger.LogInfo(fmt.Sprintf("migration %s already applied (use --rerun to re-run)", migrationName))
@@ -511,12 +530,12 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat
511530
duration := time.Since(startTime)
512531

513532
// Mark migration as applied (or update applied_at if re-running)
514-
if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil {
533+
if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil {
515534
return fmt.Errorf("failed to mark migration as applied: %w", err)
516535
}
517536

518537
// Record run history
519-
if err := recordMigrationRun(ctx, m.client, mig.Name(), state, rerun, duration); err != nil {
538+
if err := m.recordMigrationRun(ctx, mig.Name(), state, rerun, duration); err != nil {
520539
m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err))
521540
// Don't fail the migration for history recording errors
522541
}
@@ -552,7 +571,7 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error {
552571
applicable, reason := mig.IsApplicable(ctx)
553572
if !applicable {
554573
m.logger.LogInfo(fmt.Sprintf(" %s: Skipped (%s)", mig.Name(), reason))
555-
if err := setMigrationNotApplicable(ctx, m.client, mig.Name(), reason); err != nil {
574+
if err := m.setMigrationNotApplicable(ctx, mig.Name(), reason); err != nil {
556575
return fmt.Errorf("failed to mark %s as not applicable: %w", mig.Name(), err)
557576
}
558577
skipped++
@@ -616,13 +635,13 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error {
616635
duration := time.Since(startTime)
617636

618637
// Mark as applied
619-
if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil {
638+
if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil {
620639
m.releaseLock(ctx)
621640
return fmt.Errorf("failed to mark %s as applied: %w", mig.Name(), err)
622641
}
623642

624643
// Record run history
625-
if err := recordMigrationRun(ctx, m.client, mig.Name(), state, false, duration); err != nil {
644+
if err := m.recordMigrationRun(ctx, mig.Name(), state, false, duration); err != nil {
626645
m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err))
627646
}
628647

@@ -674,27 +693,25 @@ func (m *Migrator) getPendingMigrations(ctx context.Context) []migratorredis.Mig
674693

675694
var pending []migratorredis.Migration
676695
for _, entry := range sorted {
677-
if !isSatisfied(ctx, m.client, entry.name) {
696+
if !m.isSatisfied(ctx, entry.name) {
678697
pending = append(pending, entry.migration)
679698
}
680699
}
681700
return pending
682701
}
683702

684703
// isSatisfied checks if a migration has been satisfied (applied or not applicable)
685-
func isSatisfied(ctx context.Context, client *redisClientWrapper, name string) bool {
686-
key := fmt.Sprintf("outpost:migration:%s", name)
687-
val, err := client.HGet(ctx, key, "status").Result()
704+
func (m *Migrator) isSatisfied(ctx context.Context, name string) bool {
705+
val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result()
688706
if err != nil {
689707
return false
690708
}
691709
return val == "applied" || val == "not_applicable"
692710
}
693711

694712
// isApplied checks if a migration has been applied (not just satisfied)
695-
func isApplied(ctx context.Context, client *redisClientWrapper, name string) bool {
696-
key := fmt.Sprintf("outpost:migration:%s", name)
697-
val, err := client.HGet(ctx, key, "status").Result()
713+
func (m *Migrator) isApplied(ctx context.Context, name string) bool {
714+
val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result()
698715
if err != nil {
699716
return false
700717
}
@@ -718,7 +735,7 @@ func (m *Migrator) getNextMigration(ctx context.Context) (migratorredis.Migratio
718735

719736
// Find first unsatisfied
720737
for _, entry := range sorted {
721-
if !isSatisfied(ctx, m.client, entry.name) {
738+
if !m.isSatisfied(ctx, entry.name) {
722739
return entry.migration, nil
723740
}
724741
}
@@ -743,7 +760,7 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M
743760

744761
// Find last applied
745762
for _, entry := range sorted {
746-
if isApplied(ctx, m.client, entry.name) {
763+
if m.isApplied(ctx, entry.name) {
747764
return entry.migration, nil
748765
}
749766
}
@@ -752,36 +769,33 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M
752769
}
753770

754771
// setMigrationAsApplied marks a migration as applied
755-
func setMigrationAsApplied(ctx context.Context, client *redisClientWrapper, name string) error {
756-
key := fmt.Sprintf("outpost:migration:%s", name)
772+
func (m *Migrator) setMigrationAsApplied(ctx context.Context, name string) error {
757773
now := time.Now().Unix()
758774

759775
// Use Redis hash to store migration state
760-
return client.HSet(ctx, key,
776+
return m.client.HSet(ctx, m.migrationKey(name),
761777
"status", "applied",
762778
"applied_at", fmt.Sprintf("%d", now),
763779
).Err()
764780
}
765781

766782
// setMigrationNotApplicable marks a migration as not applicable
767-
func setMigrationNotApplicable(ctx context.Context, client *redisClientWrapper, name, reason string) error {
768-
key := fmt.Sprintf("outpost:migration:%s", name)
783+
func (m *Migrator) setMigrationNotApplicable(ctx context.Context, name, reason string) error {
769784
now := time.Now().Unix()
770785

771-
return client.HSet(ctx, key,
786+
return m.client.HSet(ctx, m.migrationKey(name),
772787
"status", "not_applicable",
773788
"checked_at", fmt.Sprintf("%d", now),
774789
"reason", reason,
775790
).Err()
776791
}
777792

778793
// recordMigrationRun records a migration run in the history
779-
// Key format: outpost:migration:{name}:run:{timestamp}
780-
func recordMigrationRun(ctx context.Context, client *redisClientWrapper, name string, state *migratorredis.State, rerun bool, duration time.Duration) error {
794+
func (m *Migrator) recordMigrationRun(ctx context.Context, name string, state *migratorredis.State, rerun bool, duration time.Duration) error {
781795
now := time.Now().Unix()
782-
key := fmt.Sprintf("outpost:migration:%s:run:%d", name, now)
796+
key := fmt.Sprintf("%soutpost:migration:%s:run:%d", m.deploymentPrefix(), name, now)
783797

784-
return client.HSet(ctx, key,
798+
return m.client.HSet(ctx, key,
785799
"processed", state.Progress.ProcessedItems,
786800
"skipped", state.Progress.SkippedItems,
787801
"failed", state.Progress.FailedItems,

internal/app/app.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -226,7 +226,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error {
226226
}
227227

228228
func (a *App) initializeTelemetry(ctx context.Context) error {
229-
installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig())
229+
installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig(), a.config.DeploymentID)
230230
if err != nil {
231231
return err
232232
}

internal/app/installation.go

Lines changed: 17 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -2,24 +2,34 @@ package app
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/hookdeck/outpost/internal/idgen"
79
"github.com/hookdeck/outpost/internal/redis"
810
"github.com/hookdeck/outpost/internal/telemetry"
911
)
1012

1113
const (
12-
outpostrcKey = "outpostrc"
13-
installationKey = "installation"
14+
installationIDKey = "outpost:installation_id"
1415
)
1516

16-
func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig) (string, error) {
17+
func installationKey(deploymentID string) string {
18+
if deploymentID == "" {
19+
return installationIDKey
20+
}
21+
return fmt.Sprintf("%s:%s", deploymentID, installationIDKey)
22+
}
23+
24+
func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig, deploymentID string) (string, error) {
1725
if telemetryConfig.Disabled {
1826
return "", nil
1927
}
2028

29+
key := installationKey(deploymentID)
30+
2131
// First attempt: try to get existing installation ID
22-
installationID, err := redisClient.HGet(ctx, outpostrcKey, installationKey).Result()
32+
installationID, err := redisClient.Get(ctx, key).Result()
2333
if err == nil {
2434
return installationID, nil
2535
}
@@ -31,9 +41,9 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo
3141
// Installation ID doesn't exist, create one atomically
3242
newInstallationID := idgen.Installation()
3343

34-
// Use HSETNX to atomically set the installation ID only if it doesn't exist
44+
// Use SetNX to atomically set the installation ID only if it doesn't exist
3545
// This prevents race conditions when multiple Outpost instances start simultaneously
36-
wasSet, err := redisClient.HSetNX(ctx, outpostrcKey, installationKey, newInstallationID).Result()
46+
wasSet, err := redisClient.SetNX(ctx, key, newInstallationID, time.Duration(0)).Result()
3747
if err != nil {
3848
return "", err
3949
}
@@ -44,7 +54,7 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo
4454

4555
// Another instance set the installation ID while we were generating ours
4656
// Fetch the installation ID that was actually set
47-
installationID, err = redisClient.HGet(ctx, outpostrcKey, installationKey).Result()
57+
installationID, err = redisClient.Get(ctx, key).Result()
4858
if err != nil {
4959
return "", err
5060
}

0 commit comments

Comments
 (0)