Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion misc/python/materialize/workload_replay/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,5 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any:
]

else:
raise ValueError(f"Unhandled data type {self.typ}")
# Custom data type, or not supported yet
return "NULL" if in_query else None
6 changes: 5 additions & 1 deletion misc/python/materialize/workload_replay/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ def test(
services.update(["kafka", "schema-registry", "zookeeper"])
elif connection["type"] == "ssh-tunnel":
services.add("ssh-bastion-host")
elif connection["type"] in ("aws-privatelink", "aws"):
elif connection["type"] == "iceberg-catalog":
pass # handled by setup_polaris_for_iceberg in objects.py
elif connection["type"] == "aws-privatelink":
pass # can't run outside of cloud
elif connection["type"] == "aws":
pass # handled together with iceberg-catalog when present
else:
raise ValueError(f"Unhandled connection type {connection['type']}")

Expand Down
98 changes: 97 additions & 1 deletion misc/python/materialize/workload_replay/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
from psycopg.sql import SQL, Identifier, Literal

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.helpers.iceberg import (
create_polaris_namespace,
get_polaris_access_token,
setup_polaris_for_iceberg,
)
from materialize.mzcompose.services.sql_server import SqlServer
from materialize.workload_replay.util import (
get_kafka_topic,
Expand Down Expand Up @@ -192,6 +197,40 @@ def run_create_objects_part_1(
)
)

has_iceberg = any(
connection["type"] == "iceberg-catalog"
for schemas in workload["databases"].values()
for items in schemas.values()
for connection in items["connections"].values()
)
iceberg_credentials: tuple[str, str] | None = None
if has_iceberg:
print("Setting up Polaris for Iceberg sinks")
c.sql(
"ALTER SYSTEM SET enable_iceberg_sink = true",
user="mz_system",
port=6877,
print_statement=verbose,
)
iceberg_credentials = setup_polaris_for_iceberg(c)
# Create any additional namespaces referenced by iceberg sinks
namespaces: set[str] = set()
for schemas in workload["databases"].values():
for items in schemas.values():
for sink in items["sinks"].values():
match = re.search(
r"NAMESPACE\s*=?\s*'([^']+)'",
sink.get("create_sql", ""),
re.IGNORECASE,
)
if match:
namespaces.add(match.group(1))
namespaces.discard("default_namespace")
if namespaces:
access_token = get_polaris_access_token(c)
for ns in namespaces:
create_polaris_namespace(c, access_token, namespace=ns)

print("Creating connections")
existing_dbs = {"postgres": {"postgres"}, "sql-server": set()}
for db, schemas in workload["databases"].items():
Expand Down Expand Up @@ -301,7 +340,64 @@ def run_create_objects_part_1(
port=6877,
print_statement=verbose,
)
elif connection["type"] in ("aws-privatelink", "aws"):
elif connection["type"] == "aws":
if iceberg_credentials is not None:
username, key = iceberg_credentials
secret_name = f"{name}_secret"
c.sql(
SQL("CREATE SECRET {}.{}.{} AS {}").format(
Identifier(db),
Identifier(schema),
Identifier(secret_name),
Literal(key),
),
user="mz_system",
port=6877,
print_statement=verbose,
)
c.sql(
SQL(
"CREATE CONNECTION {}.{}.{} TO AWS ("
"ACCESS KEY ID = {}, "
"SECRET ACCESS KEY = SECRET {}.{}.{}, "
"ENDPOINT = 'http://minio:9000/', "
"REGION = 'us-east-1')"
).format(
Identifier(db),
Identifier(schema),
Identifier(name),
Literal(username),
Identifier(db),
Identifier(schema),
Identifier(secret_name),
),
user="mz_system",
port=6877,
print_statement=verbose,
)
# else: skip, can't run outside of cloud
elif connection["type"] == "iceberg-catalog":
assert (
iceberg_credentials is not None
), "Iceberg catalog connection requires polaris service"
c.sql(
SQL(
"CREATE CONNECTION {}.{}.{} TO ICEBERG CATALOG ("
"CATALOG TYPE = 'REST', "
"URL = 'http://polaris:8181/api/catalog', "
"CREDENTIAL = 'root:root', "
"WAREHOUSE = 'default_catalog', "
"SCOPE = 'PRINCIPAL_ROLE:ALL')"
).format(
Identifier(db),
Identifier(schema),
Identifier(name),
),
user="mz_system",
port=6877,
print_statement=verbose,
)
elif connection["type"] == "aws-privatelink":
pass # can't run outside of cloud
else:
raise ValueError(f"Unhandled connection type {connection['type']}")
Expand Down
6 changes: 6 additions & 0 deletions test/workload-replay/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Mc, Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.polaris import Polaris, PolarisBootstrap
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
Expand Down Expand Up @@ -59,6 +61,10 @@
Postgres(),
MySql(),
Azurite(),
Minio(),
Mc(),
PolarisBootstrap(),
Polaris(),
Mz(app_password=""),
Materialized(
cluster_replica_size=cluster_replica_sizes,
Expand Down