Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/clusterd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mz_service::secrets::SecretsReaderCliArgs;
use mz_service::transport;
use mz_storage::storage_state::StorageInstanceContext;
use mz_storage_types::connections::ConnectionContext;
use mz_timely_util::capture::arc_event_link;
use mz_txn_wal::operator::TxnsContext;
use tokio::runtime::Handle;
use tower::Service;
Expand Down Expand Up @@ -377,6 +378,11 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
let mut compute_timely_config = args.compute_timely_config;
compute_timely_config.process = args.process;

// Create per-worker bridges for forwarding storage timely logging events to compute.
let num_workers = storage_timely_config.workers;
let (storage_log_writers, storage_log_readers): (Vec<_>, Vec<_>) =
(0..num_workers).map(|_| arc_event_link()).unzip();
Comment on lines +381 to +384
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be feature-flagged. It means a new parameter to clusterd that is set by envd depending on a feature flag.


// Start storage server.
let storage_client_builder = mz_storage::serve(
storage_timely_config,
Expand All @@ -387,6 +393,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
SYSTEM_TIME.clone(),
connection_context.clone(),
StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
storage_log_writers,
)
.await?;
info!(
Expand Down Expand Up @@ -418,6 +425,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
worker_core_affinity: args.worker_core_affinity,
connection_context,
},
storage_log_readers,
)
.await?;
info!(
Expand Down
12 changes: 10 additions & 2 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ pub(crate) struct ActiveComputeState<'a> {
pub compute_state: &'a mut ComputeState,
/// The channel over which frontier information is reported.
pub response_tx: &'a mut ResponseSender,
/// Reader for storage timely logging events (consumed during CreateInstance).
pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}

/// A token that keeps a sink alive.
Expand Down Expand Up @@ -417,7 +419,8 @@ impl<'a> ActiveComputeState<'a> {
self.compute_state.apply_expiration_offset(offset);
}

self.initialize_logging(config.logging);
let storage_log_reader = self.storage_log_reader.take();
self.initialize_logging(config.logging, storage_log_reader);

self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
}
Expand Down Expand Up @@ -670,7 +673,11 @@ impl<'a> ActiveComputeState<'a> {
}

