Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions bin/ci-builder
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ cid_file=ci/builder/.${flavor%%-*}.cidfile

rust_components=rustc,cargo,rust-std-$arch_gcc-unknown-linux-gnu,llvm-tools-preview
if [[ $rust_version = nightly ]]; then
rust_components+=,miri-preview
rust_components+=,miri-preview,rustfmt-preview
else
rust_components+=,clippy-preview,rustfmt-preview
fi
Expand Down Expand Up @@ -145,7 +145,7 @@ fi
# *inside* this image. See materialize.git.expand_globs in the Python code for
# details on this computation.
files=$(cat \
<(git diff --name-only -z 4b825dc642cb6eb9a060e54bf8d69288fbee4904 ci/builder) \
<(git diff --name-only -z 4b825dc642cb6eb9a060e54bf8d69288fbee4904 ci/builder bin/ci-builder) \
<(git ls-files --others --exclude-standard -z ci/builder) \
| LC_ALL=C sort -z \
| xargs -0 "$shasum")
Expand Down
12 changes: 12 additions & 0 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ steps:
coverage: skip
sanitizer: skip

- id: lint-line-length
label: Line length lint
command: bin/ci-builder run nightly cargo fmt -- --check --config error_on_line_overflow=true
inputs:
- "**/*.rs"
depends_on: []
timeout_in_minutes: 20
agents:
queue: hetzner-aarch64-4cpu-8gb
coverage: skip
sanitizer: skip

