-
Notifications
You must be signed in to change notification settings - Fork 499
Storage introspection mirrored in compute #35863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
46e4c45
87725a0
2350573
620cdc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ use mz_timely_util::replay::MzReplay; | |
| use timely::dataflow::Scope; | ||
| use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; | ||
| use timely::dataflow::operators::Operator; | ||
| use timely::dataflow::operators::core::Filter; | ||
| use timely::dataflow::operators::generic::OutputBuilder; | ||
| use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; | ||
| use timely::dataflow::operators::generic::operator::empty; | ||
|
|
@@ -61,6 +62,7 @@ pub(super) fn construct( | |
| config: &LoggingConfig, | ||
| event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>, | ||
| shared_state: Rc<RefCell<SharedLoggingState>>, | ||
| storage_log_reader: Option<crate::server::StorageTimelyLogReader>, | ||
| ) -> Return { | ||
| scope.scoped("timely logging", move |scope| { | ||
| let enable_logging = config.enable_logging; | ||
|
|
@@ -76,6 +78,44 @@ pub(super) fn construct( | |
| (empty(scope), token) | ||
| }; | ||
|
|
||
| // If we have a storage log reader, replay it, remap its IDs to avoid | ||
| // collisions with compute IDs, and concatenate with compute logs. | ||
| let (logs, storage_token) = if let Some(reader) = storage_log_reader { | ||
| use mz_timely_util::activator::RcActivator; | ||
| use timely::dataflow::operators::Concatenate; | ||
|
|
||
| let activator = RcActivator::new("storage_timely_activator".to_string(), 128); | ||
| let (storage_logs, s_token) = | ||
| [reader].mz_replay(scope, "storage timely logs", config.interval, activator); | ||
|
|
||
| let storage_logs = storage_logs | ||
| // Ignore storage park/unpark events. | ||
| .filter(|(_, x)| !matches!(x, TimelyEvent::Park { .. })) | ||
| // Remap storage IDs so they don't collide with compute IDs. | ||
| .unary(Pipeline, "Remap Storage IDs", |_cap, _info| { | ||
| move |input, output| { | ||
| input.for_each(|time, data| { | ||
| output.session(&time).give_iterator(data.drain(..).map( | ||
| |(t, mut event)| { | ||
| remap_timely_event_ids(&mut event); | ||
| (t, event) | ||
| }, | ||
| )); | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| let merged = scope.concatenate([logs, storage_logs]); | ||
| (merged, Some(s_token)) | ||
| } else { | ||
| (logs, None) | ||
| }; | ||
| // Convert storage token to Rc<dyn Any> so we can stash it alongside the compute token. | ||
| let storage_token: Rc<dyn std::any::Any> = match storage_token { | ||
| Some(t) => t, | ||
| None => Rc::new(()), | ||
| }; | ||
|
|
||
| // Build a demux operator that splits the replayed event stream up into the separate | ||
| // logging streams. | ||
| let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone()); | ||
|
|
@@ -133,10 +173,14 @@ pub(super) fn construct( | |
| }; | ||
|
|
||
| for (time, event) in data.drain(..) { | ||
| // Note: we skip the worker_id assertion for storage events, | ||
| // whose channel IDs have been offset by STORAGE_ID_OFFSET. | ||
| if let TimelyEvent::Messages(msg) = &event { | ||
| match msg.is_send { | ||
| true => assert_eq!(msg.source, worker_id), | ||
| false => assert_eq!(msg.target, worker_id), | ||
| if msg.channel < STORAGE_ID_OFFSET { | ||
| match msg.is_send { | ||
| true => assert_eq!(msg.source, worker_id), | ||
| false => assert_eq!(msg.target, worker_id), | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -367,9 +411,11 @@ pub(super) fn construct( | |
| &format!("Arrange {variant:?}"), | ||
| ) | ||
| .trace; | ||
| let combined_token: Rc<dyn std::any::Any> = | ||
| Rc::new((Rc::clone(&token), Rc::clone(&storage_token))); | ||
| let collection = LogCollection { | ||
| trace, | ||
| token: Rc::clone(&token), | ||
| token: combined_token, | ||
| }; | ||
| collections.insert(variant, collection); | ||
| } | ||
|
|
@@ -785,3 +831,56 @@ where | |
| vec.resize(index + 1, Default::default()); | ||
| } | ||
| } | ||
|
|
||
| /// Offset added to storage operator/channel IDs to avoid collisions with compute IDs. | ||
| /// | ||
| /// This is large enough that compute IDs (which start from 0 and grow) will never reach it, | ||
| /// but small enough to be representable as a `u64` with room for many storage operators. | ||
| const STORAGE_ID_OFFSET: usize = 1 << 48; | ||
|
|
||
| /// Remaps operator, channel, and address IDs in a `TimelyEvent` originating from a storage | ||
| /// timely instance so they don't collide with compute's IDs. | ||
| fn remap_timely_event_ids(event: &mut TimelyEvent) { | ||
|
Comment on lines
+835
to
+843
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Question for console folks: How does the UI decide which things show up in the clickable introspection graph thing? If a bunch of storage operators show up in the introspection tables with these gigantic IDs, will that clutter the existing introspection UI for compute? |
||
| match event { | ||
| TimelyEvent::Operates(OperatesEvent { id, addr, .. }) => { | ||
| *id = id.wrapping_add(STORAGE_ID_OFFSET); | ||
| if let Some(first) = addr.first_mut() { | ||
| *first = first.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| } | ||
| TimelyEvent::Channels(ChannelsEvent { | ||
| id, | ||
| scope_addr, | ||
| source, | ||
| target, | ||
| .. | ||
| }) => { | ||
| *id = id.wrapping_add(STORAGE_ID_OFFSET); | ||
| if let Some(first) = scope_addr.first_mut() { | ||
| *first = first.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| source.0 = source.0.wrapping_add(STORAGE_ID_OFFSET); | ||
| target.0 = target.0.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| TimelyEvent::Shutdown(ShutdownEvent { id }) => { | ||
| *id = id.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| TimelyEvent::Schedule(ScheduleEvent { id, .. }) => { | ||
| *id = id.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| TimelyEvent::Messages(MessagesEvent { channel, .. }) => { | ||
| *channel = channel.wrapping_add(STORAGE_ID_OFFSET); | ||
| // Note: source/target in Messages are *worker* IDs, not operator IDs. | ||
| // We leave them as-is. | ||
| } | ||
| TimelyEvent::PushProgress(e) => { | ||
| e.op_id = e.op_id.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| TimelyEvent::CommChannels(e) => { | ||
| e.identifier = e.identifier.wrapping_add(STORAGE_ID_OFFSET); | ||
| } | ||
| TimelyEvent::Park(_) | TimelyEvent::Text(_) => { | ||
| // No IDs to remap. | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,9 @@ use mz_ore::metrics::MetricsRegistry; | |
| use mz_ore::tracing::TracingHandle; | ||
| use mz_persist_client::cache::PersistClientCache; | ||
| use mz_storage_types::connections::ConnectionContext; | ||
| use mz_timely_util::capture::ArcEventLink; | ||
| use mz_txn_wal::operator::TxnsContext; | ||
| use timely::logging::TimelyEvent; | ||
| use timely::progress::Antichain; | ||
| use timely::worker::Worker as TimelyWorker; | ||
| use tokio::sync::mpsc; | ||
|
|
@@ -54,8 +56,12 @@ pub struct ComputeInstanceContext { | |
| pub connection_context: ConnectionContext, | ||
| } | ||
|
|
||
| /// Type alias for the storage timely log reader. | ||
| pub(crate) type StorageTimelyLogReader = | ||
| Arc<ArcEventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>; | ||
|
|
||
| /// Configures the server with compute-specific metrics. | ||
| #[derive(Debug, Clone)] | ||
| #[derive(Clone)] | ||
| struct Config { | ||
| /// `persist` client cache. | ||
| pub persist_clients: Arc<PersistClientCache>, | ||
|
|
@@ -71,6 +77,10 @@ struct Config { | |
| pub metrics_registry: MetricsRegistry, | ||
| /// The number of timely workers per process. | ||
| pub workers_per_process: usize, | ||
| /// Per-worker readers for storage timely logging events. | ||
| // TODO: Consider using the timely config's `process` and `workers` fields to | ||
| // deterministically assign readers to workers by local index, rather than pop(). | ||
| pub storage_log_readers: Arc<Mutex<Vec<StorageTimelyLogReader>>>, | ||
|
Comment on lines
+80
to
+83
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The invariant in compute dataflows is that worker id |
||
| } | ||
|
|
||
| /// Initiates a timely dataflow computation, processing compute commands. | ||
|
|
@@ -81,6 +91,7 @@ pub async fn serve( | |
| txns_ctx: TxnsContext, | ||
| tracing_handle: Arc<TracingHandle>, | ||
| context: ComputeInstanceContext, | ||
| storage_log_readers: Vec<StorageTimelyLogReader>, | ||
| ) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> { | ||
| let config = Config { | ||
| persist_clients, | ||
|
|
@@ -90,6 +101,7 @@ pub async fn serve( | |
| context, | ||
| metrics_registry: metrics_registry.clone(), | ||
| workers_per_process: timely_config.workers, | ||
| storage_log_readers: Arc::new(Mutex::new(storage_log_readers)), | ||
| }; | ||
| let tokio_executor = tokio::runtime::Handle::current(); | ||
|
|
||
|
|
@@ -223,6 +235,8 @@ struct Worker<'w> { | |
| metrics_registry: MetricsRegistry, | ||
| /// The number of timely workers per process. | ||
| workers_per_process: usize, | ||
| /// Reader for storage timely logging events. | ||
| storage_log_reader: Option<StorageTimelyLogReader>, | ||
| } | ||
|
|
||
| impl ClusterSpec for Config { | ||
|
|
@@ -247,6 +261,9 @@ impl ClusterSpec for Config { | |
| let worker_id = timely_worker.index(); | ||
| let metrics = self.metrics.for_worker(worker_id); | ||
|
|
||
| // Take this worker's storage log reader. | ||
| let storage_log_reader = self.storage_log_readers.lock().unwrap().pop(); | ||
|
|
||
| // Create the command channel that broadcasts commands from worker 0 to other workers. We | ||
| // reuse this channel between client connections, to avoid bugs where different workers end | ||
| // up creating incompatible sides of the channel dataflow after reconnects. | ||
|
|
@@ -268,6 +285,7 @@ impl ClusterSpec for Config { | |
| tracing_handle: Arc::clone(&self.tracing_handle), | ||
| metrics_registry: self.metrics_registry.clone(), | ||
| workers_per_process: self.workers_per_process, | ||
| storage_log_reader, | ||
| } | ||
| .run() | ||
| } | ||
|
|
@@ -396,21 +414,27 @@ impl<'w> Worker<'w> { | |
| } | ||
|
|
||
| fn handle_command(&mut self, cmd: ComputeCommand) { | ||
| match &cmd { | ||
| ComputeCommand::CreateInstance(_) => { | ||
| self.compute_state = Some(ComputeState::new( | ||
| Arc::clone(&self.persist_clients), | ||
| self.txns_ctx.clone(), | ||
| self.metrics.clone(), | ||
| Arc::clone(&self.tracing_handle), | ||
| self.context.clone(), | ||
| self.metrics_registry.clone(), | ||
| self.workers_per_process, | ||
| )); | ||
| } | ||
| _ => (), | ||
| let is_create_instance = matches!(&cmd, ComputeCommand::CreateInstance(_)); | ||
| if is_create_instance { | ||
| self.compute_state = Some(ComputeState::new( | ||
| Arc::clone(&self.persist_clients), | ||
| self.txns_ctx.clone(), | ||
| self.metrics.clone(), | ||
| Arc::clone(&self.tracing_handle), | ||
| self.context.clone(), | ||
| self.metrics_registry.clone(), | ||
| self.workers_per_process, | ||
| )); | ||
| } | ||
| self.activate_compute().unwrap().handle_compute_command(cmd); | ||
| // Take the storage log reader before borrowing self for activate_compute. | ||
| let storage_log_reader = if is_create_instance { | ||
| self.storage_log_reader.take() | ||
| } else { | ||
| None | ||
| }; | ||
| let mut active = self.activate_compute().unwrap(); | ||
| active.storage_log_reader = storage_log_reader; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is fragile as it only works if the current command is |
||
| active.handle_compute_command(cmd); | ||
| } | ||
|
|
||
| fn activate_compute(&mut self) -> Option<ActiveComputeState<'_>> { | ||
|
|
@@ -419,6 +443,7 @@ impl<'w> Worker<'w> { | |
| timely_worker: &mut *self.timely_worker, | ||
| compute_state, | ||
| response_tx: &mut self.response_tx, | ||
| storage_log_reader: None, | ||
| }) | ||
| } else { | ||
| None | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This needs to be feature-flagged. It means a new parameter to clusterd that is set by envd depending on a feature flag.