Skip to content

Commit

Permalink
Support observe backend health status cloudflare#225
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Jul 13, 2024
1 parent 42a847d commit fb62869
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
88 changes: 88 additions & 0 deletions pingora-load-balancing/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,24 @@ use pingora_http::{RequestHeader, ResponseHeader};
use std::sync::Arc;
use std::time::Duration;

#[async_trait]
pub trait HealthObserve {
/// This function is called When health status changes
async fn health_check_callback(&self, _target: &Backend, _healthy: bool);
}
pub type HealthCheckCallback = Box<dyn HealthObserve + Send + Sync>;

/// [HealthCheck] is the interface to implement health check for backends
#[async_trait]
pub trait HealthCheck {
/// Check the given backend.
///
/// `Ok(())`` if the check passes, otherwise the check fails.
async fn check(&self, target: &Backend) -> Result<()>;

/// This function is called When health status changes
async fn health_status_change(&self, _target: &Backend, _healthy: bool) {}

/// This function defines how many *consecutive* checks should flip the health of a backend.
///
/// For example: with `success``: `true`: this function should return the
Expand All @@ -56,6 +67,7 @@ pub struct TcpHealthCheck {
/// set, it will also try to establish a TLS connection on top of the TCP connection.
pub peer_template: BasicPeer,
connector: TransportConnector,
callback: Option<HealthCheckCallback>,
}

impl Default for TcpHealthCheck {
Expand All @@ -67,6 +79,7 @@ impl Default for TcpHealthCheck {
consecutive_failure: 1,
peer_template,
connector: TransportConnector::new(None),
callback: None,
}
}
}
Expand All @@ -93,6 +106,11 @@ impl TcpHealthCheck {
pub fn set_connector(&mut self, connector: TransportConnector) {
self.connector = connector;
}

/// Set the health observe callback function.
pub fn set_callback(&mut self, callback: HealthCheckCallback) {
self.callback = Some(callback);
}
}

#[async_trait]
Expand All @@ -110,6 +128,12 @@ impl HealthCheck for TcpHealthCheck {
peer._address = target.addr.clone();
self.connector.get_stream(&peer).await.map(|_| {})
}

async fn health_status_change(&self, target: &Backend, healthy: bool) {
if let Some(callback) = &self.callback {
callback.health_check_callback(target, healthy).await;
}
}
}

type Validator = Box<dyn Fn(&ResponseHeader) -> Result<()> + Send + Sync>;
Expand Down Expand Up @@ -147,6 +171,7 @@ pub struct HttpHealthCheck {
/// Sometimes the health check endpoint lives one a different port than the actual backend.
/// Setting this option allows the health check to perform on the given port of the backend IP.
pub port_override: Option<u16>,
callback: Option<HealthCheckCallback>,
}

impl HttpHealthCheck {
Expand Down Expand Up @@ -174,13 +199,18 @@ impl HttpHealthCheck {
req,
validator: None,
port_override: None,
callback: None,
}
}

/// Replace the internal http connector with the given [HttpConnector]
pub fn set_connector(&mut self, connector: HttpConnector) {
self.connector = connector;
}
/// Set the health observe callback function.
pub fn set_callback(&mut self, callback: HealthCheckCallback) {
self.callback = Some(callback);
}
}

#[async_trait]
Expand Down Expand Up @@ -235,6 +265,11 @@ impl HealthCheck for HttpHealthCheck {

Ok(())
}
async fn health_status_change(&self, target: &Backend, healthy: bool) {
if let Some(callback) = &self.callback {
callback.health_check_callback(target, healthy).await;
}
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -313,8 +348,11 @@ impl Health {

#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicU16, Ordering};

use super::*;
use crate::SocketAddr;
use async_trait::async_trait;

#[tokio::test]
async fn test_tcp_check() {
Expand Down Expand Up @@ -381,4 +419,54 @@ mod test {

assert!(http_check.check(&backend).await.is_ok());
}

struct Observe {
healthy_count: Arc<AtomicU16>,
}
#[async_trait]
impl HealthObserve for Observe {
async fn health_check_callback(&self, _target: &Backend, healthy: bool) {
if healthy {
self.healthy_count.fetch_add(1, Ordering::Relaxed);
}
}
}

#[tokio::test]
async fn test_tcp_health_observe() {
let healthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
healthy_count: healthy_count.clone(),
};
let bob = Box::new(ob);

let mut tcp_check = TcpHealthCheck::default();
tcp_check.set_callback(bob);

let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:80".parse().unwrap()),
weight: 1,
};

assert!(tcp_check.check(&backend).await.is_ok());
assert!(1 == healthy_count.load(Ordering::Relaxed));
}
#[tokio::test]
async fn test_https_health_observe() {
let healthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
healthy_count: healthy_count.clone(),
};
let bob = Box::new(ob);
let mut https_check = HttpHealthCheck::new("one.one.one.one", true);
https_check.set_callback(bob);

let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:443".parse().unwrap()),
weight: 1,
};

assert!(https_check.check(&backend).await.is_ok());
assert!(1 == healthy_count.load(Ordering::Relaxed));
}
}
2 changes: 2 additions & 0 deletions pingora-load-balancing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,10 @@ impl Backends {
h.observe_health(errored.is_none(), check.health_threshold(errored.is_none()));
if flipped {
if let Some(e) = errored {
check.health_status_change(backend, false).await;
warn!("{backend:?} becomes unhealthy, {e}");
} else {
check.health_status_change(backend, true).await;
info!("{backend:?} becomes healthy");
}
}
Expand Down

0 comments on commit fb62869

Please sign in to comment.