From 886a18e0c67624d0882f04c7f6659bcfee6b4d8d Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 29 May 2024 16:37:44 +0200 Subject: refactor filter system --- src/filters/accesslog.rs | 112 ++++++++++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 40 deletions(-) (limited to 'src/filters/accesslog.rs') diff --git a/src/filters/accesslog.rs b/src/filters/accesslog.rs index 9a33762..1da6e5d 100644 --- a/src/filters/accesslog.rs +++ b/src/filters/accesslog.rs @@ -1,49 +1,81 @@ -use crate::{config::AccessLogConfig, error::ServiceError, FilterRequest, State}; -use futures::executor::block_on; +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use futures::Future; use log::error; -use std::{net::SocketAddr, ops::ControlFlow, time::SystemTime}; +use serde::Deserialize; +use std::{path::PathBuf, pin::Pin, sync::Arc, time::SystemTime}; use tokio::{ - fs::OpenOptions, + fs::{File, OpenOptions}, io::{AsyncWriteExt, BufWriter}, + sync::RwLock, }; -pub async fn access_log( - state: &State, - host: &str, - addr: SocketAddr, - config: &AccessLogConfig, - req: &FilterRequest, -) -> Result, ServiceError> { - let mut g = state.access_logs.write().await; - - let log = g.entry(host.to_owned()).or_insert_with(|| { - BufWriter::new( - // TODO aaahh dont block the runtime and dont panic in any case.... - block_on( - OpenOptions::new() - .append(true) - .create(true) - .open(&config.file), - ) - .unwrap(), - ) - }); - - let method = req.method().as_str(); - let time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros(); - let mut res = log - .write_all(format!("{time}\t{addr}\t{method}\t{:?}\n", req.uri()).as_bytes()) - .await; - - if config.flush && res.is_ok() { - res = log.flush().await; - } +pub struct AccessLogKind; + +#[derive(Deserialize)] +struct AccessLogConfig { + file: PathBuf, + #[serde(default)] + flush: bool, + #[serde(default)] + reject_on_fail: bool, + next: DynNode, +} - if config.reject_on_fail { - res? - } else if let Err(e) = res { - error!("failed to write log: {e:?}") +struct AccessLog { + config: AccessLogConfig, + file: RwLock>>, +} + +impl NodeKind for AccessLogKind { + fn name(&self) -> &'static str { + "access_log" + } + fn instanciate(&self, config: serde_yaml::Value) -> anyhow::Result> { + Ok(Arc::new(AccessLog { + config: serde_yaml::from_value::(config)?, + file: Default::default(), + })) } +} + +impl Node for AccessLog { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin> + Send + Sync + 'a>> { + Box::pin(async move { + let mut g = self.file.write().await; + let log = match g.as_mut() { + Some(r) => r, + None => g.insert(BufWriter::new( + OpenOptions::new() + .append(true) + .create(true) + .open(&self.config.file) + .await?, + )), + }; + + let method = request.method().as_str(); + let time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros(); + let addr = context.addr; + let mut res = log + .write_all(format!("{time}\t{addr}\t{method}\t{:?}\n", request.uri()).as_bytes()) + .await; - Ok(ControlFlow::Continue(())) + if self.config.flush && res.is_ok() { + res = log.flush().await; + } + + if self.config.reject_on_fail { + res? + } else if let Err(e) = res { + error!("failed to write log: {e:?}") + } + + self.config.next.handle(context, request).await + }) + } } -- cgit v1.2.3-70-g09d2