Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bin/ci-builder
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ case "$cmd" in
--env QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME
--env QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD
# For Miri with nightly Rust
--env ZOOKEEPER_ADDR
--env KAFKA_ADDRS
--env SCHEMA_REGISTRY_URL
--env STEP_START_TIMESTAMP_WITH_TZ
Expand Down
12 changes: 0 additions & 12 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,18 +284,6 @@ steps:
- group: Kafka
key: kafka
steps:
- id: kafka-matrix
label: Previous Kafka versions
depends_on: build-aarch64
timeout_in_minutes: 60
parallelism: 2
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: kafka-matrix
skip: "https://github.com/MaterializeInc/database-issues/issues/9510"

- id: kafka-multi-broker
label: Kafka multi-broker
depends_on: build-aarch64
Expand Down
40 changes: 40 additions & 0 deletions ci/release-qualification/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ steps:
CI_FINAL_PREFLIGHT_CHECK_VERSION: "${BUILDKITE_TAG}"
CI_FINAL_PREFLIGHT_CHECK_ROLLBACK: 1

- id: kafka-matrix
label: Previous Kafka versions
depends_on: build-aarch64
timeout_in_minutes: 60
parallelism: 2
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: kafka-matrix

- group: "MySQL: other versions"
key: mysql-versions
steps:
Expand Down Expand Up @@ -470,6 +481,35 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- group: "SQL Server: other versions"
key: sql-server-versions
steps:
# SQL Server's upstream images are amd64-only, so this matrix runs on
# x86-64 agents.
- id: sql-server-cdc-2019
label: "SQL Server CDC w/ 2019"
depends_on: build-x86_64
timeout_in_minutes: 60
inputs: [test/sql-server-cdc]
plugins:
- ./ci/plugins/mzcompose:
composition: sql-server-cdc
args: [ "--sql-server-version=2019-CU32-ubuntu-20.04" ]
agents:
queue: hetzner-x86-64-4cpu-8gb

- id: sql-server-cdc-2025
label: "SQL Server CDC w/ 2025"
depends_on: build-x86_64
timeout_in_minutes: 60
inputs: [test/sql-server-cdc]
plugins:
- ./ci/plugins/mzcompose:
composition: sql-server-cdc
args: [ "--sql-server-version=2025-CU5-ubuntu-24.04" ]
agents:
queue: hetzner-x86-64-4cpu-8gb

- group: "Platform checks"
key: platform-checks
steps:
Expand Down
11 changes: 5 additions & 6 deletions ci/test/cargo-test/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,24 @@
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.rustc_flags import Sanitizer
from materialize.util import PropagatingThread
from materialize.xcompile import Arch, target

FDB_PORT = 40108

