Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions debezium/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ mise run demo # Installs MariaDB, starts it, runs CDC

The module also includes a Debezium Server sink connector for deployment with standalone Debezium Server.

### [cdc-demo](./cdc-demo/)

**Docker Compose end-to-end demo** - Full stack with Postgres, Debezium Connect, Kafka, and XTDB's built-in `debezium-cdc` module.

- Postgres source (WAL-based CDC)
- Kafka as the message bus
- Uses XTDB's native CDC ingestion (no custom code)
- Fully self-contained via docker-compose

```bash
cd cdc-demo
docker compose up -d
```

## Key Concepts

### Schema-less Ingestion
Expand Down Expand Up @@ -63,14 +77,15 @@ The demos handle both full Debezium envelope format and the flattened format (vi

## Comparison

| Feature | Static JSON | Live CDC (debezium-xtdb) |
|---------|-------------|--------------------------|
| Real database | No | Yes (MySQL/MariaDB) |
| Kafka required | No | No |
| CDC engine | None | Debezium Embedded |
| Latency | N/A | Sub-second |
| Setup complexity | Minimal | Medium (MariaDB install) |
| Best for | Learning | Development/Testing |
| Feature | Static JSON | Live CDC (debezium-xtdb) | Docker Compose (cdc-demo) |
|---------|-------------|--------------------------|---------------------------|
| Real database | No | Yes (MySQL/MariaDB) | Yes (Postgres) |
| Kafka required | No | No | Yes |
| CDC engine | None | Debezium Embedded | Debezium Connect |
| XTDB ingestion | Custom (JDBC) | Custom (JDBC) | Built-in `debezium-cdc` |
| Latency | N/A | Sub-second | Sub-second |
| Setup complexity | Minimal | Medium (MariaDB install) | Medium (Docker) |
| Best for | Learning | Development/Testing | Production-like demo |

## Architecture

Expand Down
62 changes: 62 additions & 0 deletions debezium/cdc-demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Debezium CDC Demo

Demonstrates end-to-end Change Data Capture from Postgres into XTDB via Debezium and Kafka.

## Prerequisites

Build the Docker image (includes the debezium module):

```bash
./docker/scripts/build-aws-image.sh --clean
```

## Start the stack

```bash
cd debezium/cdc-demo
docker compose up -d
```

Wait ~30s for all services to stabilise.
Check status with `docker compose ps` — `debezium-init` and `minio-setup` will show as exited (expected, they're one-shot).

## Run the demo

To submit transactions to Postgres:

```bash
PGPASSWORD=postgres psql -h localhost -p 5434 -U postgres -d sourcedb
```

To watch messages on the kafka topic:

```bash
docker compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic sourcedb.public.cdc_demo --from-beginning 2>/dev/null | jq .
```

To watch for changes in XTDB:
```bash
watch -n 0.5 "psql -h localhost -p 5433 -U xtdb -d xtdb -c \"SELECT *, _valid_time FROM public.cdc_demo FOR ALL VALID_TIME ORDER BY _id, _valid_from\""
```

### 1. Create a table and insert data in Postgres

```sql
CREATE TABLE cdc_demo (_id INT PRIMARY KEY, name TEXT, email TEXT);
INSERT INTO cdc_demo VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO cdc_demo VALUES (2, 'Bob', 'bob@example.com');
```

### 2. More mutations

```sql
UPDATE cdc_demo SET email = 'alice-new@example.com' WHERE _id = 1;
DELETE FROM cdc_demo WHERE _id = 2;
INSERT INTO cdc_demo VALUES (3, 'Charlie', 'charlie@example.com');
```

## Tear down

```bash
docker compose down -v
```
32 changes: 32 additions & 0 deletions debezium/cdc-demo/dc_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
server:
host: '*'
port: 5432

flightSql:
host: '*'
port: 9832

logClusters:
kafkaCluster: !Kafka
bootstrapServers: !Env KAFKA_BOOTSTRAP_SERVERS

log: !Kafka
cluster: kafkaCluster
topic: !Env XTDB_LOG_TOPIC

storage: !Remote
objectStore: !S3
bucket: !Env XTDB_S3_BUCKET
prefix: "xtdb-object-store"
endpoint: !Env XTDB_S3_ENDPOINT
credentials:
accessKey: !Env ACCESS_KEY
secretKey: !Env SECRET_KEY
pathStyleAccessEnabled: true

diskCache:
path: /var/lib/xtdb/buffers

healthz:
host: '*'
port: 8080
148 changes: 148 additions & 0 deletions debezium/cdc-demo/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
version: '3'
services:
minio:
image: minio/minio
ports:
- "9000:9000"
- "8090:8090"
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
volumes:
- minio_data:/data
command: server /data --console-address ":8090"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 5s
timeout: 5s
retries: 3

# a service simply to create the bucket
minio-setup:
image: minio/mc
depends_on:
minio:
condition: service_healthy
entrypoint: >
/bin/sh -c "
mc alias set myminio http://minio:9000 minioadmin minioadmin &&
mc mb --ignore-existing myminio/xtdb-bucket &&
echo 'MinIO bucket created successfully!'"

kafka:
image: confluentinc/cp-kafka:7.7.1
expose:
- 9092
- 9999
- 29092
- 29093
ports:
- 29092:29092
- 29093:29093
environment:
CLUSTER_ID: "1"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_NODE_ID: "1"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
KAFKA_LISTENERS: "PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

# source postgres
postgres:
image: postgres:17-alpine
ports:
- "5434:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourcedb
command: ["postgres", "-c", "wal_level=logical"]
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5

# captures PG WAL → Kafka
debezium-connect:
image: quay.io/debezium/connect:3.0
depends_on:
kafka:
condition: service_started
postgres:
condition: service_healthy
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: "1"
CONFIG_STORAGE_TOPIC: debezium_configs
OFFSET_STORAGE_TOPIC: debezium_offsets
STATUS_STORAGE_TOPIC: debezium_statuses

debezium-init:
image: curlimages/curl
depends_on:
- debezium-connect
- postgres
volumes:
- ./register-connector.sh:/register-connector.sh:ro
entrypoint: ["/bin/sh", "/register-connector.sh"]

# main XTDB node (queryable via pgwire on :5433)
xtdb:
image: xtdb/xtdb-aws:latest
depends_on:
- kafka
- minio-setup
expose:
- 8080
- 5432
- 9832
ports:
- "8081:8080"
- "5433:5432"
- "9832:9832"
environment:
AWS_REGION: "aws-iso-global"
AWS_S3_FORCE_PATH_STYLE: "true"
AWS_S3_USE_VIRTUAL_HOSTING: "false"
XTDB_NODE_ID: "xt-node-1"
KAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
XTDB_LOG_TOPIC: "xt_log"
XTDB_S3_BUCKET: "xtdb-bucket"
XTDB_S3_ENDPOINT: "http://minio:9000"
ACCESS_KEY: "minioadmin"
SECRET_KEY: "minioadmin"
command: ["-f", "/config/dc_config.yaml"]
volumes:
- ./dc_config.yaml:/config/dc_config.yaml

# XTDB CDC node
xtdb-cdc:
image: xtdb/xtdb-aws:latest
depends_on:
- kafka
- minio-setup
- debezium-init
environment:
AWS_REGION: "aws-iso-global"
AWS_S3_FORCE_PATH_STYLE: "true"
AWS_S3_USE_VIRTUAL_HOSTING: "false"
XTDB_NODE_ID: "xt-cdc-node"
KAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
XTDB_LOG_TOPIC: "xt_log"
XTDB_S3_BUCKET: "xtdb-bucket"
XTDB_S3_ENDPOINT: "http://minio:9000"
ACCESS_KEY: "minioadmin"
SECRET_KEY: "minioadmin"
command: ["debezium-cdc", "--kafka-cluster", "kafkaCluster", "-t", "sourcedb.public.cdc_demo", "-f", "/config/dc_config.yaml"]
volumes:
- ./dc_config.yaml:/config/dc_config.yaml

volumes:
minio_data:
30 changes: 30 additions & 0 deletions debezium/cdc-demo/register-connector.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/sh
set -e

CONNECT_URL="http://debezium-connect:8083"

echo "Waiting for Debezium Connect to be ready..."
until curl -sf "$CONNECT_URL/connectors" > /dev/null 2>&1; do
sleep 2
done
echo "Debezium Connect is ready."

curl -sf -X POST "$CONNECT_URL/connectors" \
-H "Content-Type: application/json" \
-d '{
"name": "sourcedb-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "sourcedb",
"topic.prefix": "sourcedb",
"schema.include.list": "public",
"plugin.name": "pgoutput"
}
}'

echo ""
echo "Connector registered successfully."