Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ jobs:
test-ps-all-missing
needs_non_pg_snapshot: false

- name: redaction
namespaces: >-
test-redaction
needs_non_pg_snapshot: false
needs_pg18_snapshot: true

steps:
- uses: actions/checkout@v6

Expand Down Expand Up @@ -164,6 +170,11 @@ jobs:
load_image postgres:16-alpine
load_image alpine:latest

if [ "${{ matrix.needs_pg18_snapshot }}" = "true" ]; then
load_image postgres:18
load_image nginx:alpine
fi

- name: Deploy MinIO
run: |
kubectl apply -f tests/fixtures/minio.yaml
Expand Down Expand Up @@ -201,6 +212,34 @@ jobs:
kubectl logs job/setup-kopia-repo --all-containers --prefix
echo "--- Kopia repository ready ---"

- name: Set up PG-18 kopia snapshot
if: matrix.needs_pg18_snapshot
run: |
kubectl apply -f tests/fixtures/setup-kopia-repo-pg18.yaml
for i in $(seq 1 60); do
STATUS=$(kubectl get job/setup-kopia-repo-pg18 -o jsonpath='{.status.conditions[?(@.type=="Complete")].status}' 2>/dev/null)
FAILED=$(kubectl get job/setup-kopia-repo-pg18 -o jsonpath='{.status.conditions[?(@.type=="Failed")].status}' 2>/dev/null)
if [ "$STATUS" = "True" ]; then
echo "PG-18 setup job completed successfully"
break
fi
if [ "$FAILED" = "True" ]; then
echo "PG-18 setup job failed!"
kubectl describe job/setup-kopia-repo-pg18
kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix
exit 1
fi
if [ "$i" = "60" ]; then
echo "PG-18 setup job timed out after 300s"
kubectl describe job/setup-kopia-repo-pg18
kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix 2>/dev/null || true
exit 1
fi
sleep 5
done
echo "--- PG-18 setup job logs ---"
kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix

