Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/user/content/reference/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_builtin_materialized_views -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_builtin_sources -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_catalog_raw -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_cluster_workload_classes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_error_counts_raw_unified -->
Expand Down
113 changes: 6 additions & 107 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use mz_catalog::builtin::{
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
Expand Down Expand Up @@ -401,10 +401,6 @@ impl CatalogState {
let privileges_row = self.pack_privilege_array_row(entry.privileges());
let privileges = privileges_row.unpack_first();
let mut updates = match entry.item() {
CatalogItem::Log(_) => self.pack_source_update(
id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
privileges, diff, None,
),
CatalogItem::Index(index) => {
self.pack_index_update(id, oid, name, owner_id, index, diff)
}
Expand Down Expand Up @@ -509,45 +505,7 @@ impl CatalogState {
updates
}
CatalogItem::Source(source) => {
let source_type = source.source_type();
let connection_id = source.connection_id();
let envelope = source.data_source.envelope();
let cluster_entry = match source.data_source {
// Ingestion exports don't have their own cluster, but
// run on their ingestion's cluster.
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
self.get_entry(&ingestion_id)
}
DataSourceDesc::Ingestion { .. }
| DataSourceDesc::OldSyntaxIngestion { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Catalog => entry,
};

let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

let (key_format, value_format) = source.data_source.formats();

let mut updates = self.pack_source_update(
id,
oid,
schema_id,
name,
source_type,
connection_id,
envelope,
key_format,
value_format,
cluster_id.as_deref(),
owner_id,
privileges,
diff,
source.create_sql.as_ref(),
);

updates.extend(match &source.data_source {
match &source.data_source {
DataSourceDesc::Ingestion { desc, .. }
| DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
GenericSourceConnection::Postgres(postgres) => {
Expand Down Expand Up @@ -628,9 +586,7 @@ impl CatalogState {
DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Catalog => vec![],
});

updates
}
}
CatalogItem::View(view) => {
self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
Expand All @@ -647,7 +603,7 @@ impl CatalogState {
CatalogItem::Func(func) => {
self.pack_func_update(id, schema_id, name, owner_id, func, diff)
}
CatalogItem::Secret(_) => vec![],
CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
CatalogItem::Connection(connection) => {
self.pack_connection_update(id, connection, diff)
}
Expand Down Expand Up @@ -831,63 +787,6 @@ impl CatalogState {
)]
}

fn pack_source_update(
&self,
id: CatalogItemId,
oid: u32,
schema_id: &SchemaSpecifier,
name: &str,
source_desc_name: &str,
connection_id: Option<CatalogItemId>,
envelope: Option<&str>,
key_format: Option<&str>,
value_format: Option<&str>,
cluster_id: Option<&str>,
owner_id: &RoleId,
privileges: Datum,
diff: Diff,
create_sql: Option<&String>,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let redacted = create_sql.map(|create_sql| {
let create_stmt = mz_sql::parse::parse(create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
.into_element()
.ast;
create_stmt.to_ast_string_redacted()
});
vec![BuiltinTableUpdate::row(
&*MZ_SOURCES,
Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(&schema_id.to_string()),
Datum::String(name),
Datum::String(source_desc_name),
Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
// This is the "source size", which is a remnant from linked
// clusters.
Datum::Null,
Datum::from(envelope),
Datum::from(key_format),
Datum::from(value_format),
Datum::from(cluster_id),
Datum::String(&owner_id.to_string()),
privileges,
if let Some(create_sql) = create_sql {
Datum::String(create_sql)
} else {
Datum::Null
},
if let Some(redacted) = &redacted {
Datum::String(redacted)
} else {
Datum::Null
},
]),
diff,
)]
}

fn pack_postgres_source_update(
&self,
id: CatalogItemId,
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/catalog/open/builtin_schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
MZ_CATALOG_SCHEMA,
"mz_secrets",
),
MigrationStep::replacement(
"26.26.0-dev.0",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be 26.27.0-dev.0 I assume

CatalogItemType::MaterializedView,
MZ_CATALOG_SCHEMA,
"mz_sources",
),
]
});

Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Table(&MZ_INDEXES),
Builtin::Table(&MZ_INDEX_COLUMNS),
Builtin::Table(&MZ_TABLES),
Builtin::Table(&MZ_SOURCES),
Builtin::MaterializedView(&MZ_SOURCES),
Builtin::Table(&MZ_SOURCE_REFERENCES),
Builtin::Table(&MZ_POSTGRES_SOURCES),
Builtin::Table(&MZ_POSTGRES_SOURCE_TABLES),
Expand Down
65 changes: 63 additions & 2 deletions src/catalog/src/builtin/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,87 @@ use mz_sql::catalog::{NameReference, ObjectType};
use mz_sql::rbac;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

use crate::builtin::{Builtin, BuiltinMaterializedView, BuiltinView, PUBLIC_SELECT};
use crate::builtin::{
Builtin, BuiltinLog, BuiltinMaterializedView, BuiltinSource, BuiltinView, PUBLIC_SELECT,
};

/// Generate builtin views reporting the given builtins.
///
/// Used in the [`super::BUILTINS_STATIC`] initializer.
pub(super) fn builtins(
builtin_items: &[Builtin<NameReference>],
) -> impl Iterator<Item = Builtin<NameReference>> {
let source_iter = builtin_items.iter().filter_map(|b| match b {
Builtin::Source(x) => Some(*x),
_ => None,
});
let log_iter = builtin_items.iter().filter_map(|b| match b {
Builtin::Log(x) => Some(*x),
_ => None,
});
let mv_iter = builtin_items.iter().filter_map(|b| match b {
Builtin::MaterializedView(x) => Some(*x),
_ => None,
});

let sources = make_builtin_sources(source_iter, log_iter);
let materialized_views = make_builtin_materialized_views(mv_iter);

[materialized_views].into_iter().map(|v| {
[sources, materialized_views].into_iter().map(|v| {
let static_ref = Box::leak(Box::new(v));
Builtin::View(static_ref)
})
}

fn make_builtin_sources(
source_iter: impl Iterator<Item = &'static BuiltinSource>,
log_iter: impl Iterator<Item = &'static BuiltinLog>,
) -> BuiltinView {
let owner_priv = rbac::owner_privilege(ObjectType::Source, MZ_SYSTEM_ROLE_ID);
let source_values = source_iter.map(|src| {
let privileges = make_privileges_sql(&src.access, &owner_priv);
format!(
"({}::oid, '{}', '{}', 'source', {})",
src.oid, src.schema, src.name, privileges
)
});
let log_values = log_iter.map(|log| {
let privileges = make_privileges_sql(&log.access, &owner_priv);
format!(
"({}::oid, '{}', '{}', 'log', {})",
log.oid, log.schema, log.name, privileges
)
});
let values = source_values.chain(log_values).join(",");
let sql = format!(
"
SELECT oid, schema_name, name, type, privileges
FROM (VALUES {values}) AS v(oid, schema_name, name, type, privileges)"
);

BuiltinView {
name: "mz_builtin_sources",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_BUILTIN_SOURCES_OID,
desc: RelationDesc::builder()
.with_column("oid", SqlScalarType::Oid.nullable(false))
.with_column("schema_name", SqlScalarType::String.nullable(false))
.with_column("name", SqlScalarType::String.nullable(false))
.with_column("type", SqlScalarType::String.nullable(false))
.with_column(
"privileges",
SqlScalarType::Array(Box::new(SqlScalarType::MzAclItem)).nullable(false),
)
.with_key(vec![0])
.with_key(vec![2])
.finish(),
column_comments: Default::default(),
sql: Box::leak(sql.into_boxed_str()),
access: vec![PUBLIC_SELECT],
ontology: None,
}
}

fn make_builtin_materialized_views<'a>(
iter: impl Iterator<Item = &'a BuiltinMaterializedView>,
) -> BuiltinView {
Expand Down
Loading
Loading