Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions console/src/api/materialize/cluster/dataflowCpuPerWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

import { QueryKey } from "@tanstack/react-query";
import { InferResult, sql } from "kysely";

import {
buildSessionVariables,
executeSqlV2,
queryBuilder,
} from "~/api/materialize";

export type DataflowCpuPerWorkerParams = {
clusterName: string;
replicaName: string;
};

/**
* One row per (dataflow on this cluster, worker on the selected replica) with
* total CPU elapsed since the replica started. Powers the cluster CPU heatmap.
*
* Joined paths:
* - mz_scheduling_elapsed_per_worker x mz_dataflow_operator_dataflows: per-op
* CPU by worker, summed to the dataflow root (operator name `Dataflow:%`).
* - mz_compute_exports: dataflow_id -> the GlobalId of the exported object,
* so each row links back to its index / materialized view / subscription.
* - mz_objects / mz_schemas / mz_databases: human-readable identity.
*/
export function buildDataflowCpuPerWorkerQuery() {
return (
queryBuilder
.selectFrom("mz_scheduling_elapsed_per_worker as mse")
.innerJoin("mz_dataflow_operator_dataflows as dod", "dod.id", "mse.id")
.innerJoin(
"mz_compute_exports as ce",
"ce.dataflow_id",
"dod.dataflow_id",
)
.leftJoin("mz_objects as o", "o.id", "ce.export_id")
.leftJoin("mz_schemas as sc", "sc.id", "o.schema_id")
.leftJoin("mz_databases as da", "da.id", "sc.database_id")
.where("dod.name", "like", "Dataflow:%")
// Filter transient dataflows (peeks, subscribes) — they have ids like `t12`.
.where("ce.export_id", "not like", "t%")
.select((eb) => [
eb.ref("ce.export_id").as("objectId"),
eb.ref("o.name").as("objectName"),
eb.ref("sc.name").as("schemaName"),
eb.ref("da.name").as("databaseName"),
sql<"materialized-view" | "index" | "subscription">`o.type`.as(
"objectType",
),
eb.ref("dod.dataflow_name").as("dataflowName"),
sql<number>`${sql.id("mse", "worker_id")}::int`.as("workerId"),
sql<bigint>`sum(${sql.id("mse", "elapsed_ns")})::bigint`.as(
"elapsedNs",
),
])
.groupBy([
"ce.export_id",
"o.name",
"sc.name",
"da.name",
"o.type",
"dod.dataflow_name",
"mse.worker_id",
])
);
}

export async function fetchDataflowCpuPerWorker({
params,
queryKey,
requestOptions,
}: {
params: DataflowCpuPerWorkerParams;
queryKey: QueryKey;
requestOptions?: RequestInit;
}) {
const query = buildDataflowCpuPerWorkerQuery().compile();
return executeSqlV2({
sessionVariables: buildSessionVariables({
cluster: params.clusterName,
cluster_replica: params.replicaName,
}),
queries: query,
queryKey,
requestOptions,
});
}

export type DataflowCpuPerWorkerRow = InferResult<
ReturnType<typeof buildDataflowCpuPerWorkerQuery>
>[0];
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

import { QueryKey } from "@tanstack/react-query";
import { InferResult, sql } from "kysely";

import {
buildSessionVariables,
executeSqlV2,
queryBuilder,
} from "~/api/materialize";

export type OperatorCpuPerWorkerParams = {
/** GlobalId of the maintained object (index, materialized view). */
objectId: string;
clusterName: string;
replicaName: string;
};