- id: lint-clippy
label: Clippy
command: bin/ci-builder run stable ci/test/lint-clippy.sh
Expand Down
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
edition = "2024"
tab_spaces = 4
merge_derives = false
12 changes: 9 additions & 3 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3474,7 +3474,9 @@ impl Coordinator {
// `next()` on any stream is cancel-safe:
// https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
// Receive a single command.
Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
Some(event) = cluster_events.next() => {
messages.push(Message::ClusterEvent(event))
},
// See [`mz_controller::Controller::Controller::ready`] for notes
// on why this is cancel-safe.
// Receive a single command.
Expand Down Expand Up @@ -3522,7 +3524,9 @@ impl Coordinator {
if count == 0 {
break;
} else {
messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
messages.extend(cmd_messages.drain(..).map(
|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
));
}
},
// `recv()` on `UnboundedReceiver` is cancellation safe:
Expand All @@ -3534,7 +3538,9 @@ impl Coordinator {
pending_read_txns.push(pending_read_txn);
}
for (conn_id, pending_read_txn) in pending_read_txns {
let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
let prev = self
.pending_linearize_read_txns
.insert(conn_id, pending_read_txn);
soft_assert_or_log!(
prev.is_none(),
"connections can not have multiple concurrent reads, prev: {prev:?}"
Expand Down
16 changes: 7 additions & 9 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,13 @@ impl Coordinator {
}
Message::PrivateLinkVpcEndpointEvents(events) => {
if !self.controller.read_only() {
self.controller
.storage
.append_introspection_updates(
mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
events
.into_iter()
.map(|e| (mz_repr::Row::from(e), Diff::ONE))
.collect(),
);
self.controller.storage.append_introspection_updates(
IntrospectionType::PrivatelinkConnectionStatusHistory,
events
.into_iter()
.map(|e| (mz_repr::Row::from(e), Diff::ONE))
.collect(),
);
}
}
Message::CheckSchedulingPolicies => {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ impl CachedStatisticsOracle {
pub async fn new<T: TimelyTimestamp>(
ids: &BTreeSet<GlobalId>,
as_of: &Antichain<T>,
storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
storage_collections: &dyn StorageCollections<Timestamp = T>,
) -> Result<Self, StorageError<T>> {
let mut cache = BTreeMap::new();

Expand Down
56 changes: 26 additions & 30 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{
CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
Expand Down Expand Up @@ -115,6 +116,9 @@ use crate::session::{
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

/// A future that resolves to a real-time recency timestamp.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;

mod cluster;
mod copy_from;
mod create_continual_task;
Expand Down Expand Up @@ -651,8 +655,7 @@ impl Coordinator {
match transact_result {
Ok(()) => Ok(ExecuteResponse::CreatedSource),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
})) if if_not_exists_ids.contains_key(&id) => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand Down Expand Up @@ -797,8 +800,7 @@ impl Coordinator {
match transact_result {
Ok(_) => Ok(ExecuteResponse::CreatedConnection),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if plan.if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand All @@ -824,8 +826,7 @@ impl Coordinator {
match self.catalog_transact(Some(session), ops).await {
Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
})) if plan.if_not_exists => {
session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
Ok(ExecuteResponse::CreatedDatabase)
Expand All @@ -848,8 +849,7 @@ impl Coordinator {
match self.catalog_transact(Some(session), vec![op]).await {
Ok(_) => Ok(ExecuteResponse::CreatedSchema),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
})) if plan.if_not_exists => {
session.add_notice(AdapterNotice::SchemaAlreadyExists {
name: plan.schema_name,
Expand Down Expand Up @@ -1037,8 +1037,7 @@ impl Coordinator {
match catalog_result {
Ok(()) => Ok(ExecuteResponse::CreatedTable),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
ctx.session_mut()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand Down Expand Up @@ -1096,8 +1095,7 @@ impl Coordinator {
match result {
Ok(()) => {}
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand Down Expand Up @@ -1673,9 +1671,13 @@ impl Coordinator {
for (default_privilege_object, default_privilege_acls) in
self.catalog().default_privileges()
{
if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
|| matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
{
if matches!(
&default_privilege_object.database_id,
Some(database_id) if dropped_databases.contains(database_id),
) || matches!(
&default_privilege_object.schema_id,
Some(schema_id) if dropped_schemas.contains(schema_id),
) {
for default_privilege_acl in default_privilege_acls {
default_privilege_revokes.insert((
default_privilege_object.clone(),
Expand Down Expand Up @@ -2129,9 +2131,7 @@ impl Coordinator {
// Re-verify this id exists.
let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
AdapterError::Catalog(mz_catalog::memory::error::Error {
kind: mz_catalog::memory::error::ErrorKind::Sql(
CatalogError::UnknownItem(id.to_string()),
),
kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
})
})?;
}
Expand Down Expand Up @@ -2272,8 +2272,7 @@ impl Coordinator {
&self,
source_ids: impl Iterator<Item = GlobalId>,
real_time_recency_timeout: Duration,
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
{
) -> Result<Option<RtrTimestampFuture>, AdapterError> {
let item_ids = source_ids
.map(|gid| {
self.catalog
Expand Down Expand Up @@ -2325,8 +2324,7 @@ impl Coordinator {
&self,
session: &Session,
source_ids: impl Iterator<Item = GlobalId>,
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
{
) -> Result<Option<RtrTimestampFuture>, AdapterError> {
let vars = session.vars();

if vars.real_time_recency()
Expand Down Expand Up @@ -2537,9 +2535,9 @@ impl Coordinator {
None => {
ctx.retire(Err(AdapterError::Catalog(
mz_catalog::memory::error::Error {
kind: mz_catalog::memory::error::ErrorKind::Sql(
CatalogError::UnknownItem(plan.id.to_string()),
),
kind: ErrorKind::Sql(CatalogError::UnknownItem(
plan.id.to_string(),
)),
},
)));
return;
Expand Down Expand Up @@ -2647,9 +2645,7 @@ impl Coordinator {
None => {
ctx.retire(Err(AdapterError::Catalog(
mz_catalog::memory::error::Error {
kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
id.to_string(),
)),
kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
},
)));
return;
Expand Down Expand Up @@ -2843,8 +2839,8 @@ impl Coordinator {
return_if_err!(style.prep_scalar_expr(expr), ctx);
}

let make_diffs =
move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
let make_diffs = move |mut rows: Box<dyn RowIterator>|
-> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
let arena = RowArena::new();
let mut diffs = Vec::new();
let mut datum_vec = mz_repr::DatumVec::new();
Expand Down
11 changes: 7 additions & 4 deletions src/adapter/src/coord/sequencer/inner/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::collections::BTreeMap;

use maplit::btreemap;
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{CatalogItem, Index};
use mz_ore::instrument;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
Expand Down Expand Up @@ -341,8 +342,11 @@ impl Coordinator {
), AdapterError> {
let _dispatch_guard = explain_ctx.dispatch_guard();

let index_plan =
optimize::index::Index::new(plan.name.clone(), plan.index.on, plan.index.keys.clone());
let index_plan = optimize::index::Index::new(
plan.name.clone(),
plan.index.on,
plan.index.keys.clone(),
);

// MIR ⇒ MIR optimization (global)
let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
Expand Down Expand Up @@ -512,8 +516,7 @@ impl Coordinator {
match transact_result {
Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,11 @@ impl Coordinator {

// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
let global_mir_plan =
optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
// MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
let global_lir_plan =
optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

Ok((local_mir_plan, global_mir_plan, global_lir_plan))
};
Expand Down Expand Up @@ -756,7 +758,9 @@ impl Coordinator {
Ok(_) => Ok(ExecuteResponse::CreatedMaterializedView),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
mz_catalog::memory::error::ErrorKind::Sql(
CatalogError::ItemAlreadyExists(_, _),
),
})) if if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/sequencer/inner/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::collections::BTreeMap;

use maplit::btreemap;
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{CatalogItem, View};
use mz_expr::CollectionPlan;
use mz_ore::instrument;
Expand Down Expand Up @@ -424,8 +425,7 @@ impl Coordinator {
match self.catalog_transact(Some(session), ops).await {
Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
session.add_notice(AdapterNotice::ObjectAlreadyExists {
name: name.item,
Expand Down
Loading