diff options
-rw-r--r-- | readme.md | 22 | ||||
-rw-r--r-- | src/error.rs | 3 | ||||
-rw-r--r-- | src/modules/cgi.rs | 66 | ||||
-rw-r--r-- | src/modules/mod.rs | 2 | ||||
-rw-r--r-- | src/modules/ratelimit.rs | 176 |
5 files changed, 238 insertions, 31 deletions
@@ -267,6 +267,28 @@ themselves; in that case the request is passed on. before the frame that exceeds this limit. Therefore the body is up to one frame size smaller than allowed. +- **module `ratelimit`** + - Limits the rate at which requests can be processed. For this every identity + (see below) has a request counter. The counter is reset after a fixed time + delay. + - `reference_duration`: Duration in seconds after which request the counter + are reset. + - `identity`: Requests are counted per identity. Default is source address. + - `!global`: Use a central counter + - `!source_address`: Count per source ip address + - `!source_address_trunc`: Same but truncate them before. Requires keys `v4` + and `v6` which control how many trailing bits are discarded respectively. + - `!path`: Count per path (excluding query) + - `!path_query`: Count per path (including query) + - `max_identities`: Always rejects requests if there are already more than + this many identites tracked. + - `thresholds`: A list of `[threshold, mode]` pairs that are checked and + conditionally executed in order. + - `!too_many_requests`: Responds with a empty request with 429 status code + and `Retry-After` header set. Later thresholds are not checked. + - `!exec <path>`: Invokes a script like CGI would but expects no output. + - `next`: Inner handler. (module) + - **module `debug`** - Replies with information about the request to debug. Includes source address, HTTP version, method, URI and headers. diff --git a/src/error.rs b/src/error.rs index 1964c0d..cccfd78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,8 @@ pub enum ServiceError { InvalidHeader, #[error("invalid uri")] InvalidUri, + #[error("too many concurrent users, please retry later")] + TooManyIdentities, #[error("impossible error")] Other, } @@ -88,6 +90,7 @@ impl ServiceError { ServiceError::ParseIntError(_) => StatusCode::BAD_REQUEST, ServiceError::InvalidHeader => StatusCode::BAD_REQUEST, ServiceError::InvalidUri => StatusCode::BAD_REQUEST, + ServiceError::TooManyIdentities => StatusCode::TOO_MANY_REQUESTS, ServiceError::Other => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/src/modules/cgi.rs b/src/modules/cgi.rs index bd19395..121bcac 100644 --- a/src/modules/cgi.rs +++ b/src/modules/cgi.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use super::{Node, NodeKind, NodeResponse}; +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; use crate::error::ServiceError; use anyhow::{anyhow, Result}; use futures::TryStreamExt; @@ -88,36 +88,7 @@ impl Node for Cgi { command.envs(&self.config.env); command.args(&self.config.args); - command.env( - "CONTENT_LENGTH", - request - .headers() - .get(CONTENT_LENGTH) - .and_then(|x| x.to_str().ok()) - .unwrap_or_default(), - ); - command.env( - "CONTENT_TYPE", - request - .headers() - .get(CONTENT_TYPE) - .and_then(|x| x.to_str().ok()) - .unwrap_or_default(), - ); - command.env("GATEWAY_INTERFACE", "CGI/1.1"); - command.env("PATH_INFO", request.uri().path()); - command.env("PATH_TRANSLATED", request.uri().path()); - command.env("QUERY_STRING", request.uri().query().unwrap_or_default()); - command.env("REMOTE_ADDR", context.addr.to_string()); - // command.env("REMOTE_HOST", )); - // command.env("REMOTE_IDENT", )); - // command.env("REMOTE_USER", )); - command.env("REQUEST_METHOD", request.method().to_string()); - // command.env("SCRIPT_NAME", ); - // command.env("SERVER_NAME", ); - // command.env("SERVER_PORT", ); - command.env("SERVER_PROTOCOL", "HTTP/1.1"); - command.env("SERVER_SOFTWARE", "gnix"); + set_cgi_variables(&mut command, &request, context); let mut child = command.spawn()?; let mut stdout = BufReader::new(child.stdout.take().unwrap()); @@ -165,3 +136,36 @@ impl Node for Cgi { }) } } + +pub fn set_cgi_variables(command: &mut Command, request: &NodeRequest, context: &NodeContext) { + command.env( + "CONTENT_LENGTH", + request + .headers() + .get(CONTENT_LENGTH) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default(), + ); + command.env( + "CONTENT_TYPE", + request + .headers() + .get(CONTENT_TYPE) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default(), + ); + command.env("GATEWAY_INTERFACE", "CGI/1.1"); + command.env("PATH_INFO", request.uri().path()); + command.env("PATH_TRANSLATED", request.uri().path()); + command.env("QUERY_STRING", request.uri().query().unwrap_or_default()); + command.env("REMOTE_ADDR", context.addr.to_string()); + // command.env("REMOTE_HOST", )); + // command.env("REMOTE_IDENT", )); + // command.env("REMOTE_USER", )); + command.env("REQUEST_METHOD", request.method().to_string()); + // command.env("SCRIPT_NAME", ); + // command.env("SERVER_NAME", ); + // command.env("SERVER_PORT", ); + command.env("SERVER_PROTOCOL", "HTTP/1.1"); + command.env("SERVER_SOFTWARE", "gnix"); +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 21b0b4f..7a58807 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -29,6 +29,7 @@ mod limits; mod loadbalance; mod paths; mod proxy; +mod ratelimit; mod redirect; mod switch; mod upgrade_insecure; @@ -59,6 +60,7 @@ pub static MODULES: &[&dyn NodeKind] = &[ &fallback::FallbackKind, &limits::LimitsKind, &delay::DelayKind, + &ratelimit::RatelimitKind, ]; pub struct NodeContext { diff --git a/src/modules/ratelimit.rs b/src/modules/ratelimit.rs new file mode 100644 index 0000000..9e5d583 --- /dev/null +++ b/src/modules/ratelimit.rs @@ -0,0 +1,176 @@ +/* + This file is part of gnix (https://codeberg.org/metamuffin/gnix) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use super::{cgi::set_cgi_variables, Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use futures::Future; +use http::{header::RETRY_AFTER, HeaderValue, Response, StatusCode}; +use http_body_util::{combinators::BoxBody, BodyExt}; +use log::{error, warn}; +use serde::Deserialize; +use std::{ + collections::HashMap, + hash::{DefaultHasher, Hash, Hasher}, + net::IpAddr, + pin::Pin, + process::Stdio, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::{process::Command, spawn, sync::Mutex, time::sleep_until}; + +pub struct RatelimitKind; + +#[derive(Deserialize)] +pub struct RatelimitConfig { + next: DynNode, + #[serde(default)] + identity: IdentityMode, + #[serde(default = "default_max_identities")] + max_identities: usize, + reference_duration: f32, + thresholds: Vec<(usize, LimitMode)>, +} +fn default_max_identities() -> usize { + 1 << 16 +} + +#[derive(Deserialize, Default)] +#[serde(rename_all = "snake_case")] +enum IdentityMode { + Global, + #[default] + SourceAddress, + SourceAddressTrunc { + v4: u8, + v6: u8, + }, + Path, + PathQuery, +} + +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] +enum LimitMode { + TooManyRequests, + Exec(String), +} + +pub struct Ratelimit { + state: Arc<Mutex<HashMap<u64, IdentityState>>>, + config: RatelimitConfig, +} + +struct IdentityState { + counter: usize, + frame_end: Instant, +} + +impl NodeKind for RatelimitKind { + fn name(&self) -> &'static str { + "ratelimit" + } + fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> { + Ok(Arc::new(Ratelimit { + state: Arc::new(Mutex::new(HashMap::new())), + config: serde_yml::from_value::<RatelimitConfig>(config)?, + })) + } +} + +impl Node for Ratelimit { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let identity_hash = match self.config.identity { + IdentityMode::Global => 0, + IdentityMode::SourceAddress => hash(context.addr.ip()), + IdentityMode::SourceAddressTrunc { v4, v6 } => match context.addr.ip() { + IpAddr::V4(a) => hash(a.to_bits() >> v4), + IpAddr::V6(a) => hash(a.to_bits() >> v6), + }, + IdentityMode::Path => hash(request.uri().path()), + IdentityMode::PathQuery => hash(request.uri().path_and_query()), + }; + + let now = Instant::now(); + + let (counter, frame_end) = { + let mut state = self.state.lock().await; + if state.len() > self.config.max_identities { + return Err(ServiceError::TooManyIdentities); + } + let istate = state.entry(identity_hash).or_insert_with(|| { + let frame_end = now + Duration::from_secs_f32(self.config.reference_duration); + let state = self.state.clone(); + //? How efficient is this? Does it scale better to have a central task? + spawn(async move { + sleep_until(frame_end.into()).await; + state.lock().await.remove(&identity_hash) + }); + IdentityState { + counter: 0, + frame_end, + } + }); + istate.counter += 1; + (istate.counter, istate.frame_end) + }; + + for (thres, l) in &self.config.thresholds { + match l { + LimitMode::TooManyRequests => { + if counter > *thres { + let mut r = Response::new(BoxBody::new( + http_body_util::Empty::new().map_err(|x| match x {}), + )); + *r.status_mut() = StatusCode::TOO_MANY_REQUESTS; + r.headers_mut().insert( + RETRY_AFTER, + HeaderValue::from_str(&(frame_end - now).as_secs().to_string()) + .unwrap(), + ); + return Ok(r); + } + } + LimitMode::Exec(path) => { + // Exact comparison so it can only trigger once per frame + if counter == *thres { + let mut command = Command::new(&path); + command.stdin(Stdio::null()); + set_cgi_variables(&mut command, &request, context); + spawn(async move { + let mut child = match command.spawn() { + Ok(c) => c, + Err(e) => return error!("exec limiter spawn failed: {e}"), + }; + match child.wait().await { + Ok(s) if s.success() => (), + Ok(s) => warn!( + "exec limiter failed with code {}", + s.code().unwrap_or_default() + ), + Err(e) => warn!("exec limiter failed: {e}"), + } + }); + } + } + } + } + + self.config.next.handle(context, request).await + }) + } +} + +fn hash(value: impl Hash) -> u64 { + let mut h = DefaultHasher::default(); + value.hash(&mut h); + h.finish() +} |