summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-03-18 23:16:42 +0100
committermetamuffin <metamuffin@disroot.org>2025-03-18 23:16:42 +0100
commite21267342c74927a1f9255b4ea02fac32961031a (patch)
treeba60b12f93bbe768d40b50729ad42353af804834
parent412b0b810e9e1fd8feac9edea9034121739b4f8f (diff)
downloadgnix-e21267342c74927a1f9255b4ea02fac32961031a.tar
gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.bz2
gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.zst
limits module
-rw-r--r--Cargo.lock23
-rw-r--r--Cargo.toml1
-rw-r--r--readme.md16
-rw-r--r--src/modules/limits.rs110
-rw-r--r--src/modules/mod.rs2
-rw-r--r--src/modules/paths.rs1
6 files changed, 153 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 470bd68..1767171 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -130,6 +130,28 @@ dependencies = [
]
[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -639,6 +661,7 @@ dependencies = [
"aes-gcm-siv",
"anyhow",
"argon2",
+ "async-stream",
"base64 0.22.1",
"bytes",
"env_logger",
diff --git a/Cargo.toml b/Cargo.toml
index bfcbd30..2dad279 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@ tokio = { version = "1.39.3", features = ["full"] }
tokio-util = { version = "0.7.11", features = ["io"] }
futures-util = "0.3.30"
futures = "0.3.30"
+async-stream = "0.3.6"
pin-project = "1.1.5"
# Config
diff --git a/readme.md b/readme.md
index 3b2b5d0..56a47b3 100644
--- a/readme.md
+++ b/readme.md
@@ -219,6 +219,8 @@ themselves; in that case the request is passed on.
- `bin`: Path to the CGI binary (string)
- `user`: User that the script is executed as. Requires to run gnix as root.
(optional string)
+ - `args`: List of arguments. (list of string)
+ - `env`: Environment variables. (map from string to string)
- **module `cache`**
- Caches requests. **This is experimental! Don't use this.**
@@ -240,6 +242,20 @@ themselves; in that case the request is passed on.
this. **TODO request size limit**
- Takes a sequence of handlers.
+- **module `limits`**
+ - Limits size and transmission rate of request and response bodies. The limit
+ is enforced on an internal data frame level and does therefore not exactly
+ reach the specified limits, but never exceeds them.
+ - `{request,response}.rate` Maximum transmission rate in bytes per second.
+ (number)
+ - `{request,response}.rate_buffer` How much transmission time can be
+ accumulated by not reading for some time in milliseconds. (number)
+ - `{request,response}.rate_buffer_filled` If the rate buffer is filled up
+ initially. (boolean)
+ - `{request,response}.size` Maximum total body size. The body is cut off
+ before the frame that exceeds this limit. Therefore the body is up to one
+ frame size smaller than allowed.
+
- **module `debug`**
- Replies with information about the request to debug. Includes source
address, HTTP version, URI and headers.
diff --git a/src/modules/limits.rs b/src/modules/limits.rs
new file mode 100644
index 0000000..7ae5fc9
--- /dev/null
+++ b/src/modules/limits.rs
@@ -0,0 +1,110 @@
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{config::DynNode, error::ServiceError};
+use anyhow::Result;
+use bytes::Bytes;
+use futures::{Future, StreamExt};
+use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
+use hyper::body::{Body, Frame};
+use serde::Deserialize;
+use std::{
+ pin::Pin,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tokio::time::sleep_until;
+
+pub struct LimitsKind;
+
+#[derive(Deserialize)]
+pub struct Limits {
+ #[serde(default)]
+ request: LimitParam,
+ #[serde(default)]
+ response: LimitParam,
+ next: DynNode,
+}
+
+#[derive(Debug, Clone, Deserialize, Default)]
+struct LimitParam {
+ size: Option<u64>,
+ rate: Option<u64>,
+ rate_buffer: Option<u64>,
+}
+
+impl NodeKind for LimitsKind {
+ fn name(&self) -> &'static str {
+ "limits"
+ }
+ fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> {
+ Ok(Arc::new(serde_yml::from_value::<Limits>(config)?))
+ }
+}
+
+impl Node for Limits {
+ fn handle<'a>(
+ &'a self,
+ context: &'a mut NodeContext,
+ request: NodeRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+ Box::pin(async move {
+ let request = request.map(|body| limit_body(body, self.request.clone()).boxed());
+ let response = self.next.handle(context, request).await?;
+ Ok(response.map(|body| limit_body(body, self.response.clone()).boxed()))
+ })
+ }
+}
+
+fn limit_body(
+ inner: BoxBody<Bytes, ServiceError>,
+ config: LimitParam,
+) -> impl Body<Data = Bytes, Error = ServiceError> {
+ let mut stream = inner.into_data_stream();
+ let mut state = State::new(config);
+ StreamBody::new(async_stream::stream! {
+ while let Some(k) = stream.next().await {
+ match k {
+ Ok(data) => {
+ if state.update(data.len()).await {
+ yield Ok(Frame::data(data))
+ } else {
+ break;
+ }
+ },
+ Err(e) => yield Err(e)
+ }
+ }
+ })
+}
+
+struct State {
+ config: LimitParam,
+ last_read: Instant,
+ size: u64,
+}
+
+impl State {
+ pub fn new(config: LimitParam) -> Self {
+ Self {
+ config,
+ last_read: Instant::now(),
+ size: 0,
+ }
+ }
+ pub async fn update(&mut self, len: usize) -> bool {
+ if let Some(rate) = self.config.rate {
+ let buffer = Duration::from_millis(self.config.rate_buffer.unwrap_or(1000));
+ let el = self.last_read.elapsed();
+ if el > buffer {
+ self.last_read += el - buffer
+ }
+ self.last_read += Duration::from_nanos((len as u64 * 1_000_000_000) / rate);
+ sleep_until(self.last_read.into()).await
+ }
+ if let Some(max_size) = self.config.size {
+ self.size += len as u64;
+ self.size < max_size
+ } else {
+ true
+ }
+ }
+}
diff --git a/src/modules/mod.rs b/src/modules/mod.rs
index b751ba3..283c86a 100644
--- a/src/modules/mod.rs
+++ b/src/modules/mod.rs
@@ -25,6 +25,7 @@ mod proxy;
mod redirect;
mod switch;
mod upgrade_insecure;
+mod limits;
pub type NodeRequest = Request<BoxBody<Bytes, ServiceError>>;
pub type NodeResponse = Response<BoxBody<Bytes, ServiceError>>;
@@ -50,6 +51,7 @@ pub static MODULES: &[&dyn NodeKind] = &[
&upgrade_insecure::UpgradeInsecureKind,
&inspect::InspectKind,
&fallback::FallbackKind,
+ &limits::LimitsKind,
];
pub struct NodeContext {
diff --git a/src/modules/paths.rs b/src/modules/paths.rs
index 3dc7596..30e58cf 100644
--- a/src/modules/paths.rs
+++ b/src/modules/paths.rs
@@ -41,6 +41,7 @@ impl NodeKind for PathsKind {
}))
}
}
+
impl Node for Paths {
fn handle<'a>(
&'a self,