diff options
Diffstat (limited to 'src/modules/semaphore.rs')
-rw-r--r-- | src/modules/semaphore.rs | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/src/modules/semaphore.rs b/src/modules/semaphore.rs new file mode 100644 index 0000000..b4f50b2 --- /dev/null +++ b/src/modules/semaphore.rs @@ -0,0 +1,56 @@ +/* + This file is part of gnix (https://codeberg.org/metamuffin/gnix) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use anyhow::Result; +use serde::Deserialize; +use serde_yml::Value; +use std::{ + future::Future, + pin::Pin, + sync::Arc, +}; +use tokio::sync::Semaphore; + +pub struct SemaphoreKind; + +#[derive(Deserialize)] +struct SemaphoreConfig { + permits: usize, + next: DynNode, +} + +struct SemaphoreState { + config: SemaphoreConfig, + semaphore: Semaphore, +} + +impl NodeKind for SemaphoreKind { + fn name(&self) -> &'static str { + "semaphore" + } + fn instanciate(&self, config: Value) -> Result<Arc<dyn Node>> { + let config = serde_yml::from_value::<SemaphoreConfig>(config)?; + Ok(Arc::new(SemaphoreState { + semaphore: Semaphore::new(config.permits), + config, + })) + } +} +impl Node for SemaphoreState { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let _permit = self.semaphore.acquire().await; + let resp = self.config.next.handle(context, request).await?; + drop(_permit); + Ok(resp) + }) + } +} |