- name: Set up non-postgres kopia snapshot
if: matrix.needs_non_pg_snapshot
run: |
Expand Down
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from
| `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). |
| `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. |
| `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. |
| `redaction` | `RedactionSpec` | No | — | If set, apply a Tamanu/dbt-shaped masking manifest to the restored data via the `postgresql_anonymizer` extension before switchover. Works with any PostgreSQL major the operator supports (the extension is installed per the restore's PG version — see the RedactionSpec notes below). |

The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate.
It has two interesting features:
Expand All @@ -114,6 +115,35 @@ The jitter is a random duration between -time/2 and +time/2.
For example, `10m` will result in a jitter between -5m and 5m.
When using `H` in the cron expression, you might want to set the jitter to zero to properly take advantage of the spread-but-stable behaviour.

#### RedactionSpec

Configures applying a column-masking manifest to the restored data using the [postgresql_anonymizer](https://gitlab.com/dalibo/postgresql_anonymizer) extension.
The manifest follows the [Tamanu masking spec](https://github.com/beyondessential/tamanu/tree/main/database#masking) — the operator can be pointed at any dbt project that publishes the same `meta.masking` annotation shape.

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `manifestUrl` | `string` | Yes | — | HTTP(S) URL of the masking manifest. May contain a literal `{version}` placeholder. |
| `version` | `string` | No | — | Pinned version substituted into `{version}`. Mutually exclusive with `versionQuery`. |
| `versionQuery` | `string` | No | — | SQL query that returns one row, one text column with the version string. Run as the operator's superuser against the restore. Mutually exclusive with `version`. |
| `versionFallbackToBase` | `bool` | No | `false` | If the manifest URL with the discovered/pinned version 404s, retry with the `major.minor.0` base version. |

Example (Tamanu):

```yaml
spec:
redaction:
manifestUrl: "https://docs.data.bes.au/tamanu/v{version}/manifest.json"
versionQuery: "SELECT value FROM local_system_facts WHERE key = 'currentVersion'"
versionFallbackToBase: true
```

Notes:

- Works on any PostgreSQL major the operator otherwise supports. There's no PG-version gate because the prelude apt-installs `postgresql_anonymizer_$N` from [Dalibo Labs](https://apt.dalibo.org/labs/) per the running restore's PG version and copies the files into the standard system extension dirs (`/usr/share/postgresql/$N/extension`, `/usr/lib/postgresql/$N/lib`).
- The download is cached on the restore PVC at `/pgdata/.anon-cache/`, so a pod restart doesn't re-fetch the package — it just re-copies the cached files into the (fresh) container writable layer.
- The postgres container runs as root for the prelude (to apt-install and write to system paths) then drops back to UID 999 via `gosu` before exec'ing `postgres`. `gosu` is preinstalled in the official `postgres` image.
- During redaction the database is writable; once anonymisation completes, the operator sets `default_transaction_read_only = on` at the database level and demotes the analytics user back to non-superuser when `spec.readOnly` is true.

#### SnapshotFilter

| Field | Type | Required | Description |
Expand Down Expand Up @@ -165,6 +195,9 @@ Additional fields for `target: graphQL`:
| `schemaMigrationJob` | `string` | Name of the active schema migration Job (set while migration is in progress). |
| `schemaMigrationPhase` | `string` | Phase of the schema migration (`active`, `complete`, or `failed: <reason>`). |
| `persistentSchemaDataSize` | `Quantity` | Measured size of persistent schema data from the last successful migration. Used to size the next restore PVC. |
| `redactionPhase` | `string` | Phase of the current restore's redaction (`active`, `complete`, `partial`, or `failed: <reason>`). `partial` means anonymisation ran but some per-column SECURITY LABEL statements were tolerated as errors (e.g. column missing on this DB version). `failed:` is sticky — it doesn't auto-retry; the next scheduled restore clears it. |
| `redactionVersion` | `string` | The manifest version resolved during the last redaction run (when `manifestUrl` is version-templated). |
| `redactionColumnsApplied` | `uint32` | Number of columns the last redaction run attempted to mask. |
| `consecutiveRestoreFailures` | `uint32` | Number of consecutive restore failures. Reset to 0 on success. After 3 consecutive failures the operator stops scheduling new restores until the counter is reset (automatically on next successful restore, or manually via `kubectl patch --subresource=status`). |

---
Expand Down
28 changes: 28 additions & 0 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
};
use scheduling::ScheduleDecision;

mod redaction;
mod resources;
mod scheduling;
mod schema_migration;
Expand Down Expand Up @@ -162,6 +163,19 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
)
});

// Handle redaction before schema migration: if redaction is set,
// it rewrites the data in place, and any persistent_schemas migration
// pulls from the (already-redacted) source tables.
if replica.spec.redaction.is_some()
&& let Some(switching) = switching_restore
{
let redaction_settled =
redaction::reconcile_redaction_step(&ctx, &replica, switching).await?;
if !redaction_settled {
return Ok(Action::requeue(Duration::from_secs(30)));
}
}

// Handle schema migration for persistent_schemas configuration
if replica.spec.persistent_schemas.is_some()
&& let Some(switching) = switching_restore
Expand Down Expand Up @@ -306,6 +320,16 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
true
};

let redaction_settled = if replica.spec.redaction.is_some() {
let phase = replica
.status
.as_ref()
.and_then(|s| s.redaction_phase.as_deref());
matches!(phase, None | Some("complete") | Some("partial"))
} else {
true
};

let grace_period =
SignedDuration::try_from(replica.spec.switchover_grace_period.0).unwrap_or_default();
let last_completed = replica
Expand All @@ -325,6 +349,7 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
});

if migration_complete
&& redaction_settled
&& has_matching_current
&& let Some(completed_at) = last_completed
&& now.duration_since(completed_at.0) > grace_period
Expand Down Expand Up @@ -368,6 +393,9 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
"previousRestore": null,
"schemaMigrationJob": null,
"schemaMigrationPhase": null,
"redactionPhase": null,
"redactionVersion": null,
"redactionColumnsApplied": null,
}
});
replicas
Expand Down
Loading
Loading