Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 193 additions & 9 deletions pingora-load-balancing/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@ use std::time::{Duration, Instant};

use super::{BackendIter, BackendSelection, LoadBalancer};
use async_trait::async_trait;
use pingora_core::services::{background::BackgroundService, ServiceReadyNotifier};
use pingora_core::{
server::ShutdownWatch,
services::{background::BackgroundService, ServiceReadyNotifier},
};

/// Waits for the next change on the shutdown watch and reports whether the
/// service should stop.
///
/// Returns `true` when the watched flag is set after the change, or when
/// waiting for a change fails. Returns `false` when a change was observed
/// but the flag is still unset.
async fn shutdown_changed(shutdown: &mut ShutdownWatch) -> bool {
    // A failed wait is treated the same as an explicit shutdown request.
    if shutdown.changed().await.is_err() {
        return true;
    }
    *shutdown.borrow()
}

impl<S: Send + Sync + BackendSelection + 'static> LoadBalancer<S>
where
S::Iter: BackendIter,
{
pub async fn run(
&self,
shutdown: pingora_core::server::ShutdownWatch,
mut shutdown: ShutdownWatch,
mut ready_opt: Option<ServiceReadyNotifier>,
) -> () {
// 136 years
Expand All @@ -43,8 +53,19 @@ where

if next_update <= now {
// TODO: log err
let _ = self.update().await;
next_update = now + self.update_frequency.unwrap_or(NEVER);
tokio::select! {
biased;
is_shutdown = shutdown_changed(&mut shutdown) => {
if is_shutdown {
return;
}
continue;
}
update = self.update() => {
let _ = update;
next_update = now + self.update_frequency.unwrap_or(NEVER);
}
}
}

// After the first update, discovery and selection setup will be
Expand All @@ -54,17 +75,33 @@ where
}

if next_health_check <= now {
self.backends
.run_health_check(self.parallel_health_check)
.await;
next_health_check = now + self.health_check_frequency.unwrap_or(NEVER);
tokio::select! {
biased;
is_shutdown = shutdown_changed(&mut shutdown) => {
if is_shutdown {
return;
}
continue;
}
_ = self.backends.run_health_check(self.parallel_health_check) => {
next_health_check = now + self.health_check_frequency.unwrap_or(NEVER);
}
}
}

if self.update_frequency.is_none() && self.health_check_frequency.is_none() {
return;
}
let to_wake = std::cmp::min(next_update, next_health_check);
tokio::time::sleep_until(to_wake.into()).await;
tokio::select! {
biased;
is_shutdown = shutdown_changed(&mut shutdown) => {
if is_shutdown {
return;
}
}
_ = tokio::time::sleep_until(to_wake.into()) => {}
}
now = Instant::now();
}
}
Expand All @@ -90,3 +127,150 @@ where
self.run(shutdown, None).await
}
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};
    use std::future;
    use std::sync::Arc;

    use async_trait::async_trait;
    use pingora_error::Result;
    use tokio::sync::{watch, Notify};

    use super::*;
    use crate::discovery::ServiceDiscovery;
    use crate::health_check::HealthCheck;
    use crate::selection;
    use crate::{Backend, Backends};

    /// Load balancer flavour used by every test in this module.
    type RoundRobinLb = LoadBalancer<selection::RoundRobin>;

    /// Upper bound on how long a test waits for an expected event.
    const WAIT_LIMIT: Duration = Duration::from_secs(1);

    /// Discovery that signals `notify` and immediately returns an empty
    /// backend set.
    struct NotifyingDiscovery {
        notify: Arc<Notify>,
    }

    #[async_trait]
    impl ServiceDiscovery for NotifyingDiscovery {
        async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
            self.notify.notify_one();
            Ok((BTreeSet::new(), HashMap::new()))
        }
    }

    /// Discovery that signals `notify` and then never completes.
    struct PendingDiscovery {
        notify: Arc<Notify>,
    }

    #[async_trait]
    impl ServiceDiscovery for PendingDiscovery {
        async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
            self.notify.notify_one();
            future::pending().await
        }
    }

    /// Signals the wrapped `Notify` when dropped.
    struct NotifyOnDrop(Arc<Notify>);

    impl Drop for NotifyOnDrop {
        fn drop(&mut self) {
            self.0.notify_one();
        }
    }

    /// Health check that signals `notify` once started, never completes, and
    /// signals `drop_notify` when its in-flight future is dropped.
    struct PendingHealthCheck {
        notify: Arc<Notify>,
        drop_notify: Arc<Notify>,
    }

    #[async_trait]
    impl HealthCheck for PendingHealthCheck {
        async fn check(&self, _target: &Backend) -> Result<()> {
            // Held across the never-ending await so that dropping the check
            // future is observable through `drop_notify`.
            let _drop_signal = NotifyOnDrop(Arc::clone(&self.drop_notify));
            self.notify.notify_one();
            future::pending().await
        }

        fn health_threshold(&self, _success: bool) -> usize {
            1
        }
    }

    /// Builds a single-backend load balancer whose health check never
    /// finishes, returning it with the "check started" and "check dropped"
    /// notifiers.
    fn pending_health_check_lb(parallel: bool) -> (RoundRobinLb, Arc<Notify>, Arc<Notify>) {
        let started = Arc::new(Notify::new());
        let dropped = Arc::new(Notify::new());

        let mut lb = RoundRobinLb::try_from_iter(["127.0.0.1:80"]).unwrap();
        // Only touch the flag when parallel checks are requested, so the
        // sequential case keeps whatever the constructor chose.
        if parallel {
            lb.parallel_health_check = true;
        }
        lb.set_health_check(Box::new(PendingHealthCheck {
            notify: Arc::clone(&started),
            drop_notify: Arc::clone(&dropped),
        }));

        (lb, started, dropped)
    }

    /// Spawns `lb.run`, waits for `notify` to fire, sends the shutdown signal
    /// and asserts that the spawned task finishes within `WAIT_LIMIT`
    /// without panicking.
    async fn assert_run_exits_on_shutdown(lb: RoundRobinLb, notify: Arc<Notify>) {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let task = tokio::spawn(async move { lb.run(shutdown_rx, None).await });

        notify.notified().await;
        shutdown_tx.send(true).unwrap();

        let joined = tokio::time::timeout(WAIT_LIMIT, task)
            .await
            .expect("background service should observe shutdown promptly");
        joined.expect("background service task should not panic");
    }

    #[tokio::test]
    async fn run_returns_when_shutdown_while_sleeping() {
        let discovered = Arc::new(Notify::new());
        let backends = Backends::new(Box::new(NotifyingDiscovery {
            notify: Arc::clone(&discovered),
        }));
        let mut lb = RoundRobinLb::from_backends(backends);
        lb.update_frequency = Some(Duration::from_secs(60));

        assert_run_exits_on_shutdown(lb, discovered).await;
    }

    #[tokio::test]
    async fn run_returns_when_shutdown_while_updating() {
        let discovering = Arc::new(Notify::new());
        let backends = Backends::new(Box::new(PendingDiscovery {
            notify: Arc::clone(&discovering),
        }));
        let lb = RoundRobinLb::from_backends(backends);

        assert_run_exits_on_shutdown(lb, discovering).await;
    }

    #[tokio::test]
    async fn run_returns_when_shutdown_while_health_checking() {
        let (lb, started, dropped) = pending_health_check_lb(false);

        assert_run_exits_on_shutdown(lb, started).await;
        tokio::time::timeout(WAIT_LIMIT, dropped.notified())
            .await
            .expect("pending health check should be cancelled");
    }

    #[tokio::test]
    async fn run_aborts_parallel_health_check_on_shutdown() {
        let (lb, started, dropped) = pending_health_check_lb(true);

        assert_run_exits_on_shutdown(lb, started).await;
        tokio::time::timeout(WAIT_LIMIT, dropped.notified())
            .await
            .expect("parallel health check task should be aborted");
    }
}
14 changes: 13 additions & 1 deletion pingora-load-balancing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,17 @@ impl Backends {
use crate::health_check::HealthCheck;
use log::{info, warn};
use pingora_runtime::current_handle;
use tokio::task::AbortHandle;

/// Guard holding the abort handles of a batch of tasks; dropping the guard
/// requests an abort on every handle it holds.
struct AbortOnDrop(Vec<AbortHandle>);

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        self.0.iter().for_each(|task| task.abort());
    }
}

async fn check_and_report(
backend: &Backend,
Expand Down Expand Up @@ -293,7 +304,8 @@ impl Backends {
check_and_report(&backend, &check, &ht).await;
})
});

let jobs = Vec::from_iter(jobs);
let _abort_on_drop = AbortOnDrop(jobs.iter().map(|job| job.abort_handle()).collect());
futures::future::join_all(jobs).await;
} else {
for backend in backends.iter() {
Expand Down
Loading