Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 367 additions & 1 deletion misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,9 @@ def final_preflight_check(self) -> None:
def sanity_restart_mz(self) -> None:
"""Restart Materialized if it is part of the composition to find
problems with persisted objects, functions as a sanity check."""
if self.is_running("materialized"):
self._check_console_queries()

if (
"materialized" in self.compose["services"]
and "labels" in self.compose["services"]["materialized"]
Expand Down Expand Up @@ -1204,6 +1207,369 @@ def sanity_restart_mz(self) -> None:
"Sanity Restart skipped because Mz not in services or `sanity_restart` label not set"
)

# Queries from the web console that should complete in < 1s.
# Keep in sync with the console frontend.
# NOTE: the queries are pinned to the hard-coded cluster id 's2', and any
# "{now}" placeholder is substituted with a timestamp 30 minutes in the
# past by _check_console_queries before execution.
CONSOLE_QUERIES: dict[str, str] = {
# Maps clusters to their current deployment cluster id via
# mz_cluster_deployment_lineage.
"clusterDeploymentLineage": """
SELECT
cluster_id AS "clusterId",
current_deployment_cluster_id AS "currentDeploymentClusterId",
cluster_name AS "clusterName"
FROM mz_cluster_deployment_lineage
WHERE current_deployment_cluster_id IN ('s2')
""",
# Lists all clusters with an ownership flag for the current role, a nested
# JSON array of replicas (each with its per-process statuses), and the
# latest replica status update time per cluster.
"clustersList": """
SELECT
owners."isOwner",
c.id,
c.name,
c.disk,
c.managed,
c.size,
(
SELECT COALESCE(jsonb_agg(agg), '[]')
FROM (
SELECT
cr.id,
cr.name,
cr.size,
cr.disk,
(
SELECT COALESCE(jsonb_agg(agg), '[]')
FROM (
SELECT
crs_inner.replica_id,
crs_inner.process_id,
crs_inner.status,
crs_inner.reason,
crs_inner.updated_at
FROM mz_cluster_replica_statuses AS crs_inner
WHERE crs_inner.replica_id = c.id
) AS agg
) AS statuses
FROM mz_cluster_replicas AS cr
WHERE cr.cluster_id = c.id
ORDER BY cr.id
) AS agg
) AS replicas,
latest_cluster_status_update.latest_status_update AS "latestStatusUpdate"
FROM
mz_clusters AS c
JOIN (
SELECT
c.id AS cluster_id,
max(crsh.occurred_at) AS latest_status_update
FROM
mz_clusters AS c
LEFT JOIN mz_cluster_replica_history AS crh ON crh.cluster_id = c.id
LEFT JOIN mz_cluster_replica_status_history AS crsh ON crh.replica_id = crsh.replica_id
GROUP BY c.id
) AS latest_cluster_status_update ON latest_cluster_status_update.cluster_id = c.id
JOIN (
SELECT
r.id,
r.name,
((SELECT mz_is_superuser() OR current_setting('enable_rbac_checks') = 'off')
OR has_role(current_user, r.oid, 'USAGE')) AS "isOwner"
FROM mz_roles AS r
) AS owners ON owners.id = c.owner_id
ORDER BY c.name
""",
# Per-bucket utilization metrics for cluster 's2' straight from
# mz_console_cluster_utilization_overview.
"consoleClusterUtilizationOverview": """
SELECT
bucket_start AS "bucketStart",
replica_id AS "replicaId",
memory_percent AS "maxMemoryPercent",
max_memory_at AS "maxMemoryAt",
disk_percent AS "maxDiskPercent",
max_disk_at AS "maxDiskAt",
max_cpu_percent AS "maxCpuPercent",
max_cpu_at AS "maxCpuAt",
heap_percent AS "maxHeapPercent",
max_heap_at AS "maxHeapAt",
memory_and_disk_percent AS "maxMemoryAndDiskPercent",
max_memory_and_disk_memory_percent AS "maxMemoryAndDiskMemoryPercent",
max_memory_and_disk_disk_percent AS "maxMemoryAndDiskDiskPercent",
max_memory_and_disk_at AS "maxMemoryAndDiskAt",
offline_events AS "offlineEvents",
bucket_end AS "bucketEnd",
name,
cluster_id AS "clusterId",
size
FROM mz_console_cluster_utilization_overview
WHERE cluster_id IN ('s2')
ORDER BY "bucketStart"
""",
# Max wallclock lag per object for cluster 's2', binned into 1-minute
# buckets over the last hour of mz_wallclock_global_lag_recent_history.
"lagHistory": """
WITH
lag_history_with_temporal_filter AS (
SELECT occurred_at, lag, object_id
FROM mz_wallclock_global_lag_recent_history
WHERE occurred_at + INTERVAL '3600000 MILLISECONDS' >= mz_now()
),
lag_history_binned AS (
SELECT
date_bin('60000 MILLISECONDS', occurred_at, '1970-01-01'::timestamp) AS bucket_start,
lag,
object_id
FROM lag_history_with_temporal_filter
),
lag_history_binned_by_max_lag AS (
SELECT DISTINCT ON (bucket_start, object_id)
bucket_start, object_id, lag
FROM lag_history_binned
ORDER BY bucket_start DESC, object_id, lag DESC
),
lag_history AS (
SELECT
lag_history.bucket_start AS "bucketStart",
clusters.id AS "clusterId",
lag_history.lag,
lag_history.object_id AS "objectId",
clusters.name AS "clusterName",
object_names.database_name AS "databaseName",
object_names.schema_name AS "schemaName",
object_names.name AS "objectName"
FROM
lag_history_binned_by_max_lag AS lag_history
JOIN mz_objects AS objects ON lag_history.object_id = objects.id
JOIN mz_clusters AS clusters ON clusters.id = objects.cluster_id
JOIN mz_object_fully_qualified_names AS object_names ON lag_history.object_id = object_names.id
WHERE clusters.id = 's2'
)
SELECT * FROM lag_history
ORDER BY "bucketStart" ASC, lag DESC
""",
# Single replica of cluster 's2' with the highest heap limit (hydrated
# replicas first).
"largestClusterReplica": """
SELECT
cr.name,
cr.size,
crhm.heap_limit::text AS "heapLimit",
bool_and(hs.hydrated) AS "isHydrated"
FROM
mz_cluster_replicas AS cr
JOIN mz_cluster_replica_sizes AS crs ON cr.size = crs.size
LEFT JOIN (
SELECT replica_id, hydrated
FROM mz_hydration_statuses
WHERE object_id NOT LIKE 's%'
) AS hs ON cr.id = hs.replica_id
LEFT JOIN (
SELECT
crm.replica_id,
max(crm.heap_limit) AS heap_limit,
max(crm.heap_bytes) AS heap_bytes
FROM mz_cluster_replica_metrics AS crm
GROUP BY crm.replica_id
) AS crhm ON crhm.replica_id = cr.id
WHERE cr.cluster_id = 's2'
GROUP BY cr.name, cr.size, crs.memory_bytes, crs.disk_bytes, crs.processes, crhm.heap_limit
ORDER BY "isHydrated" DESC NULLS LAST, "heapLimit" DESC NULLS LAST
LIMIT 1
""",
# NOTE(review): this query is byte-identical to "largestClusterReplica"
# above and selects cluster replicas, not maintained objects — likely a
# copy-paste placeholder. Confirm against the console frontend.
"largestMaintainedObjects": """
SELECT
cr.name,
cr.size,
crhm.heap_limit::text AS "heapLimit",
bool_and(hs.hydrated) AS "isHydrated"
FROM
mz_cluster_replicas AS cr
JOIN mz_cluster_replica_sizes AS crs ON cr.size = crs.size
LEFT JOIN (
SELECT replica_id, hydrated
FROM mz_hydration_statuses
WHERE object_id NOT LIKE 's%'
) AS hs ON cr.id = hs.replica_id
LEFT JOIN (
SELECT
crm.replica_id,
max(crm.heap_limit) AS heap_limit,
max(crm.heap_bytes) AS heap_bytes
FROM mz_cluster_replica_metrics AS crm
GROUP BY crm.replica_id
) AS crhm ON crhm.replica_id = cr.id
WHERE cr.cluster_id = 's2'
GROUP BY cr.name, cr.size, crs.memory_bytes, crs.disk_bytes, crs.processes, crhm.heap_limit
ORDER BY "isHydrated" DESC NULLS LAST, "heapLimit" DESC NULLS LAST
LIMIT 1
""",
# Per-minute per-replica max utilization (memory, disk, cpu, heap,
# memory+disk, offline events) for cluster 's2'. Uses the "{now}"
# placeholder, replaced at run time with a timestamp 30 minutes ago.
"replicaUtilizationHistory": """
WITH
replica_history AS (
SELECT replica_id, cluster_id, size
FROM mz_cluster_replica_history
UNION
SELECT id AS replica_id, cluster_id, size
FROM mz_cluster_replicas
),
replica_name_history AS (
SELECT
id,
new_name AS name,
COALESCE(occurred_at, '1970-01-01'::timestamp) AS occurred_at
FROM mz_cluster_replica_name_history
),
replica_metrics_history AS (
SELECT
m.occurred_at,
m.replica_id,
r.size,
(sum(m.cpu_nano_cores::float8) / (NULLIF(s.cpu_nano_cores, 0) * s.processes)) AS cpu_percent,
(sum(m.memory_bytes::float8) / (NULLIF(s.memory_bytes, 0) * s.processes)) AS memory_percent,
(sum(m.disk_bytes::float8) / (NULLIF(s.disk_bytes, 0) * s.processes)) AS disk_percent,
sum(m.disk_bytes::float8) AS disk_bytes,
sum(m.memory_bytes::float8) AS memory_bytes,
s.disk_bytes * s.processes AS total_disk_bytes,
s.memory_bytes * s.processes AS total_memory_bytes,
max(m.heap_bytes::float8) AS heap_bytes,
max(m.heap_limit) AS heap_limit,
max(m.heap_bytes::float8 / NULLIF(m.heap_limit, 0)) AS heap_percent
FROM
replica_history AS r
JOIN mz_cluster_replica_sizes AS s ON r.size = s.size
JOIN mz_cluster_replica_metrics_history AS m ON m.replica_id = r.replica_id
GROUP BY m.occurred_at, m.replica_id, r.size, s.cpu_nano_cores, s.memory_bytes, s.disk_bytes, s.processes
),
replica_utilization_history_binned AS (
SELECT
m.occurred_at,
m.replica_id,
m.cpu_percent,
m.memory_percent,
m.memory_bytes,
m.disk_percent,
m.disk_bytes,
m.total_disk_bytes,
m.total_memory_bytes,
m.heap_bytes,
m.heap_percent,
m.size,
date_bin('60000 MILLISECONDS', occurred_at, '1970-01-01'::timestamp) AS bucket_start
FROM
replica_history AS r
JOIN replica_metrics_history AS m ON m.replica_id = r.replica_id
WHERE occurred_at >= date_bin('60000 MILLISECONDS', '{now}'::timestamp, '1970-01-01'::timestamp)
),
max_memory AS (
SELECT DISTINCT ON (bucket_start, replica_id)
bucket_start, replica_id, memory_percent, occurred_at
FROM replica_utilization_history_binned
ORDER BY bucket_start, replica_id, COALESCE(memory_bytes, 0) DESC
),
max_disk AS (
SELECT DISTINCT ON (bucket_start, replica_id)
bucket_start, replica_id, disk_percent, occurred_at
FROM replica_utilization_history_binned
ORDER BY bucket_start, replica_id, COALESCE(disk_bytes, 0) DESC
),
max_cpu AS (
SELECT DISTINCT ON (bucket_start, replica_id)
bucket_start, replica_id, cpu_percent, occurred_at
FROM replica_utilization_history_binned
ORDER BY bucket_start, replica_id, COALESCE(cpu_percent, 0) DESC
),
max_heap AS (
SELECT DISTINCT ON (bucket_start, replica_id)
bucket_start, replica_id, heap_percent, occurred_at
FROM replica_utilization_history_binned
ORDER BY bucket_start, replica_id, COALESCE(heap_bytes, 0) DESC
),
max_memory_and_disk AS (
SELECT DISTINCT ON (bucket_start, replica_id)
bucket_start, replica_id, memory_percent, disk_percent, memory_and_disk_percent, occurred_at
FROM (
SELECT *,
CASE
WHEN disk_bytes IS NULL AND memory_bytes IS NULL THEN NULL
ELSE (COALESCE(memory_bytes, 0) + COALESCE(disk_bytes, 0))
/ NULLIF(total_memory_bytes + total_disk_bytes, 0)
END AS memory_and_disk_percent
FROM replica_utilization_history_binned
) AS max_memory_and_disk_inner
ORDER BY bucket_start, replica_id, COALESCE(memory_and_disk_percent, 0) DESC
),
replica_offline_event_history AS (
SELECT
date_bin('60000 MILLISECONDS', occurred_at, '1970-01-01'::timestamp) AS bucket_start,
replica_id,
jsonb_agg(jsonb_build_object(
'replicaId', rsh.replica_id,
'occurredAt', rsh.occurred_at,
'status', rsh.status,
'reason', rsh.reason
)) AS offline_events
FROM mz_cluster_replica_status_history AS rsh
WHERE process_id = '0' AND status = 'offline'
AND occurred_at >= date_bin('60000 MILLISECONDS', '{now}'::timestamp, '1970-01-01'::timestamp)
GROUP BY bucket_start, replica_id
)
SELECT
max_memory.bucket_start AS "bucketStart",
max_memory.replica_id AS "replicaId",
max_memory.memory_percent AS "maxMemoryPercent",
max_memory.occurred_at AS "maxMemoryAt",
max_disk.disk_percent AS "maxDiskPercent",
max_disk.occurred_at AS "maxDiskAt",
max_memory_and_disk.memory_and_disk_percent AS "maxMemoryAndDiskPercent",
max_memory_and_disk.memory_percent AS "maxMemoryAndDiskMemoryPercent",
max_memory_and_disk.disk_percent AS "maxMemoryAndDiskDiskPercent",
max_memory_and_disk.occurred_at AS "maxMemoryAndDiskAt",
max_cpu.cpu_percent AS "maxCpuPercent",
max_cpu.occurred_at AS "maxCpuAt",
max_heap.heap_percent AS "maxHeapPercent",
max_heap.occurred_at AS "maxHeapAt",
replica_offline_event_history.offline_events AS "offlineEvents",
max_memory.bucket_start + INTERVAL '60000 MILLISECONDS' AS "bucketEnd",
replica_name_history.name,
replica_history.cluster_id AS "clusterId",
replica_history.size
FROM
max_memory
JOIN max_disk ON max_memory.bucket_start = max_disk.bucket_start AND max_memory.replica_id = max_disk.replica_id
JOIN max_cpu ON max_memory.bucket_start = max_cpu.bucket_start AND max_memory.replica_id = max_cpu.replica_id
JOIN max_heap ON max_memory.bucket_start = max_heap.bucket_start AND max_memory.replica_id = max_heap.replica_id
JOIN max_memory_and_disk ON max_memory.bucket_start = max_memory_and_disk.bucket_start AND max_memory.replica_id = max_memory_and_disk.replica_id
JOIN replica_history ON max_memory.replica_id = replica_history.replica_id
JOIN LATERAL (
SELECT *
FROM replica_name_history
WHERE max_memory.replica_id = replica_name_history.id
AND max_memory.bucket_start + INTERVAL '60000 MILLISECONDS' >= replica_name_history.occurred_at
ORDER BY replica_name_history.occurred_at DESC
LIMIT 1
) AS replica_name_history ON true
LEFT JOIN replica_offline_event_history ON max_memory.bucket_start = replica_offline_event_history.bucket_start
AND max_memory.replica_id = replica_offline_event_history.replica_id
WHERE replica_history.cluster_id IN ('s2')
ORDER BY "bucketStart"
""",
}

