Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
matches!(phase, None | Some("complete"))
// "partial" counts as complete for sweep purposes: the
// migration Job ran and we accepted the result; we no longer
// depend on the previous restore being around.
matches!(phase, None | Some("complete") | Some("partial"))
} else {
true
};
Expand Down Expand Up @@ -1172,15 +1175,50 @@ async fn reconcile_schema_migration(
return Ok(false);
}
JobStatus::Succeeded => {
info!(replica = %replica_name, "migration Job succeeded");
// The migration script reports a partial-success callback body
// when psql exited cleanly but some individual statements
// failed (typical when dbt views reference renamed/dropped
// upstream columns). We treat it as completion either way —
// the replica must come up — but surface partials as a
// Warning event so operators can find them.
let callback = ctx.schema_migration_results.take(namespace, &replica_name);
let is_partial = callback
.as_deref()
.is_some_and(|b| b.starts_with("partial"));

if is_partial {
warn!(
replica = %replica_name,
result = ?callback,
"migration Job succeeded with statement errors; some persistent_schemas objects may need regenerating"
);
if let Err(e) = ctx
.recorder
.publish(
&Event {
type_: EventType::Warning,
reason: "SchemaMigrationPartial".into(),
note: callback.clone(),
action: "Restore".into(),
secondary: Some(new_restore.object_ref(&())),
},
&replica.object_ref(&()),
)
.await
{
warn!(replica = %replica_name, error = %e, "failed to publish SchemaMigrationPartial event");
}
} else {
info!(replica = %replica_name, "migration Job succeeded");
}

// Update status
let phase = if is_partial { "partial" } else { "complete" };
let replicas: Api<PostgresPhysicalReplica> =
Api::namespaced(client.clone(), namespace);
let patch = serde_json::json!({
"status": {
"schemaMigrationJob": null,
"schemaMigrationPhase": "complete",
"schemaMigrationPhase": phase,
}
});
replicas
Expand Down
67 changes: 57 additions & 10 deletions src/controllers/replica/schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ pub fn migration_job_name(replica_name: &str) -> String {
/// SCHEMAS (comma-separated list)
/// MIGRATION_CALLBACK_URL
static MIGRATION_SCRIPT: &str = r#"#!/bin/bash
set -o pipefail
# pipefail is intentionally NOT set: psql's per-statement failures don't
# propagate into the script's exit code (see ON_ERROR_STOP discussion
# below), and pg_dump can fail mid-stream after producing partial output
# — we still want psql to apply whatever it did receive, then exit
# normally so the replica can come up.

# Parse comma-separated schema list
IFS=',' read -ra SCHEMA_ARRAY <<< "$SCHEMAS"
Expand All @@ -58,6 +62,17 @@ for schema in "${SCHEMA_ARRAY[@]}"; do
echo "Migrating schema: $schema"
done

# Capture psql's stderr for visibility on partial failures.
PSQL_STDERR=$(mktemp)

# ON_ERROR_STOP is deliberately NOT set: persistent_schemas like dbt
# contain views derived from upstream tables, and across upstream schema
# changes (renamed columns, dropped tables) some view DDL in the old
# replica's schema becomes invalid against the new restore's source
# tables. Failing the whole migration on the first such error blocks the
# replica from coming up at all. Tolerance trades schema completeness
# for replica availability — clients can regenerate the broken views
# afterward, but the replica must be reachable.
PGPASSWORD="$SOURCE_PASSWORD" pg_dump \
-h "$SOURCE_HOST" -p 5432 -U "$SOURCE_USER" -d "$SOURCE_DB" \
"${SCHEMA_ARGS[@]}" \
Expand All @@ -66,21 +81,30 @@ PGPASSWORD="$SOURCE_PASSWORD" pg_dump \
--verbose \
| PGPASSWORD="$TARGET_PASSWORD" psql \
-h "$TARGET_HOST" -p 5432 -U "$TARGET_USER" -d "$TARGET_DB" \
-v ON_ERROR_STOP=1 --quiet
--quiet 2> >(tee "$PSQL_STDERR" >&2)

EXIT_CODE=$?
PSQL_EXIT=$?
# Count statement-level errors psql wrote to stderr.
# NOTE: `grep -c` already prints "0" when nothing matches, but it also exits
# with status 1 in that case. Appending `|| echo 0` would therefore emit a
# second "0" and turn the value into "0<newline>0", which breaks the numeric
# `[ ... -gt 0 ]` comparison later on. Rely on the default expansion below
# instead: it covers the only case where grep prints nothing at all (the
# capture file is missing or unreadable).
PSQL_ERROR_COUNT=$(grep -c '^ERROR:' "$PSQL_STDERR" 2>/dev/null)
PSQL_ERROR_COUNT=${PSQL_ERROR_COUNT:-0}
rm -f "$PSQL_STDERR"

if [ $EXIT_CODE -eq 0 ]; then
echo ""
echo ""
if [ "$PSQL_EXIT" -ne 0 ]; then
echo "=== psql exited non-zero ($PSQL_EXIT); proceeding so the replica can come up ===" >&2
fi

if [ "$PSQL_ERROR_COUNT" -gt 0 ]; then
echo "=== Schema migration tolerated $PSQL_ERROR_COUNT statement error(s); some objects may need regenerating ===" >&2
report_result "partial: $PSQL_ERROR_COUNT statement error(s)"
else
echo "=== Schema migration completed successfully ==="
report_result 'success'
else
echo ""
echo "=== Schema migration failed with exit code $EXIT_CODE ===" >&2
report_result "Migration failed with exit code $EXIT_CODE"
fi

exit $EXIT_CODE
# Always exit 0: any non-fatal issues are reported via the callback
# above. Treating partial migrations as Job failures puts the operator
# into a retry loop that never converges (the same views keep failing).
exit 0
"#;

/// Build the schema migration Job spec.
Expand Down Expand Up @@ -266,6 +290,29 @@ mod tests {
}
}

/// Guards the "tolerate statement errors" contract of the migration script.
///
/// Persistent schemas (e.g. dbt) contain views derived from upstream tables;
/// when upstream schema migrations rename or drop those columns, some view
/// recreations fail. Aborting the entire migration on the first such error
/// blocks the replica from coming up, which is a worse outcome than a partial
/// migration that clients can patch up afterwards.
///
/// The checks look at executable script lines rather than bare substrings:
/// the script's own comments mention both "exit 0" and "partial", so a plain
/// `contains` would keep passing even if the real statements were removed.
#[test]
fn migration_script_is_tolerant_to_statement_errors() {
    // The migration script must NOT use `ON_ERROR_STOP=1`.
    assert!(
        !MIGRATION_SCRIPT.contains("ON_ERROR_STOP=1"),
        "migration script must not enable ON_ERROR_STOP=1 — statement errors should be tolerated so the replica can come up"
    );

    // Require an actual `exit 0` statement on its own line, not just the
    // phrase appearing inside a comment.
    let has_exit_zero_statement = MIGRATION_SCRIPT
        .lines()
        .any(|line| line.trim() == "exit 0");
    assert!(
        has_exit_zero_statement,
        "migration script must exit 0 on completion; non-fatal errors are reported via the callback body"
    );

    // Require the callback invocation itself. The body must start with
    // "partial" because that prefix is what the controller matches on when
    // deciding whether to surface a partial migration.
    assert!(
        MIGRATION_SCRIPT.contains(r#"report_result "partial:"#),
        "migration script must report partial migrations via the callback so the operator can surface them"
    );
}

#[test]
fn migration_job_name_format() {
assert_eq!(
Expand Down