/// Initializes timely dataflow logging and publishes as a view.
pub fn initialize_logging(&mut self, config: LoggingConfig) {
pub fn initialize_logging(
&mut self,
config: LoggingConfig,
storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
) {
if self.compute_state.compute_logger.is_some() {
panic!("dataflow server has already initialized logging");
}
Expand All @@ -685,6 +692,7 @@ impl<'a> ActiveComputeState<'a> {
self.compute_state.metrics_registry.clone(),
Rc::clone(&self.compute_state.worker_config),
self.compute_state.workers_per_process,
storage_log_reader,
);

let dataflow_index = Rc::new(dataflow_index);
Expand Down
5 changes: 5 additions & 0 deletions src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub fn initialize(
metrics_registry: MetricsRegistry,
worker_config: Rc<ConfigSet>,
workers_per_process: usize,
storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
) -> LoggingTraces {
let interval_ms = std::cmp::max(1, config.interval.as_millis());

Expand All @@ -70,6 +71,7 @@ pub fn initialize(
metrics_registry,
worker_config,
workers_per_process,
storage_log_reader,
};

// Depending on whether we should log the creation of the logging dataflows, we register the
Expand Down Expand Up @@ -108,6 +110,8 @@ struct LoggingContext<'a> {
metrics_registry: MetricsRegistry,
worker_config: Rc<ConfigSet>,
workers_per_process: usize,
/// Optional reader for storage timely logging events.
storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}

pub(crate) struct LoggingTraces {
Expand All @@ -133,6 +137,7 @@ impl LoggingContext<'_> {
self.config,
self.t_event_queue.clone(),
Rc::clone(&self.shared_state),
self.storage_log_reader.take(),
);
collections.extend(timely_collections);

Expand Down
107 changes: 103 additions & 4 deletions src/compute/src/logging/timely.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use mz_timely_util::replay::MzReplay;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Filter;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
Expand Down Expand Up @@ -61,6 +62,7 @@ pub(super) fn construct(
config: &LoggingConfig,
event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
) -> Return {
scope.scoped("timely logging", move |scope| {
let enable_logging = config.enable_logging;
Expand All @@ -76,6 +78,44 @@ pub(super) fn construct(
(empty(scope), token)
};

// If we have a storage log reader, replay it, remap its IDs to avoid
// collisions with compute IDs, and concatenate with compute logs.
let (logs, storage_token) = if let Some(reader) = storage_log_reader {
use mz_timely_util::activator::RcActivator;
use timely::dataflow::operators::Concatenate;

let activator = RcActivator::new("storage_timely_activator".to_string(), 128);
let (storage_logs, s_token) =
[reader].mz_replay(scope, "storage timely logs", config.interval, activator);

let storage_logs = storage_logs
// Ignore storage park/unpark events.
.filter(|(_, x)| !matches!(x, TimelyEvent::Park { .. }))
// Remap storage IDs so they don't collide with compute IDs.
.unary(Pipeline, "Remap Storage IDs", |_cap, _info| {
move |input, output| {
input.for_each(|time, data| {
output.session(&time).give_iterator(data.drain(..).map(
|(t, mut event)| {
remap_timely_event_ids(&mut event);
(t, event)
},
));
});
}
});

let merged = scope.concatenate([logs, storage_logs]);
(merged, Some(s_token))
} else {
(logs, None)
};
// Convert storage token to Rc<dyn Any> so we can stash it alongside the compute token.
let storage_token: Rc<dyn std::any::Any> = match storage_token {
Some(t) => t,
None => Rc::new(()),
};

// Build a demux operator that splits the replayed event stream up into the separate
// logging streams.
let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
Expand Down Expand Up @@ -133,10 +173,14 @@ pub(super) fn construct(
};

for (time, event) in data.drain(..) {
// Note: we skip the worker_id assertion for storage events,
// whose channel IDs have been offset by STORAGE_ID_OFFSET.
if let TimelyEvent::Messages(msg) = &event {
match msg.is_send {
true => assert_eq!(msg.source, worker_id),
false => assert_eq!(msg.target, worker_id),
if msg.channel < STORAGE_ID_OFFSET {
match msg.is_send {
true => assert_eq!(msg.source, worker_id),
false => assert_eq!(msg.target, worker_id),
}
}
}

Expand Down Expand Up @@ -367,9 +411,11 @@ pub(super) fn construct(
&format!("Arrange {variant:?}"),
)
.trace;
let combined_token: Rc<dyn std::any::Any> =
Rc::new((Rc::clone(&token), Rc::clone(&storage_token)));
let collection = LogCollection {
trace,
token: Rc::clone(&token),
token: combined_token,
};
collections.insert(variant, collection);
}
Expand Down Expand Up @@ -785,3 +831,56 @@ where
vec.resize(index + 1, Default::default());
}
}

/// Offset added to storage operator/channel IDs to avoid collisions with compute IDs.
///
/// This is large enough that compute IDs (which start from 0 and grow) will never reach it,
/// but small enough to be representable as a `u64` with room for many storage operators.
const STORAGE_ID_OFFSET: usize = 1 << 48;

/// Remaps operator, channel, and address IDs in a `TimelyEvent` originating from a storage
/// timely instance so they don't collide with compute's IDs.
fn remap_timely_event_ids(event: &mut TimelyEvent) {
Comment on lines +835 to +843
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for console folks: How does the UI decide which things show up in the clickable introspection graph thing? If a bunch of storage operators show up in the introspection tables with these gigantic IDs, will that clutter the existing introspection UI for compute?
@leedqin

match event {
TimelyEvent::Operates(OperatesEvent { id, addr, .. }) => {
*id = id.wrapping_add(STORAGE_ID_OFFSET);
if let Some(first) = addr.first_mut() {
*first = first.wrapping_add(STORAGE_ID_OFFSET);
}
}
TimelyEvent::Channels(ChannelsEvent {
id,
scope_addr,
source,
target,
..
}) => {
*id = id.wrapping_add(STORAGE_ID_OFFSET);
if let Some(first) = scope_addr.first_mut() {
*first = first.wrapping_add(STORAGE_ID_OFFSET);
}
source.0 = source.0.wrapping_add(STORAGE_ID_OFFSET);
target.0 = target.0.wrapping_add(STORAGE_ID_OFFSET);
}
TimelyEvent::Shutdown(ShutdownEvent { id }) => {
*id = id.wrapping_add(STORAGE_ID_OFFSET);
}
TimelyEvent::Schedule(ScheduleEvent { id, .. }) => {
*id = id.wrapping_add(STORAGE_ID_OFFSET);
}
TimelyEvent::Messages(MessagesEvent { channel, .. }) => {
*channel = channel.wrapping_add(STORAGE_ID_OFFSET);
// Note: source/target in Messages are *worker* IDs, not operator IDs.
// We leave them as-is.
}
TimelyEvent::PushProgress(e) => {
e.op_id = e.op_id.wrapping_add(STORAGE_ID_OFFSET);
}
TimelyEvent::CommChannels(e) => {
e.identifier = e.identifier.wrapping_add(STORAGE_ID_OFFSET);
}
TimelyEvent::Park(_) | TimelyEvent::Text(_) => {
// No IDs to remap.
}
}
}
55 changes: 40 additions & 15 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::TracingHandle;
use mz_persist_client::cache::PersistClientCache;
use mz_storage_types::connections::ConnectionContext;
use mz_timely_util::capture::ArcEventLink;
use mz_txn_wal::operator::TxnsContext;
use timely::logging::TimelyEvent;
use timely::progress::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::mpsc;
Expand All @@ -54,8 +56,12 @@ pub struct ComputeInstanceContext {
pub connection_context: ConnectionContext,
}

/// Type alias for the storage timely log reader.
pub(crate) type StorageTimelyLogReader =
Arc<ArcEventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>;

/// Configures the server with compute-specific metrics.
#[derive(Debug, Clone)]
#[derive(Clone)]
struct Config {
/// `persist` client cache.
pub persist_clients: Arc<PersistClientCache>,
Expand All @@ -71,6 +77,10 @@ struct Config {
pub metrics_registry: MetricsRegistry,
/// The number of timely workers per process.
pub workers_per_process: usize,
/// Per-worker readers for storage timely logging events.
// TODO: Consider using the timely config's `process` and `workers` fields to
// deterministically assign readers to workers by local index, rather than pop().
pub storage_log_readers: Arc<Mutex<Vec<StorageTimelyLogReader>>>,
Comment on lines +80 to +83
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The invariant in compute dataflows is that worker id $$x$$ processes introspection data of worker $$x$$. A more general invariant might hold, too, but we need to audit the implementation. The following is probably true: each worker ID must send its updates to exactly one processing operator instance, for the lifetime of the worker.

}

/// Initiates a timely dataflow computation, processing compute commands.
Expand All @@ -81,6 +91,7 @@ pub async fn serve(
txns_ctx: TxnsContext,
tracing_handle: Arc<TracingHandle>,
context: ComputeInstanceContext,
storage_log_readers: Vec<StorageTimelyLogReader>,
) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
let config = Config {
persist_clients,
Expand All @@ -90,6 +101,7 @@ pub async fn serve(
context,
metrics_registry: metrics_registry.clone(),
workers_per_process: timely_config.workers,
storage_log_readers: Arc::new(Mutex::new(storage_log_readers)),
};
let tokio_executor = tokio::runtime::Handle::current();

Expand Down Expand Up @@ -223,6 +235,8 @@ struct Worker<'w> {
metrics_registry: MetricsRegistry,
/// The number of timely workers per process.
workers_per_process: usize,
/// Reader for storage timely logging events.
storage_log_reader: Option<StorageTimelyLogReader>,
}

impl ClusterSpec for Config {
Expand All @@ -247,6 +261,9 @@ impl ClusterSpec for Config {
let worker_id = timely_worker.index();
let metrics = self.metrics.for_worker(worker_id);

// Take this worker's storage log reader.
let storage_log_reader = self.storage_log_readers.lock().unwrap().pop();

// Create the command channel that broadcasts commands from worker 0 to other workers. We
// reuse this channel between client connections, to avoid bugs where different workers end
// up creating incompatible sides of the channel dataflow after reconnects.
Expand All @@ -268,6 +285,7 @@ impl ClusterSpec for Config {
tracing_handle: Arc::clone(&self.tracing_handle),
metrics_registry: self.metrics_registry.clone(),
workers_per_process: self.workers_per_process,
storage_log_reader,
}
.run()
}
Expand Down Expand Up @@ -396,21 +414,27 @@ impl<'w> Worker<'w> {
}

fn handle_command(&mut self, cmd: ComputeCommand) {
match &cmd {
ComputeCommand::CreateInstance(_) => {
self.compute_state = Some(ComputeState::new(
Arc::clone(&self.persist_clients),
self.txns_ctx.clone(),
self.metrics.clone(),
Arc::clone(&self.tracing_handle),
self.context.clone(),
self.metrics_registry.clone(),
self.workers_per_process,
));
}
_ => (),
let is_create_instance = matches!(&cmd, ComputeCommand::CreateInstance(_));
if is_create_instance {
self.compute_state = Some(ComputeState::new(
Arc::clone(&self.persist_clients),
self.txns_ctx.clone(),
self.metrics.clone(),
Arc::clone(&self.tracing_handle),
self.context.clone(),
self.metrics_registry.clone(),
self.workers_per_process,
));
}
self.activate_compute().unwrap().handle_compute_command(cmd);
// Take the storage log reader before borrowing self for activate_compute.
let storage_log_reader = if is_create_instance {
self.storage_log_reader.take()
} else {
None
};
let mut active = self.activate_compute().unwrap();
active.storage_log_reader = storage_log_reader;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fragile, as it only works if the current command is CreateInstance, but not in any other case. This is transient state, but it seems better to make it part of ComputeState, to keep unrelated parts of the code from depending on implementation details.

active.handle_compute_command(cmd);
}

fn activate_compute(&mut self) -> Option<ActiveComputeState<'_>> {
Expand All @@ -419,6 +443,7 @@ impl<'w> Worker<'w> {
timely_worker: &mut *self.timely_worker,
compute_state,
response_tx: &mut self.response_tx,
storage_log_reader: None,
})
} else {
None
Expand Down
Loading
Loading