Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def validate(self) -> Testdrive:
)


@disabled("due to https://github.com/MaterializeInc/database-issues/issues/10086")
class MaterializedViewReplacement(Check):
def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v26.8.0")
Expand Down
8 changes: 6 additions & 2 deletions src/compute-client/src/as_of_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,9 +771,13 @@ impl<'a, T: TimestampManipulation> Context<'a, T> {
/// impose as-of constraints on other compute collections.
fn prune_sealed_persist_sinks(&mut self) {
self.collections.retain(|id, _| {
self.storage_collections
let retain = self.storage_collections
.collection_frontiers(*id)
.map_or(true, |f| !f.write_frontier.is_empty())
.map_or(true, |f| !f.write_frontier.is_empty());
if !retain {
tracing::info!("prune sealed persist sink: {id}");
}
retain
});
}

Expand Down
6 changes: 6 additions & 0 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ where
.collection_metadata(id)
.map_err(|_| CollectionMissing(id))?
.clone();
tracing::info!("MV dataflow export: {id} {metadata:?}");
let conn = MaterializedViewSinkConnection {
value_desc: conn.value_desc,
storage_metadata: metadata,
Expand Down Expand Up @@ -1619,6 +1620,7 @@ where
export_ids = %augmented_dataflow.display_export_ids(),
as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
until = ?augmented_dataflow.until.elements(),
sink_exports = ?augmented_dataflow.sink_exports,
"creating dataflow",
);
}
Expand Down Expand Up @@ -1897,6 +1899,10 @@ where
return;
}

if id.is_user() && new_frontier.is_empty() {
tracing::info!("advanced to empty write frontier: {id}");
}

// Relax the implied read hold according to the read policy.
let new_since = match &collection.read_policy {
Some(read_policy) => {
Expand Down
3 changes: 3 additions & 0 deletions src/compute/src/sink/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,9 @@ mod append {
let (lower, upper) = (desc.lower, desc.upper);
let mut to_append: Vec<_> = self.batches.iter_mut().collect();

if upper.is_empty() && self.sink_id.is_user() {
tracing::info!("appending batch with empty frontier: {}", self.sink_id);
}
loop {
let result = self
.persist_writer
Expand Down
5 changes: 4 additions & 1 deletion src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2733,6 +2733,9 @@ where
if id.is_user() {
trace!("new upper for collection {id}: {:?}", upper);
}
if id.is_user() && upper.is_empty() {
info!("new empty upper for collection {id}");
}
let current_shard = self.shard_by_id.get(&id);
if let Some(shard_id) = current_shard {
if shard_id == &handle.shard_id() {
Expand Down Expand Up @@ -3071,7 +3074,7 @@ async fn finalize_shards_task<T>(
debug!(%shard_id, "shard is already finalized!");
Some(shard_id)
} else {
debug!(%shard_id, "finalizing shard");
info!(%shard_id, "finalizing shard");
let finalize = || async move {
// TODO: thread the global ID into the shard finalization WAL
let diagnostics = Diagnostics::from_purpose("finalizing shards");
Expand Down