SERVICES = [
Zookeeper(),
Kafka(
# We need a stable port to advertise, so pick one that is unlikely to
# conflict with a Kafka cluster running on the local machine.
ports=["30123:30123"],
allow_host_ports=True,
advertised_listeners=[
"HOST://localhost:30123",
"PLAINTEXT://kafka:9092",
],
environment_extra=[
"KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
],
),
SchemaRegistry(),
Expand Down Expand Up @@ -119,7 +120,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
os.remove(junit_path)

c.up(
"zookeeper",
"kafka",
"schema-registry",
"postgres",
Expand All @@ -146,7 +146,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

env = dict(
os.environ,
ZOOKEEPER_ADDR=f"localhost:{c.default_port('zookeeper')}",
KAFKA_ADDRS="localhost:30123",
SCHEMA_REGISTRY_URL=f"http://localhost:{c.default_port('schema-registry')}",
POSTGRES_URL=postgres_url,
Expand Down
48 changes: 48 additions & 0 deletions misc/mzcompose/kafka/ensure-with-scram.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env bash
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# Drop-in replacement for cp-kafka's /etc/confluent/docker/ensure that
# additionally bootstraps KRaft SCRAM users at storage-format time.
#
# In Kafka 4.x KRaft, the runtime `kafka-configs --alter --add-config=SCRAM-*`
# path is fragile: altering both SCRAM-SHA-256 and SCRAM-SHA-512 for the same
# user in one request is rejected, and splitting into two requests creates
# records that the broker rejects as "invalid credentials" during SASL auth.
# `kafka-storage format --add-scram ...` at first boot is the supported path.
#
# Configure users by setting KAFKA_INIT_SCRAM_USERS to a semicolon-separated
# list of entries of the form:
# SCRAM-SHA-256=[name=alice,password=secret];SCRAM-SHA-512=[name=alice,password=secret]

set -euo pipefail

# Pull in the helpers shipped with the cp-kafka image.
# shellcheck source=/dev/null
. /etc/confluent/docker/bash-config

export KAFKA_DATA_DIRS=${KAFKA_DATA_DIRS:-"/var/lib/kafka/data"}
echo "===> Check if $KAFKA_DATA_DIRS is writable ..."
ub path "$KAFKA_DATA_DIRS" writable

# Fail with an actionable message instead of bash's generic "unbound variable"
# error (from `set -u`), and reject an empty id before it reaches
# kafka-storage as `--cluster-id=`.
: "${CLUSTER_ID:?CLUSTER_ID must be set to format the KRaft storage directory}"
echo "===> Using provided cluster id $CLUSTER_ID ..."

# Assemble the kafka-storage arguments: one --add-scram flag per entry in the
# semicolon-separated KAFKA_INIT_SCRAM_USERS list.
format_args=(--cluster-id="$CLUSTER_ID" -c /etc/kafka/kafka.properties)
if [[ -n "${KAFKA_INIT_SCRAM_USERS:-}" ]]; then
    IFS=';' read -ra entries <<<"$KAFKA_INIT_SCRAM_USERS"
    for entry in "${entries[@]}"; do
        # A doubled or stray ';' yields an empty field; skip it rather than
        # passing an empty --add-scram argument to kafka-storage.
        [[ -n "$entry" ]] || continue
        format_args+=(--add-scram "$entry")
    done
fi

# Not erroring out if storage is already formatted matches the original
# cp-kafka ensure script. --add-scram only applies on first format.
if ! result=$(kafka-storage format "${format_args[@]}" 2>&1); then
    # Match quietly via a here-string: the output is printed exactly once
    # below, and no pipeline is involved that `pipefail` could trip on.
    if ! grep -qi "already formatted" <<<"$result"; then
        echo "$result"
        exit 1
    fi
fi
echo "$result"
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.4"
DEFAULT_CONFLUENT_PLATFORM_VERSION = "8.2.0"

DEFAULT_MZ_VOLUMES = [
"mzdata:/mzdata",
Expand Down
32 changes: 18 additions & 14 deletions misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ def up(
if isinstance(service, Service) and service.idle
]
)
old_compose = None
if idle:
old_compose = copy.deepcopy(self.compose)
for service_name, service in self.compose["services"].items():
Expand All @@ -1212,20 +1213,23 @@ def up(
for service in services
]

self.capture_logs()
self.invoke(
"up",
*(["--detach"] if detach else []),
*(["--wait"] if wait else []),
*(["--quiet-pull"] if ui.env_is_truthy("CI") else []),
*service_names,
max_tries=300 if os.getenv("CI_WAITING_FOR_BUILD") else max_tries,
build=os.getenv("CI_WAITING_FOR_BUILD"),
)

if idle:
self.compose = old_compose # type: ignore
self._invalidate_compose_files()
try:
self.capture_logs()
self.invoke(
"up",
*(["--detach"] if detach else []),
*(["--wait"] if wait else []),
*(["--quiet-pull"] if ui.env_is_truthy("CI") else []),
*service_names,
max_tries=300 if os.getenv("CI_WAITING_FOR_BUILD") else max_tries,
build=os.getenv("CI_WAITING_FOR_BUILD"),
)
finally:
# Restore even on failure: otherwise the next test in the same
# composition would inherit the sleep-infinity entrypoint.
if old_compose is not None:
self.compose = old_compose # type: ignore
self._invalidate_compose_files()

def validate_sources_sinks_clusters(self) -> str | None:
"""Validate that all sources, sinks & clusters are in a good state"""
Expand Down
7 changes: 3 additions & 4 deletions misc/python/materialize/mzcompose/services/cockroach.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@


class Cockroach(Service):
# TODO(def-): Bump when https://github.com/cockroachdb/cockroach/issues/158051 is fixed
DEFAULT_COCKROACH_TAG = "v24.2.0"
DEFAULT_COCKROACH_TAG = "v25.4.10"

def __init__(
self,
Expand Down Expand Up @@ -100,7 +99,7 @@ def backup(c: Composition) -> None:
"-e",
"""
CREATE EXTERNAL CONNECTION backup_bucket
AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin';
AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin&AWS_USE_PATH_STYLE=true';
BACKUP INTO 'external://backup_bucket';
DROP EXTERNAL CONNECTION backup_bucket;
""",
Expand All @@ -120,7 +119,7 @@ def restore(
"""
DROP DATABASE defaultdb;
CREATE EXTERNAL CONNECTION backup_bucket
AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin';
AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin&AWS_USE_PATH_STYLE=true';
RESTORE DATABASE defaultdb
FROM LATEST IN 'external://backup_bucket';
DROP EXTERNAL CONNECTION backup_bucket;
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/services/grafana.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, name: str = "grafana") -> None:
super().__init__(
name=name,
config={
"image": "grafana/grafana:12.2.0",
"image": "grafana/grafana:13.0.1",
"ports": ["3000"],
"environment": [
"GF_AUTH_ANONYMOUS_ENABLED=true",
Expand Down
89 changes: 73 additions & 16 deletions misc/python/materialize/mzcompose/services/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@
from materialize.mzcompose.service import (
Service,
ServiceConfig,
ServiceDependency,
)

# Cluster ID used to format the KRaft metadata log on first boot. Must be a
# 22-character URL-safe base64 string (16 raw bytes). Decodes to the ASCII
# bytes of "MaterializeKafka".
DEFAULT_KAFKA_CLUSTER_ID = "TWF0ZXJpYWxpemVLYWZrYQ"


class Kafka(Service):
def __init__(
Expand All @@ -29,28 +35,71 @@ def __init__(
broker_id: int = 1,
offsets_topic_replication_factor: int = 1,
advertised_listeners: list[str] = [],
environment: list[str] = [
"KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
"KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
"KAFKA_MIN_INSYNC_REPLICAS=1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
"KAFKA_MESSAGE_MAX_BYTES=15728640",
"KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
],
listeners: list[str] | None = None,
environment: list[str] | None = None,
environment_extra: list[str] = [],
depends_on_extra: list[str] = [],
volumes: list[str] = [],
platform: str | None = None,
use_zookeeper: bool = False,
controller_quorum_voters: str | None = None,
controller_port: int = 9093,
cluster_id: str = DEFAULT_KAFKA_CLUSTER_ID,
) -> None:
if not advertised_listeners:
advertised_listeners = [f"PLAINTEXT://{name}:9092"]

if environment is None:
environment = [
"KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
"KAFKA_MIN_INSYNC_REPLICAS=1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
"KAFKA_MESSAGE_MAX_BYTES=15728640",
"KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
]
if use_zookeeper:
environment = [
"KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
*environment,
]

if use_zookeeper:
mode_environment = [
f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
f"KAFKA_BROKER_ID={broker_id}",
]
depends_on: dict[str, ServiceDependency] = {
"zookeeper": ServiceDependency(condition="service_started"),
}
else:
if listeners is None:
listeners = []
for adv in advertised_listeners:
proto, _, hostport = adv.partition("://")
_, _, port = hostport.partition(":")
listeners.append(f"{proto}://0.0.0.0:{port}")
listeners.append(f"CONTROLLER://0.0.0.0:{controller_port}")
if controller_quorum_voters is None:
controller_quorum_voters = f"{broker_id}@{name}:{controller_port}"
mode_environment = [
f"KAFKA_NODE_ID={broker_id}",
"KAFKA_PROCESS_ROLES=broker,controller",
f"KAFKA_CONTROLLER_QUORUM_VOTERS={controller_quorum_voters}",
f"KAFKA_LISTENERS={','.join(listeners)}",
f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
"KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER",
"KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
f"CLUSTER_ID={cluster_id}",
]
depends_on = {}

environment = [
*environment,
f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
f"KAFKA_BROKER_ID={broker_id}",
*mode_environment,
*environment_extra,
]
if ports is None:
Expand All @@ -65,11 +114,19 @@ def __init__(
f"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={offsets_topic_replication_factor}",
],
"depends_on": {
"zookeeper": {"condition": "service_started"},
**{s: {"condition": "service_started"} for s in depends_on_extra},
**depends_on,
**{
s: ServiceDependency(condition="service_started")
for s in depends_on_extra
},
},
"healthcheck": {
"test": ["CMD", "nc", "-z", "localhost", "9092"],
# cp-kafka 8.x dropped `nc`, so use bash's built-in /dev/tcp
# to probe the broker port without any extra dependencies.
"test": [
"CMD-SHELL",
"bash -c 'exec 3<>/dev/tcp/localhost/9092'",
],
"interval": "1s",
"start_period": "120s",
},
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/services/localstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Localstack(Service):
def __init__(
self,
name: str = "localstack",
image: str = "localstack/localstack:4.12.0",
image: str = "localstack/localstack:4.14.0",
port: int = 4566,
environment: list[str] = ["LOCALSTACK_HOST=localstack"],
volumes: list[str] = ["/var/run/docker.sock:/var/run/docker.sock"],
Expand Down
Loading
Loading