From e21267342c74927a1f9255b4ea02fac32961031a Mon Sep 17 00:00:00 2001 From: metamuffin Date: Tue, 18 Mar 2025 23:16:42 +0100 Subject: limits module --- Cargo.lock | 23 +++++++++++ Cargo.toml | 1 + readme.md | 16 ++++++++ src/modules/limits.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/modules/mod.rs | 2 + src/modules/paths.rs | 1 + 6 files changed, 153 insertions(+) create mode 100644 src/modules/limits.rs diff --git a/Cargo.lock b/Cargo.lock index 470bd68..1767171 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,6 +129,28 @@ dependencies = [ "password-hash", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -639,6 +661,7 @@ dependencies = [ "aes-gcm-siv", "anyhow", "argon2", + "async-stream", "base64 0.22.1", "bytes", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index bfcbd30..2dad279 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ tokio = { version = "1.39.3", features = ["full"] } tokio-util = { version = "0.7.11", features = ["io"] } futures-util = "0.3.30" futures = "0.3.30" +async-stream = "0.3.6" pin-project = "1.1.5" # Config diff --git a/readme.md b/readme.md index 3b2b5d0..56a47b3 100644 --- a/readme.md +++ b/readme.md @@ -219,6 +219,8 @@ themselves; in that case the request is passed on. - `bin`: Path to the CGI binary (string) - `user`: User that the script is executed as. Requires to run gnix as root. (optional string) + - `args`: List of arguments. (list of string) + - `env`: Environment variables. (map from string to string) - **module `cache`** - Caches requests. **This is experimental! Don't use this.** @@ -240,6 +242,20 @@ themselves; in that case the request is passed on. this. **TODO request size limit** - Takes a sequence of handlers. +- **module `limits`** + - Limits size and transmission rate of request and response bodies. The limit + is enforced on an internal data frame level and does therefore not exactly + reach the specified limits, but never exceeds them. + - `{request,response}.rate` Maximum transmission rate in bytes per second. + (number) + - `{request,response}.rate_buffer` How much transmission time can be + accumulated by not reading for some time in milliseconds. (number) + - `{request,response}.rate_buffer_filled` If the rate buffer is filled up + initially. (boolean) + - `{request,response}.size` Maximum total body size. The body is cut off + before the frame that exceeds this limit. Therefore the body is up to one + frame size smaller than allowed. + - **module `debug`** - Replies with information about the request to debug. Includes source address, HTTP version, URI and headers. diff --git a/src/modules/limits.rs b/src/modules/limits.rs new file mode 100644 index 0000000..7ae5fc9 --- /dev/null +++ b/src/modules/limits.rs @@ -0,0 +1,110 @@ +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use bytes::Bytes; +use futures::{Future, StreamExt}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::body::{Body, Frame}; +use serde::Deserialize; +use std::{ + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::time::sleep_until; + +pub struct LimitsKind; + +#[derive(Deserialize)] +pub struct Limits { + #[serde(default)] + request: LimitParam, + #[serde(default)] + response: LimitParam, + next: DynNode, +} + +#[derive(Debug, Clone, Deserialize, Default)] +struct LimitParam { + size: Option, + rate: Option, + rate_buffer: Option, +} + +impl NodeKind for LimitsKind { + fn name(&self) -> &'static str { + "limits" + } + fn instanciate(&self, config: serde_yml::Value) -> Result> { + Ok(Arc::new(serde_yml::from_value::(config)?)) + } +} + +impl Node for Limits { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin> + Send + Sync + 'a>> { + Box::pin(async move { + let request = request.map(|body| limit_body(body, self.request.clone()).boxed()); + let response = self.next.handle(context, request).await?; + Ok(response.map(|body| limit_body(body, self.response.clone()).boxed())) + }) + } +} + +fn limit_body( + inner: BoxBody, + config: LimitParam, +) -> impl Body { + let mut stream = inner.into_data_stream(); + let mut state = State::new(config); + StreamBody::new(async_stream::stream! { + while let Some(k) = stream.next().await { + match k { + Ok(data) => { + if state.update(data.len()).await { + yield Ok(Frame::data(data)) + } else { + break; + } + }, + Err(e) => yield Err(e) + } + } + }) +} + +struct State { + config: LimitParam, + last_read: Instant, + size: u64, +} + +impl State { + pub fn new(config: LimitParam) -> Self { + Self { + config, + last_read: Instant::now(), + size: 0, + } + } + pub async fn update(&mut self, len: usize) -> bool { + if let Some(rate) = self.config.rate { + let buffer = Duration::from_millis(self.config.rate_buffer.unwrap_or(1000)); + let el = self.last_read.elapsed(); + if el > buffer { + self.last_read += el - buffer + } + self.last_read += Duration::from_nanos((len as u64 * 1_000_000_000) / rate); + sleep_until(self.last_read.into()).await + } + if let Some(max_size) = self.config.size { + self.size += len as u64; + self.size < max_size + } else { + true + } + } +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index b751ba3..283c86a 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -25,6 +25,7 @@ mod proxy; mod redirect; mod switch; mod upgrade_insecure; +mod limits; pub type NodeRequest = Request>; pub type NodeResponse = Response>; @@ -50,6 +51,7 @@ pub static MODULES: &[&dyn NodeKind] = &[ &upgrade_insecure::UpgradeInsecureKind, &inspect::InspectKind, &fallback::FallbackKind, + &limits::LimitsKind, ]; pub struct NodeContext { diff --git a/src/modules/paths.rs b/src/modules/paths.rs index 3dc7596..30e58cf 100644 --- a/src/modules/paths.rs +++ b/src/modules/paths.rs @@ -41,6 +41,7 @@ impl NodeKind for PathsKind { })) } } + impl Node for Paths { fn handle<'a>( &'a self, -- cgit v1.2.3-70-g09d2