/**
* One row per (operator within this object's dataflow, worker on the selected
* replica) with elapsed CPU since the replica started. Powers the heatmap on
* the object detail Performance tab.
*
* Structural operators are filtered out — the list mirrors the one in the
* upstream dataflow-troubleshooting prototype (Frank McSherry / docs) so the
* remaining rows are the operators a user can actually reason about (joins,
* arrangements, reduces, sources, sinks, etc.).
*/
export function buildOperatorCpuPerWorkerQuery(objectId: string) {
return queryBuilder
.selectFrom("mz_scheduling_elapsed_per_worker as mse")
.innerJoin("mz_dataflow_operator_dataflows as dod", "dod.id", "mse.id")
.innerJoin("mz_compute_exports as ce", "ce.dataflow_id", "dod.dataflow_id")
.where("ce.export_id", "=", objectId)
.where("dod.name", "not like", "Dataflow:%")
.where("dod.name", "not like", "BuildRegion:%")
.where("dod.name", "not like", "BuildingObject%")
.where("dod.name", "not like", "InputRegion:%")
.where("dod.name", "not like", "Binding(LocalId%")
.where("dod.name", "not like", "LogOperatorHydration%")
.where("dod.name", "!=", "Main Body")
.select((eb) => [
sql<string>`${sql.id("dod", "id")}::text`.as("operatorId"),
eb.ref("dod.name").as("operatorName"),
sql<number>`${sql.id("mse", "worker_id")}::int`.as("workerId"),
sql<bigint>`sum(${sql.id("mse", "elapsed_ns")})::bigint`.as("elapsedNs"),
])
.groupBy(["dod.id", "dod.name", "mse.worker_id"]);
}

export async function fetchOperatorCpuPerWorker({
params,
queryKey,
requestOptions,
}: {
params: OperatorCpuPerWorkerParams;
queryKey: QueryKey;
requestOptions?: RequestInit;
}) {
const query = buildOperatorCpuPerWorkerQuery(params.objectId).compile();
return executeSqlV2({
sessionVariables: buildSessionVariables({
cluster: params.clusterName,
cluster_replica: params.replicaName,
}),
queries: query,
queryKey,
requestOptions,
});
}

export type OperatorCpuPerWorkerRow = InferResult<
ReturnType<typeof buildOperatorCpuPerWorkerQuery>
>[0];
23 changes: 22 additions & 1 deletion console/src/platform/clusters/ClusterOverview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import {
Box,
Button,
Code,
Flex,
Grid,
Expand All @@ -24,7 +25,7 @@ import {
import React from "react";
import { useParams } from "react-router-dom";

import { MZ_PROBE_CLUSTER } from "~/api/materialize";
import { isSystemCluster, MZ_PROBE_CLUSTER } from "~/api/materialize";
import { Cluster } from "~/api/materialize/cluster/clusterList";
import Alert from "~/components/Alert";
import ErrorBox from "~/components/ErrorBox";
Expand All @@ -46,6 +47,7 @@ import {
TOTAL_GRAPH_HEIGHT_PX,
UtilizationGraph,
} from "./ClusterOverview/UtilizationGraph";
import { WorkerSkewDrawer } from "./ClusterOverview/WorkerSkewDrawer";
import { ClusterParams } from "./ClusterRoutes";
import {
CLUSTER_METRICS_UNAVAILABLE_MESSAGE,
Expand Down Expand Up @@ -76,6 +78,7 @@ const ClusterOverview = () => {
localStorageKey: "mz-cluster-graph-time-period",
});
const [selectedReplica, setSelectedReplica] = React.useState("all");
const [isWorkerSkewOpen, setIsWorkerSkewOpen] = React.useState(false);

const bucketSizeMs = React.useMemo(
() => Math.max(timePeriodMinutes * 1000, MIN_BUCKET_SIZE_MS),
Expand Down Expand Up @@ -164,6 +167,17 @@ const ClusterOverview = () => {
Resource Usage
</Text>
<HStack>
{cluster &&
!isSystemCluster(cluster.id) &&
cluster.replicas.length > 0 && (
<Button
variant="outline"
size="sm"
onClick={() => setIsWorkerSkewOpen(true)}
>
Where is CPU going?
</Button>
)}
{cluster && (
<LabeledSelect
label="Replicas"
Expand Down Expand Up @@ -266,6 +280,13 @@ const ClusterOverview = () => {
<ClusterFreshness clusterId={clusterId} />
)}
</VStack>
{cluster && (
<WorkerSkewDrawer
isOpen={isWorkerSkewOpen}
onClose={() => setIsWorkerSkewOpen(false)}
cluster={cluster}
/>
)}
</MainContentContainer>
);
};
Expand Down
Loading
Loading