diff options
Diffstat (limited to 'src/modules/loadbalance.rs')
-rw-r--r-- | src/modules/loadbalance.rs | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/src/modules/loadbalance.rs b/src/modules/loadbalance.rs new file mode 100644 index 0000000..5358b03 --- /dev/null +++ b/src/modules/loadbalance.rs @@ -0,0 +1,62 @@ +//! Load balancing module +//! +//! Given a set of handlers, the handler that is the least busy will handle the next request. +//! Current implementation does not scale well for many handlers. +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use serde::Deserialize; +use serde_yaml::Value; +use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +pub struct LoadBalanceKind; + +#[derive(Deserialize)] +struct LoadBalanceConfig(Vec<DynNode>); + +struct LoadBalance { + load: Vec<AtomicUsize>, + config: LoadBalanceConfig, +} + +impl NodeKind for LoadBalanceKind { + fn name(&self) -> &'static str { + "loadbalance" + } + fn instanciate(&self, config: Value) -> Result<Arc<dyn Node>> { + let config = serde_yaml::from_value::<LoadBalanceConfig>(config)?; + Ok(Arc::new(LoadBalance { + load: config.0.iter().map(|_| AtomicUsize::new(0)).collect(), + config, + })) + } +} +impl Node for LoadBalance { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let index = self + .load + .iter() + .enumerate() + .min_by_key(|(_, k)| k.load(Ordering::Relaxed)) + .map(|(i, _)| i) + .ok_or(ServiceError::CustomStatic("zero routes to balance load"))?; + + self.load[index].fetch_add(1, Ordering::Relaxed); + let resp = self.config.0[index].handle(context, request).await; + self.load[index].fetch_sub(1, Ordering::Relaxed); + resp + }) + } +} |