aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-03-18 23:16:42 +0100
committermetamuffin <metamuffin@disroot.org>2025-03-18 23:16:42 +0100
commite21267342c74927a1f9255b4ea02fac32961031a (patch)
treeba60b12f93bbe768d40b50729ad42353af804834 /src
parent412b0b810e9e1fd8feac9edea9034121739b4f8f (diff)
downloadgnix-e21267342c74927a1f9255b4ea02fac32961031a.tar
gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.bz2
gnix-e21267342c74927a1f9255b4ea02fac32961031a.tar.zst
limits module
Diffstat (limited to 'src')
-rw-r--r--src/modules/limits.rs110
-rw-r--r--src/modules/mod.rs2
-rw-r--r--src/modules/paths.rs1
3 files changed, 113 insertions, 0 deletions
diff --git a/src/modules/limits.rs b/src/modules/limits.rs
new file mode 100644
index 0000000..7ae5fc9
--- /dev/null
+++ b/src/modules/limits.rs
@@ -0,0 +1,110 @@
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{config::DynNode, error::ServiceError};
+use anyhow::Result;
+use bytes::Bytes;
+use futures::{Future, StreamExt};
+use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
+use hyper::body::{Body, Frame};
+use serde::Deserialize;
+use std::{
+ pin::Pin,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tokio::time::sleep_until;
+
+pub struct LimitsKind;
+
+#[derive(Deserialize)]
+pub struct Limits {
+ #[serde(default)]
+ request: LimitParam,
+ #[serde(default)]
+ response: LimitParam,
+ next: DynNode,
+}
+
+#[derive(Debug, Clone, Deserialize, Default)]
+struct LimitParam {
+ size: Option<u64>,
+ rate: Option<u64>,
+ rate_buffer: Option<u64>,
+}
+
+impl NodeKind for LimitsKind {
+ fn name(&self) -> &'static str {
+ "limits"
+ }
+ fn instanciate(&self, config: serde_yml::Value) -> Result<Arc<dyn Node>> {
+ Ok(Arc::new(serde_yml::from_value::<Limits>(config)?))
+ }
+}
+
+impl Node for Limits {
+ fn handle<'a>(
+ &'a self,
+ context: &'a mut NodeContext,
+ request: NodeRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+ Box::pin(async move {
+ let request = request.map(|body| limit_body(body, self.request.clone()).boxed());
+ let response = self.next.handle(context, request).await?;
+ Ok(response.map(|body| limit_body(body, self.response.clone()).boxed()))
+ })
+ }
+}
+
+fn limit_body(
+ inner: BoxBody<Bytes, ServiceError>,
+ config: LimitParam,
+) -> impl Body<Data = Bytes, Error = ServiceError> {
+ let mut stream = inner.into_data_stream();
+ let mut state = State::new(config);
+ StreamBody::new(async_stream::stream! {
+ while let Some(k) = stream.next().await {
+ match k {
+ Ok(data) => {
+ if state.update(data.len()).await {
+ yield Ok(Frame::data(data))
+ } else {
+ break;
+ }
+ },
+ Err(e) => yield Err(e)
+ }
+ }
+ })
+}
+
+struct State {
+ config: LimitParam,
+ last_read: Instant,
+ size: u64,
+}
+
+impl State {
+ pub fn new(config: LimitParam) -> Self {
+ Self {
+ config,
+ last_read: Instant::now(),
+ size: 0,
+ }
+ }
+ pub async fn update(&mut self, len: usize) -> bool {
+ if let Some(rate) = self.config.rate {
+ let buffer = Duration::from_millis(self.config.rate_buffer.unwrap_or(1000));
+ let el = self.last_read.elapsed();
+ if el > buffer {
+ self.last_read += el - buffer
+ }
+ self.last_read += Duration::from_nanos((len as u64 * 1_000_000_000) / rate);
+ sleep_until(self.last_read.into()).await
+ }
+ if let Some(max_size) = self.config.size {
+ self.size += len as u64;
+ self.size < max_size
+ } else {
+ true
+ }
+ }
+}
diff --git a/src/modules/mod.rs b/src/modules/mod.rs
index b751ba3..283c86a 100644
--- a/src/modules/mod.rs
+++ b/src/modules/mod.rs
@@ -25,6 +25,7 @@ mod proxy;
mod redirect;
mod switch;
mod upgrade_insecure;
+mod limits;
pub type NodeRequest = Request<BoxBody<Bytes, ServiceError>>;
pub type NodeResponse = Response<BoxBody<Bytes, ServiceError>>;
@@ -50,6 +51,7 @@ pub static MODULES: &[&dyn NodeKind] = &[
&upgrade_insecure::UpgradeInsecureKind,
&inspect::InspectKind,
&fallback::FallbackKind,
+ &limits::LimitsKind,
];
pub struct NodeContext {
diff --git a/src/modules/paths.rs b/src/modules/paths.rs
index 3dc7596..30e58cf 100644
--- a/src/modules/paths.rs
+++ b/src/modules/paths.rs
@@ -41,6 +41,7 @@ impl NodeKind for PathsKind {
}))
}
}
+
impl Node for Paths {
fn handle<'a>(
&'a self,