diff options
author | metamuffin <metamuffin@disroot.org> | 2025-03-18 23:16:42 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-03-18 23:16:42 +0100 |
commit | e21267342c74927a1f9255b4ea02fac32961031a (patch) | |
tree | ba60b12f93bbe768d40b50729ad42353af804834 | |
parent | 412b0b810e9e1fd8feac9edea9034121739b4f8f (diff) | |
download | gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.bz2 gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.zst |
limits module
-rw-r--r-- | Cargo.lock | 23 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | readme.md | 16 | ||||
-rw-r--r-- | src/modules/limits.rs | 110 | ||||
-rw-r--r-- | src/modules/mod.rs | 2 | ||||
-rw-r--r-- | src/modules/paths.rs | 1 |
6 files changed, 153 insertions, 0 deletions
@@ -130,6 +130,28 @@ dependencies = [ ] [[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "atomic-waker" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -639,6 +661,7 @@ dependencies = [ "aes-gcm-siv", "anyhow", "argon2", + "async-stream", "base64 0.22.1", "bytes", "env_logger", @@ -36,6 +36,7 @@ tokio = { version = "1.39.3", features = ["full"] } tokio-util = { version = "0.7.11", features = ["io"] } futures-util = "0.3.30" futures = "0.3.30" +async-stream = "0.3.6" pin-project = "1.1.5" # Config @@ -219,6 +219,8 @@ themselves; in that case the request is passed on. - `bin`: Path to the CGI binary (string) - `user`: User that the script is executed as. Requires to run gnix as root. (optional string) + - `args`: List of arguments. (list of string) + - `env`: Environment variables. (map from string to string) - **module `cache`** - Caches requests. **This is experimental! Don't use this.** @@ -240,6 +242,20 @@ themselves; in that case the request is passed on. this. **TODO request size limit** - Takes a sequence of handlers. +- **module `limits`** + - Limits size and transmission rate of request and response bodies. The limit + is enforced on an internal data frame level and does therefore not exactly + reach the specified limits, but never exceeds them. + - `{request,response}.rate` Maximum transmission rate in bytes per second. + (number) + - `{request,response}.rate_buffer` How much transmission time can be + accumulated by not reading for some time in milliseconds. (number) + - `{request,response}.rate_buffer_filled` If the rate buffer is filled up + initially. (boolean) + - `{request,response}.size` Maximum total body size. The body is cut off + before the frame that exceeds this limit. Therefore the body is up to one + frame size smaller than allowed. + - **module `debug`** - Replies with information about the request to debug. Includes source address, HTTP version, URI and headers. diff --git a/src/modules/limits.rs b/src/modules/limits.rs new file mode 100644 index 0000000..7ae5fc9 --- /dev/null +++ b/src/modules/limits.rs @@ -0,0 +1,110 @@ +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use bytes::Bytes; +use futures::{Future, StreamExt}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::body::{Body, Frame}; +use serde::Deserialize; +use std::{ + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::time::sleep_until; + +pub struct LimitsKind; + +#[derive(Deserialize)] +pub struct Limits { + #[serde(default)] + request: LimitParam, + #[serde(default)] + response: LimitParam, + next: DynNode, +} + +#[derive(Debug, Clone, Deserialize, Default)] +struct LimitParam { + size: Option<u64>, + rate: Option<u64>, + rate_buffer: Option<u64>, +} + +impl NodeKind for LimitsKind { + fn name(&self) -> &'static str { + "limits" + } + fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> { + Ok(Arc::new(serde_yml::from_value::<Limits>(config)?)) + } +} + +impl Node for Limits { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let request = request.map(|body| limit_body(body, self.request.clone()).boxed()); + let response = self.next.handle(context, request).await?; + Ok(response.map(|body| limit_body(body, self.response.clone()).boxed())) + }) + } +} + +fn limit_body( + inner: BoxBody<Bytes, ServiceError>, + config: LimitParam, +) -> impl Body<Data = Bytes, Error = ServiceError> { + let mut stream = inner.into_data_stream(); + let mut state = State::new(config); + StreamBody::new(async_stream::stream! { + while let Some(k) = stream.next().await { + match k { + Ok(data) => { + if state.update(data.len()).await { + yield Ok(Frame::data(data)) + } else { + break; + } + }, + Err(e) => yield Err(e) + } + } + }) +} + +struct State { + config: LimitParam, + last_read: Instant, + size: u64, +} + +impl State { + pub fn new(config: LimitParam) -> Self { + Self { + config, + last_read: Instant::now(), + size: 0, + } + } + pub async fn update(&mut self, len: usize) -> bool { + if let Some(rate) = self.config.rate { + let buffer = Duration::from_millis(self.config.rate_buffer.unwrap_or(1000)); + let el = self.last_read.elapsed(); + if el > buffer { + self.last_read += el - buffer + } + self.last_read += Duration::from_nanos((len as u64 * 1_000_000_000) / rate); + sleep_until(self.last_read.into()).await + } + if let Some(max_size) = self.config.size { + self.size += len as u64; + self.size < max_size + } else { + true + } + } +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index b751ba3..283c86a 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -25,6 +25,7 @@ mod proxy; mod redirect; mod switch; mod upgrade_insecure; +mod limits; pub type NodeRequest = Request<BoxBody<Bytes, ServiceError>>; pub type NodeResponse = Response<BoxBody<Bytes, ServiceError>>; @@ -50,6 +51,7 @@ pub static MODULES: &[&dyn NodeKind] = &[ &upgrade_insecure::UpgradeInsecureKind, &inspect::InspectKind, &fallback::FallbackKind, + &limits::LimitsKind, ]; pub struct NodeContext { diff --git a/src/modules/paths.rs b/src/modules/paths.rs index 3dc7596..30e58cf 100644 --- a/src/modules/paths.rs +++ b/src/modules/paths.rs @@ -41,6 +41,7 @@ impl NodeKind for PathsKind { })) } } + impl Node for Paths { fn handle<'a>( &'a self, |