def _check_console_queries(self) -> None:
    """Run the web console queries and fail if any takes longer than 1s.

    Each query in ``CONSOLE_QUERIES`` is executed via ``self.sql``. A
    ``{now}`` placeholder in a query is substituted with a UTC wall-clock
    timestamp 30 minutes in the past (the lookback window the console
    uses). A query that errors is printed and skipped — best effort,
    since not every composition exposes all catalog relations. A query
    that succeeds but exceeds the 1s threshold is recorded, and after
    all queries ran an ``AssertionError`` listing the slow ones is
    raised.

    Raises:
        AssertionError: if at least one query completed but took > 1s.
    """
    threshold_s = 1.0
    # Wall-clock timestamp for the "{now}" substitution; this one is a
    # point in time, so time.time() (not monotonic) is correct here.
    thirty_min_ago = time.strftime(
        "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - 30 * 60)
    )
    slow_queries: list[tuple[str, float]] = []
    for name, query_template in self.CONSOLE_QUERIES.items():
        query = query_template.replace("{now}", thirty_min_ago)
        # Measure durations with a monotonic clock: time.time() can jump
        # backward/forward if the system clock is adjusted mid-query,
        # which would corrupt the elapsed measurement.
        start_time = time.monotonic()
        try:
            self.sql(query)
        except Exception as e:
            # Deliberately non-fatal: report and move on to the next query.
            print(f"Console query {name} failed: {e}")
            continue
        elapsed = time.monotonic() - start_time
        if elapsed > threshold_s:
            slow_queries.append((name, elapsed))
            print(f"Slow console query: {name} took {elapsed:.1f}s")
    if slow_queries:
        details = ", ".join(
            f"{name} ({elapsed:.1f}s)" for name, elapsed in slow_queries
        )
        raise AssertionError(f"Slow console queries (> 1s): {details}")

def metadata_store(self) -> str:
for name in ["cockroach", "postgres-metadata", "alloydb", "foundationdb"]:
if name in self.compose["services"]:
Expand Down Expand Up @@ -1253,7 +1619,7 @@ def down(
"""
if os.getenv("CI_FINAL_PREFLIGHT_CHECK_VERSION") is not None:
self.final_preflight_check()
elif sanity_restart_mz and self.is_sanity_restart_mz:
else:
self.sanity_restart_mz()
self.capture_logs()
self.invoke(
Expand Down
1 change: 1 addition & 0 deletions test/limits/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,7 @@ def run_scenarios(
start_time = time.time()
c.testdrive(f.getvalue(), quiet=True).stdout
wallclock = time.time() - start_time
c.sanity_restart_mz()
except Exception as e:
print(
f"Failed scenario {scenario.__name__} with count {scenario.COUNT}: {e}"
Expand Down
Loading