3 changes: 2 additions & 1 deletion src/adapters/ingresses/cloudflare.rs
@@ -11,7 +11,7 @@ use crate::{
use super::Ingress;
use async_trait::async_trait;
use derive_getters::Getters;
use miette::Result;
use miette::{Result, bail};
use tracing::{debug, info};

#[derive(Getters)]
@@ -72,6 +72,7 @@ impl Ingress for CloudflareWorkerIngress {

async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
info!("Setting Cloudflare canary traffic to {percent}.");
// bail!("BAIL");
let control_version = DeploymentVersion::builder()
.percentage((100 - percent.clone().as_i32()) as u64)
.version_id(self.control_version_id.clone().unwrap())
73 changes: 60 additions & 13 deletions src/subsystems/controller/mod.rs
@@ -79,30 +79,77 @@ impl IntoSubsystem<Report> for ControllerSubsystem {
.build();

// • Start the ingress subsystem.
subsys.start(SubsystemBuilder::new(
INGRESS_SUBSYSTEM_NAME,
ingress_subsystem.into_subsystem(),
));
let ingress_subsys = subsys.start(
SubsystemBuilder::new(INGRESS_SUBSYSTEM_NAME, ingress_subsystem.into_subsystem())
.detached(),
);

// • Start the platform subsystem.
subsys.start(SubsystemBuilder::new(
PLATFORM_SUBSYSTEM_NAME,
platform_subsystem.into_subsystem(),
));
let platform_subsys = subsys.start(
SubsystemBuilder::new(PLATFORM_SUBSYSTEM_NAME, platform_subsystem.into_subsystem())
.detached(),
);

// • Start the MonitorController subsystem.
// The MonitorController and Monitor don't need to be
// detached because they can be shut down in tandem.
// We need them to drop their channels to signal to
// the other subsystems why the shutdown has occurred.
subsys.start(SubsystemBuilder::new(
MONITOR_CONTROLLER_SUBSYSTEM_NAME,
monitor_controller.into_subsystem(),
));

// • Start the relay subsystem.
subsys.start(SubsystemBuilder::new(
RELAY_SUBSYSTEM_NAME,
relay_subsystem.into_subsystem(),
));

let relay_subsys = subsys.start(
SubsystemBuilder::new(RELAY_SUBSYSTEM_NAME, relay_subsystem.into_subsystem())
.detached(),
);

trace!("Controller waiting for shutdown request...");
subsys.on_shutdown_requested().await;
subsys.initiate_shutdown();
trace!("Controller shutdown requested!");
// Waiting for children will block until the Monitor and
// MonitorController are shut down.
trace!("Contoller waiting for children to shutdown");
subsys.wait_for_children().await;
trace!("Controller children shutdown");
// Next, we wait for the relay, because we need to abandon
// any state locks that we've taken before we can roll back
// the ingress or yank the platform.
trace!("Controller waiting for relay to shutdown");
relay_subsys.initiate_shutdown();
trace!("Controller waiting for relay to shutdown complete");
trace!("Controller waiting for relay to join");
relay_subsys.join().await?;
Review comment (Contributor Author):
Pseudo-code.

Suggested change:
-    relay_subsys.join().await?;
+    if let Err(_) = relay_subsys.join().await {
+        backend.shutdown_irregular(); // POST to backend to mark as errored.
+    }

Follow-up comment (Contributor Author):
Also need to change the on_error to CatchAndLocalShutdown.

trace!("Relay joined");

trace!("Relay shutdown!");

trace!("Controller waiting for ingress to shutdown");
ingress_subsys.initiate_shutdown();
trace!("Controller waiting for ingress to shutdown complete");
trace!("Controller waiting for ingress to join");
ingress_subsys.join().await?;
trace!("Ingress joined");

trace!("Ingress shutdown!");

trace!("Controller waiting for platform to shutdown");
platform_subsys.initiate_shutdown();
trace!("Controller waiting for platform to shutdown complete");
trace!("Controller waiting for platform to join");
platform_subsys.join().await?;
trace!("Platform joined");

trace!("Platform shutdown!");

// TODO: Tell the backend to mark the rollout as
// cancelled (if it isn't already marked as completed).
trace!("TODO: API CALL TO CANCEL ROLLOUT");

trace!("Controller shutdown complete!");
Ok(())
}
}
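Taken together with the review thread above, the controller change amounts to one pattern: start the relay, ingress, and platform as detached children, then stop them in a fixed order once shutdown is requested (relay first, so state locks are released before the ingress rolls traffic back). The sketch below is a minimal reading of that pattern, not the project's ControllerSubsystem: the subsystem bodies are placeholders, `backend_mark_errored()` stands in for the backend call the first comment proposes, and `on_failure(ErrorAction::CatchAndLocalShutdown)` is one interpretation of the second comment's "on_error" note.

```rust
// Sketch only: ordered shutdown of detached children with tokio-graceful-shutdown.
// Placeholder subsystems and a hypothetical backend call; not project code.
use miette::Report;
use tokio_graceful_shutdown::{ErrorAction, SubsystemBuilder, SubsystemHandle};

async fn relay(subsys: SubsystemHandle) -> Result<(), Report> {
    // Placeholder: a real relay streams observations until told to stop.
    subsys.on_shutdown_requested().await;
    Ok(())
}

async fn ingress(subsys: SubsystemHandle) -> Result<(), Report> {
    // Placeholder: a real ingress rolls canary traffic back before returning.
    subsys.on_shutdown_requested().await;
    Ok(())
}

async fn backend_mark_errored() {
    // Hypothetical stand-in for the "POST to backend to mark as errored" call.
}

async fn controller(subsys: SubsystemHandle) -> Result<(), Report> {
    // Detached children ignore the parent's shutdown signal, so the controller
    // alone decides the order in which they stop.
    let relay_subsys = subsys.start(
        SubsystemBuilder::new("relay", relay)
            .detached()
            // Catch failures locally so join() can observe them below instead
            // of having them propagate up the subsystem tree.
            .on_failure(ErrorAction::CatchAndLocalShutdown),
    );
    let ingress_subsys = subsys.start(SubsystemBuilder::new("ingress", ingress).detached());

    subsys.on_shutdown_requested().await;

    // Relay first: it must release its state locks before the ingress acts.
    relay_subsys.initiate_shutdown();
    if relay_subsys.join().await.is_err() {
        backend_mark_errored().await;
    }

    ingress_subsys.initiate_shutdown();
    let _ = ingress_subsys.join().await;
    Ok(())
}
```

With this shape, a failed relay no longer aborts the controller through `?`: the error is observed, reported, and the remaining children are still shut down in order.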
8 changes: 7 additions & 1 deletion src/subsystems/controller/monitor.rs
@@ -11,7 +11,7 @@ use tokio::{
};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle};
use tokio_stream::{Stream, StreamExt as _, wrappers::IntervalStream};
use tracing::debug;
use tracing::{debug, trace};

use crate::{
MonitorSubsystem,
@@ -157,13 +157,19 @@ impl IntoSubsystem<Report> for MonitorController<StatusCode> {
loop {
select! {
_ = subsys.on_shutdown_requested() => {
trace!("MonitorController received shutdown request.");
// If we've received the shutdown signal,
// we don't have anything to do except ensure
// our children have shutdown, guaranteeing
// the monitor is shut down.
// NB: We can't implement the shutdown trait because
// self has been partially moved.
subsys.request_local_shutdown();
trace!("MonitorController requested local shutdown complete.");

trace!("MonitorController waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("MonitorController children shutdown complete.");
return Ok(());
}
baseline_version_id = self.baseline_receiver.recv() => {
9 changes: 8 additions & 1 deletion src/subsystems/ingress/mod.rs
@@ -6,7 +6,7 @@ use miette::{Report, Result};
use tokio::sync::mpsc::channel;
use tokio::{select, sync::mpsc::Receiver};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle};
use tracing::debug;
use tracing::{debug, trace};

use crate::adapters::BoxedIngress;

@@ -88,6 +88,13 @@ impl IntoSubsystem<Report> for IngressSubsystem {
loop {
select! {
_ = subsys.on_shutdown_requested() => {
trace!("IngressSubsystem received shutdown request.");
subsys.request_local_shutdown();
trace!("IngressSubsystem requested local shutdown complete.");

trace!("IngressSubsystem waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("IngressSubsystem children shutdown complete.");
return self.shutdown().await;
}
// Shutdown signal from one of the handles. Since this thread has exclusive
14 changes: 12 additions & 2 deletions src/subsystems/monitor/mod.rs
@@ -11,7 +11,7 @@ use tokio::{
sync::mpsc::{Receiver, channel},
};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle};
use tracing::debug;
use tracing::{debug, trace};

use super::handle::Handle;
use super::{ShutdownResult, Shutdownable};
@@ -81,6 +81,13 @@ impl IntoSubsystem<Report> for MonitorSubsystem<StatusCode> {
loop {
select! {
_ = subsys.on_shutdown_requested() => {
trace!("MonitorSubsystem received shutdown request.");
subsys.request_local_shutdown();
trace!("MonitorSubsystem requested local shutdown complete.");

trace!("MonitorSubsystem waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("MonitorSubsystem children shutdown complete.");
return self.shutdown().await;
}
_ = self.shutdown.recv() => {
@@ -102,9 +109,12 @@
#[async_trait]
impl Shutdownable for MonitorSubsystem<StatusCode> {
async fn shutdown(&mut self) -> ShutdownResult {
trace!("Shutting down MonitorSubsystem...");
// We just have to shut the monitor down manually,
// since we have an exclusive lock on it.
self.monitor.shutdown().await
let _ = self.monitor.shutdown().await;
trace!("MonitorSubsystem shut down!");
Ok(())
}
}

9 changes: 8 additions & 1 deletion src/subsystems/platform/mod.rs
@@ -8,7 +8,7 @@ use tokio::{
sync::mpsc::{self, Receiver, channel},
};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle};
use tracing::debug;
use tracing::{debug, trace};

use crate::adapters::BoxedPlatform;

@@ -97,6 +97,13 @@ impl IntoSubsystem<Report> for PlatformSubsystem {
select! {
// Shutdown comes first so it has high priority.
_ = subsys.on_shutdown_requested() => {
trace!("PlatformSubsystem received shutdown request.");
subsys.request_local_shutdown();
trace!("PlatformSubsystem requested local shutdown complete.");

trace!("PlatformSubsystem waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("PlatformSubsystem children shutdown complete.");
return self.shutdown().await;
}
// Shutdown signal from one of the handles. Since this thread has exclusive
15 changes: 14 additions & 1 deletion src/subsystems/relay/lock_mgmt.rs
@@ -8,6 +8,7 @@ use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::oneshot;
use tokio::time::{Interval, interval};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle};
use tracing::{debug, trace};

use crate::{
Shutdownable,
@@ -44,7 +45,9 @@ impl LockManager {
) -> Result<Self> {
let (done_sender, task_done) = mpsc::channel(1);
// Take the initial lock.
trace!("LockManager taking initial lock...");
let locked_state = backend.lock_state(&metadata, &state, done_sender).await?;
trace!("LockManager taking initial lock complete.");
let freq = *locked_state.frequency();
let timer = interval(freq / 2);
Ok(Self {
@@ -79,6 +82,13 @@ impl IntoSubsystem<Report> for LockManager {
}
}
_ = subsys.on_shutdown_requested() => {
trace!("LockManager received shutdown request.");
subsys.request_local_shutdown();
trace!("LockManager requested local shutdown complete.");

trace!("LockManager waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("LockManager children shutdown complete.");
// Release the lock.
return self.shutdown().await;
}
@@ -94,7 +104,10 @@
#[async_trait]
impl Shutdownable for LockManager {
async fn shutdown(&mut self) -> ShutdownResult {
trace!("LockManager shutting down...");
// Release any of the locks we've taken.
self.backend.abandon_lock(&self.meta, &self.state).await
let _ = self.backend.abandon_lock(&self.meta, &self.state).await;
trace!("LockManager shut down!");
Ok(())
}
}
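One detail worth pulling out of the LockManager context above: the refresh timer ticks at half the lock's advertised frequency, so a renewal lands well before the backend would treat the lock as abandoned. A tiny illustration of that timing choice follows; the 30-second window is made up, not the backend's real setting.

```rust
// Sketch: renew a lease twice per lock window so a single late tick is survivable.
use std::time::Duration;
use tokio::time::{Interval, interval};

fn renewal_timer(lock_frequency: Duration) -> Interval {
    // e.g. a 30s lock window gives a 15s renewal cadence
    interval(lock_frequency / 2)
}

#[tokio::main]
async fn main() {
    let mut timer = renewal_timer(Duration::from_secs(30));
    timer.tick().await; // the first tick completes immediately
    timer.tick().await; // later ticks arrive every 15s; re-assert the lock here
}
```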
15 changes: 11 additions & 4 deletions src/subsystems/relay/mod.rs
@@ -102,7 +102,14 @@ impl IntoSubsystem<Report> for RelaySubsystem<StatusCode> {
select! {
// Besides that, we can just hang out.
_ = subsys.on_shutdown_requested() => {
trace!("RelaySubsystem received shutdown request.");
subsys.request_local_shutdown();
trace!("RelaySubsystem requested local shutdown complete.");
// Waiting for children ensures that all of the locks
// we've taken have been released.
trace!("RelaySubsystem waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("RelaySubstsem children shutdown complete");
return Ok(());
}
// • When we start the RelaySubsystem,
@@ -113,8 +120,7 @@ impl IntoSubsystem<Report> for RelaySubsystem<StatusCode> {
self.backend.upload_observations(&self.meta, batch).await?;
} else {
// The stream has been closed, so we should shutdown.
debug!("Shutting down in relay");
subsys.request_shutdown();
subsys.request_local_shutdown();
}
}
// • We also need to poll the backend for new states.
@@ -172,7 +178,7 @@ impl IntoSubsystem<Report> for RelaySubsystem<StatusCode> {
let percent = WholePercent::try_from(percent_traffic).unwrap();
self.ingress.set_canary_traffic(percent).await?;

locked_state.mark_done().await?;
// locked_state.mark_done().await?;
},
RollbackCanary => {
// Set traffic to 0 immediately.
@@ -188,7 +194,8 @@
}
} else {
// The stream has been closed, so we should shutdown.
subsys.request_shutdown();
trace!("Shutting down in relay from closed state stream");
subsys.request_local_shutdown();
}
}
}
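The relay changes above also swap `request_shutdown()` for `request_local_shutdown()` when a stream closes. A minimal sketch of why that matters, assuming the handle behaves as in tokio-graceful-shutdown's nested-subsystem model; the channel stands in for the relay's observation stream and none of the names below are the project's RelaySubsystem.

```rust
// Sketch: local vs. global shutdown when an input stream closes.
use miette::Report;
use tokio::{select, sync::mpsc::Receiver};
use tokio_graceful_shutdown::SubsystemHandle;

async fn relay_sketch(
    subsys: SubsystemHandle,
    mut observations: Receiver<u64>,
) -> Result<(), Report> {
    loop {
        select! {
            _ = subsys.on_shutdown_requested() => {
                // Shutdown reached us, either from a parent or from the local
                // request below. Wait for children so held locks get released.
                subsys.request_local_shutdown();
                subsys.wait_for_children().await;
                return Ok(());
            }
            item = observations.recv() => {
                match item {
                    Some(_obs) => { /* forward the observation to the backend */ }
                    None => {
                        // The stream closed: stop this subtree only.
                        // request_shutdown() here would tear down every subsystem,
                        // which is the behavior the diff moves away from.
                        subsys.request_local_shutdown();
                    }
                }
            }
        }
    }
}
```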
9 changes: 9 additions & 0 deletions src/subsystems/relay/poll_state.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use bon::bon;
use miette::{IntoDiagnostic, Report, Result};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle};
use tracing::trace;

use crate::{
Shutdownable,
@@ -70,6 +71,13 @@ impl IntoSubsystem<Report> for StatePoller {
loop {
select! {
_ = subsys.on_shutdown_requested() => {
trace!("StatePoller received shutdown request");
subsys.request_local_shutdown();
trace!("StatePoller requested local shutdown complete.");

trace!("StatePoller waiting for children to shutdown.");
subsys.wait_for_children().await;
trace!("StatePoller children shutdown complete.");
return self.shutdown().await
}
_ = self.timer.tick() => {
@@ -88,6 +96,7 @@
#[async_trait]
impl Shutdownable for StatePoller {
async fn shutdown(&mut self) -> ShutdownResult {
trace!("StatePoller shut down!");
// Nothing to do! We just stop polling.
Ok(())
}