diff options
author | metamuffin <metamuffin@disroot.org> | 2025-03-18 23:16:42 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-03-18 23:16:42 +0100 |
commit | e21267342c74927a1f9255b4ea02fac32961031a (patch) | |
tree | ba60b12f93bbe768d40b50729ad42353af804834 /src | |
parent | 412b0b810e9e1fd8feac9edea9034121739b4f8f (diff) | |
download | gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.bz2 gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.zst |
limits module
Diffstat (limited to 'src')
-rw-r--r-- | src/modules/limits.rs | 110 | ||||
-rw-r--r-- | src/modules/mod.rs | 2 | ||||
-rw-r--r-- | src/modules/paths.rs | 1 |
3 files changed, 113 insertions, 0 deletions
diff --git a/src/modules/limits.rs b/src/modules/limits.rs new file mode 100644 index 0000000..7ae5fc9 --- /dev/null +++ b/src/modules/limits.rs @@ -0,0 +1,110 @@ +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use bytes::Bytes; +use futures::{Future, StreamExt}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::body::{Body, Frame}; +use serde::Deserialize; +use std::{ + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::time::sleep_until; + +pub struct LimitsKind; + +#[derive(Deserialize)] +pub struct Limits { + #[serde(default)] + request: LimitParam, + #[serde(default)] + response: LimitParam, + next: DynNode, +} + +#[derive(Debug, Clone, Deserialize, Default)] +struct LimitParam { + size: Option<u64>, + rate: Option<u64>, + rate_buffer: Option<u64>, +} + +impl NodeKind for LimitsKind { + fn name(&self) -> &'static str { + "limits" + } + fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> { + Ok(Arc::new(serde_yml::from_value::<Limits>(config)?)) + } +} + +impl Node for Limits { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let request = request.map(|body| limit_body(body, self.request.clone()).boxed()); + let response = self.next.handle(context, request).await?; + Ok(response.map(|body| limit_body(body, self.response.clone()).boxed())) + }) + } +} + +fn limit_body( + inner: BoxBody<Bytes, ServiceError>, + config: LimitParam, +) -> impl Body<Data = Bytes, Error = ServiceError> { + let mut stream = inner.into_data_stream(); + let mut state = State::new(config); + StreamBody::new(async_stream::stream! { + while let Some(k) = stream.next().await { + match k { + Ok(data) => { + if state.update(data.len()).await { + yield Ok(Frame::data(data)) + } else { + break; + } + }, + Err(e) => yield Err(e) + } + } + }) +} + +struct State { + config: LimitParam, + last_read: Instant, + size: u64, +} + +impl State { + pub fn new(config: LimitParam) -> Self { + Self { + config, + last_read: Instant::now(), + size: 0, + } + } + pub async fn update(&mut self, len: usize) -> bool { + if let Some(rate) = self.config.rate { + let buffer = Duration::from_millis(self.config.rate_buffer.unwrap_or(1000)); + let el = self.last_read.elapsed(); + if el > buffer { + self.last_read += el - buffer + } + self.last_read += Duration::from_nanos((len as u64 * 1_000_000_000) / rate); + sleep_until(self.last_read.into()).await + } + if let Some(max_size) = self.config.size { + self.size += len as u64; + self.size < max_size + } else { + true + } + } +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index b751ba3..283c86a 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -25,6 +25,7 @@ mod proxy; mod redirect; mod switch; mod upgrade_insecure; +mod limits; pub type NodeRequest = Request<BoxBody<Bytes, ServiceError>>; pub type NodeResponse = Response<BoxBody<Bytes, ServiceError>>; @@ -50,6 +51,7 @@ pub static MODULES: &[&dyn NodeKind] = &[ &upgrade_insecure::UpgradeInsecureKind, &inspect::InspectKind, &fallback::FallbackKind, + &limits::LimitsKind, ]; pub struct NodeContext { diff --git a/src/modules/paths.rs b/src/modules/paths.rs index 3dc7596..30e58cf 100644 --- a/src/modules/paths.rs +++ b/src/modules/paths.rs @@ -41,6 +41,7 @@ impl NodeKind for PathsKind { })) } } + impl Node for Paths { fn handle<'a>( &'a self, |