//! Load balancing module //! //! Given a set of handlers, the handler that is the least busy will handle the next request. //! Current implementation does not scale well for many handlers. use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; use crate::{config::DynNode, error::ServiceError}; use anyhow::Result; use serde::Deserialize; use serde_yaml::Value; use std::{ future::Future, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; pub struct LoadBalanceKind; #[derive(Deserialize)] struct LoadBalanceConfig(Vec); struct LoadBalance { load: Vec, config: LoadBalanceConfig, } impl NodeKind for LoadBalanceKind { fn name(&self) -> &'static str { "loadbalance" } fn instanciate(&self, config: Value) -> Result> { let config = serde_yaml::from_value::(config)?; Ok(Arc::new(LoadBalance { load: config.0.iter().map(|_| AtomicUsize::new(0)).collect(), config, })) } } impl Node for LoadBalance { fn handle<'a>( &'a self, context: &'a mut NodeContext, request: NodeRequest, ) -> Pin> + Send + Sync + 'a>> { Box::pin(async move { let index = self .load .iter() .enumerate() .min_by_key(|(_, k)| k.load(Ordering::Relaxed)) .map(|(i, _)| i) .ok_or(ServiceError::CustomStatic("zero routes to balance load"))?; self.load[index].fetch_add(1, Ordering::Relaxed); let resp = self.config.0[index].handle(context, request).await; self.load[index].fetch_sub(1, Ordering::Relaxed); resp }) } }