aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--readme.md22
-rw-r--r--src/error.rs3
-rw-r--r--src/modules/cgi.rs66
-rw-r--r--src/modules/mod.rs2
-rw-r--r--src/modules/ratelimit.rs176
5 files changed, 238 insertions, 31 deletions
diff --git a/readme.md b/readme.md
index 522e269..c8ed4e5 100644
--- a/readme.md
+++ b/readme.md
@@ -267,6 +267,28 @@ themselves; in that case the request is passed on.
before the frame that exceeds this limit. Therefore the body is up to one
frame size smaller than allowed.
+- **module `ratelimit`**
+ - Limits the rate at which requests can be processed. For this every identity
+ (see below) has a request counter. The counter is reset after a fixed time
+ delay.
+ - `reference_duration`: Duration in seconds after which the request counters
+ are reset.
+ - `identity`: Requests are counted per identity. Default is source address.
+ - `!global`: Use a central counter
+ - `!source_address`: Count per source ip address
+ - `!source_address_trunc`: Same, but addresses are truncated first. Requires keys
+ `v4` and `v6`, which control how many trailing bits are discarded respectively.
+ - `!path`: Count per path (excluding query)
+ - `!path_query`: Count per path (including query)
+ - `max_identities`: Always rejects requests if there are already more than
+ this many identities tracked.
+ - `thresholds`: A list of `[threshold, mode]` pairs that are checked and
+ conditionally executed in order.
+ - `!too_many_requests`: Responds with an empty response with a 429 status code
+ and the `Retry-After` header set. Later thresholds are not checked.
+ - `!exec <path>`: Invokes a script like CGI would but expects no output.
+ - `next`: Inner handler. (module)
+
- **module `debug`**
- Replies with information about the request to debug. Includes source
address, HTTP version, method, URI and headers.
diff --git a/src/error.rs b/src/error.rs
index 1964c0d..cccfd78 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -55,6 +55,8 @@ pub enum ServiceError {
InvalidHeader,
#[error("invalid uri")]
InvalidUri,
+ #[error("too many concurrent users, please retry later")]
+ TooManyIdentities,
#[error("impossible error")]
Other,
}
@@ -88,6 +90,7 @@ impl ServiceError {
ServiceError::ParseIntError(_) => StatusCode::BAD_REQUEST,
ServiceError::InvalidHeader => StatusCode::BAD_REQUEST,
ServiceError::InvalidUri => StatusCode::BAD_REQUEST,
+ ServiceError::TooManyIdentities => StatusCode::TOO_MANY_REQUESTS,
ServiceError::Other => StatusCode::INTERNAL_SERVER_ERROR,
}
}
diff --git a/src/modules/cgi.rs b/src/modules/cgi.rs
index bd19395..121bcac 100644
--- a/src/modules/cgi.rs
+++ b/src/modules/cgi.rs
@@ -3,7 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use super::{Node, NodeKind, NodeResponse};
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
use crate::error::ServiceError;
use anyhow::{anyhow, Result};
use futures::TryStreamExt;
@@ -88,36 +88,7 @@ impl Node for Cgi {
command.envs(&self.config.env);
command.args(&self.config.args);
- command.env(
- "CONTENT_LENGTH",
- request
- .headers()
- .get(CONTENT_LENGTH)
- .and_then(|x| x.to_str().ok())
- .unwrap_or_default(),
- );
- command.env(
- "CONTENT_TYPE",
- request
- .headers()
- .get(CONTENT_TYPE)
- .and_then(|x| x.to_str().ok())
- .unwrap_or_default(),
- );
- command.env("GATEWAY_INTERFACE", "CGI/1.1");
- command.env("PATH_INFO", request.uri().path());
- command.env("PATH_TRANSLATED", request.uri().path());
- command.env("QUERY_STRING", request.uri().query().unwrap_or_default());
- command.env("REMOTE_ADDR", context.addr.to_string());
- // command.env("REMOTE_HOST", ));
- // command.env("REMOTE_IDENT", ));
- // command.env("REMOTE_USER", ));
- command.env("REQUEST_METHOD", request.method().to_string());
- // command.env("SCRIPT_NAME", );
- // command.env("SERVER_NAME", );
- // command.env("SERVER_PORT", );
- command.env("SERVER_PROTOCOL", "HTTP/1.1");
- command.env("SERVER_SOFTWARE", "gnix");
+ set_cgi_variables(&mut command, &request, context);
let mut child = command.spawn()?;
let mut stdout = BufReader::new(child.stdout.take().unwrap());
@@ -165,3 +136,36 @@ impl Node for Cgi {
})
}
}
+
+pub fn set_cgi_variables(command: &mut Command, request: &NodeRequest, context: &NodeContext) {
+ command.env(
+ "CONTENT_LENGTH",
+ request
+ .headers()
+ .get(CONTENT_LENGTH)
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default(),
+ );
+ command.env(
+ "CONTENT_TYPE",
+ request
+ .headers()
+ .get(CONTENT_TYPE)
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default(),
+ );
+ command.env("GATEWAY_INTERFACE", "CGI/1.1");
+ command.env("PATH_INFO", request.uri().path());
+ command.env("PATH_TRANSLATED", request.uri().path());
+ command.env("QUERY_STRING", request.uri().query().unwrap_or_default());
+ command.env("REMOTE_ADDR", context.addr.to_string());
+ // command.env("REMOTE_HOST", ));
+ // command.env("REMOTE_IDENT", ));
+ // command.env("REMOTE_USER", ));
+ command.env("REQUEST_METHOD", request.method().to_string());
+ // command.env("SCRIPT_NAME", );
+ // command.env("SERVER_NAME", );
+ // command.env("SERVER_PORT", );
+ command.env("SERVER_PROTOCOL", "HTTP/1.1");
+ command.env("SERVER_SOFTWARE", "gnix");
+}
diff --git a/src/modules/mod.rs b/src/modules/mod.rs
index 21b0b4f..7a58807 100644
--- a/src/modules/mod.rs
+++ b/src/modules/mod.rs
@@ -29,6 +29,7 @@ mod limits;
mod loadbalance;
mod paths;
mod proxy;
+mod ratelimit;
mod redirect;
mod switch;
mod upgrade_insecure;
@@ -59,6 +60,7 @@ pub static MODULES: &[&dyn NodeKind] = &[
&fallback::FallbackKind,
&limits::LimitsKind,
&delay::DelayKind,
+ &ratelimit::RatelimitKind,
];
pub struct NodeContext {
diff --git a/src/modules/ratelimit.rs b/src/modules/ratelimit.rs
new file mode 100644
index 0000000..9e5d583
--- /dev/null
+++ b/src/modules/ratelimit.rs
@@ -0,0 +1,176 @@
+/*
+ This file is part of gnix (https://codeberg.org/metamuffin/gnix)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use super::{cgi::set_cgi_variables, Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{config::DynNode, error::ServiceError};
+use anyhow::Result;
+use futures::Future;
+use http::{header::RETRY_AFTER, HeaderValue, Response, StatusCode};
+use http_body_util::{combinators::BoxBody, BodyExt};
+use log::{error, warn};
+use serde::Deserialize;
+use std::{
+ collections::HashMap,
+ hash::{DefaultHasher, Hash, Hasher},
+ net::IpAddr,
+ pin::Pin,
+ process::Stdio,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tokio::{process::Command, spawn, sync::Mutex, time::sleep_until};
+
+pub struct RatelimitKind;
+
+#[derive(Deserialize)]
+pub struct RatelimitConfig {
+ next: DynNode,
+ #[serde(default)]
+ identity: IdentityMode,
+ #[serde(default = "default_max_identities")]
+ max_identities: usize,
+ reference_duration: f32,
+ thresholds: Vec<(usize, LimitMode)>,
+}
+fn default_max_identities() -> usize {
+ 1 << 16
+}
+
+#[derive(Deserialize, Default)]
+#[serde(rename_all = "snake_case")]
+enum IdentityMode {
+ Global,
+ #[default]
+ SourceAddress,
+ SourceAddressTrunc {
+ v4: u8,
+ v6: u8,
+ },
+ Path,
+ PathQuery,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum LimitMode {
+ TooManyRequests,
+ Exec(String),
+}
+
+pub struct Ratelimit {
+ state: Arc<Mutex<HashMap<u64, IdentityState>>>,
+ config: RatelimitConfig,
+}
+
+struct IdentityState {
+ counter: usize,
+ frame_end: Instant,
+}
+
+impl NodeKind for RatelimitKind {
+ fn name(&self) -> &'static str {
+ "ratelimit"
+ }
+ fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> {
+ Ok(Arc::new(Ratelimit {
+ state: Arc::new(Mutex::new(HashMap::new())),
+ config: serde_yml::from_value::<RatelimitConfig>(config)?,
+ }))
+ }
+}
+
+impl Node for Ratelimit {
+ fn handle<'a>(
+ &'a self,
+ context: &'a mut NodeContext,
+ request: NodeRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+ Box::pin(async move {
+ let identity_hash = match self.config.identity {
+ IdentityMode::Global => 0,
+ IdentityMode::SourceAddress => hash(context.addr.ip()),
+ IdentityMode::SourceAddressTrunc { v4, v6 } => match context.addr.ip() {
+ IpAddr::V4(a) => hash(a.to_bits() >> v4),
+ IpAddr::V6(a) => hash(a.to_bits() >> v6),
+ },
+ IdentityMode::Path => hash(request.uri().path()),
+ IdentityMode::PathQuery => hash(request.uri().path_and_query()),
+ };
+
+ let now = Instant::now();
+
+ let (counter, frame_end) = {
+ let mut state = self.state.lock().await;
+ if state.len() > self.config.max_identities {
+ return Err(ServiceError::TooManyIdentities);
+ }
+ let istate = state.entry(identity_hash).or_insert_with(|| {
+ let frame_end = now + Duration::from_secs_f32(self.config.reference_duration);
+ let state = self.state.clone();
+ //? How efficient is this? Does it scale better to have a central task?
+ spawn(async move {
+ sleep_until(frame_end.into()).await;
+ state.lock().await.remove(&identity_hash)
+ });
+ IdentityState {
+ counter: 0,
+ frame_end,
+ }
+ });
+ istate.counter += 1;
+ (istate.counter, istate.frame_end)
+ };
+
+ for (thres, l) in &self.config.thresholds {
+ match l {
+ LimitMode::TooManyRequests => {
+ if counter > *thres {
+ let mut r = Response::new(BoxBody::new(
+ http_body_util::Empty::new().map_err(|x| match x {}),
+ ));
+ *r.status_mut() = StatusCode::TOO_MANY_REQUESTS;
+ r.headers_mut().insert(
+ RETRY_AFTER,
+ HeaderValue::from_str(&(frame_end - now).as_secs().to_string())
+ .unwrap(),
+ );
+ return Ok(r);
+ }
+ }
+ LimitMode::Exec(path) => {
+ // Exact comparison so it can only trigger once per frame
+ if counter == *thres {
+ let mut command = Command::new(&path);
+ command.stdin(Stdio::null());
+ set_cgi_variables(&mut command, &request, context);
+ spawn(async move {
+ let mut child = match command.spawn() {
+ Ok(c) => c,
+ Err(e) => return error!("exec limiter spawn failed: {e}"),
+ };
+ match child.wait().await {
+ Ok(s) if s.success() => (),
+ Ok(s) => warn!(
+ "exec limiter failed with code {}",
+ s.code().unwrap_or_default()
+ ),
+ Err(e) => warn!("exec limiter failed: {e}"),
+ }
+ });
+ }
+ }
+ }
+ }
+
+ self.config.next.handle(context, request).await
+ })
+ }
+}
+
+fn hash(value: impl Hash) -> u64 {
+ let mut h = DefaultHasher::default();
+ value.hash(&mut h);
+ h.finish()
+}