aboutsummaryrefslogtreecommitdiff
path: root/src/modules/semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/semaphore.rs')
-rw-r--r--src/modules/semaphore.rs56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/modules/semaphore.rs b/src/modules/semaphore.rs
new file mode 100644
index 0000000..b4f50b2
--- /dev/null
+++ b/src/modules/semaphore.rs
@@ -0,0 +1,56 @@
+/*
+ This file is part of gnix (https://codeberg.org/metamuffin/gnix)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{config::DynNode, error::ServiceError};
+use anyhow::Result;
+use serde::Deserialize;
+use serde_yml::Value;
+use std::{
+ future::Future,
+ pin::Pin,
+ sync::Arc,
+};
+use tokio::sync::Semaphore;
+
+pub struct SemaphoreKind;
+
+#[derive(Deserialize)]
+struct SemaphoreConfig {
+ permits: usize,
+ next: DynNode,
+}
+
+struct SemaphoreState {
+ config: SemaphoreConfig,
+ semaphore: Semaphore,
+}
+
+impl NodeKind for SemaphoreKind {
+ fn name(&self) -> &'static str {
+ "semaphore"
+ }
+ fn instanciate(&self, config: Value) -> Result<Arc<dyn Node>> {
+ let config = serde_yml::from_value::<SemaphoreConfig>(config)?;
+ Ok(Arc::new(SemaphoreState {
+ semaphore: Semaphore::new(config.permits),
+ config,
+ }))
+ }
+}
+impl Node for SemaphoreState {
+ fn handle<'a>(
+ &'a self,
+ context: &'a mut NodeContext,
+ request: NodeRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+ Box::pin(async move {
+ let _permit = self.semaphore.acquire().await;
+ let resp = self.config.next.handle(context, request).await?;
+ drop(_permit);
+ Ok(resp)
+ })
+ }
+}