diff options
Diffstat (limited to 'src/filters/accesslog.rs')
-rw-r--r-- | src/filters/accesslog.rs | 106 |
1 files changed, 69 insertions, 37 deletions
diff --git a/src/filters/accesslog.rs b/src/filters/accesslog.rs index 9a33762..1da6e5d 100644 --- a/src/filters/accesslog.rs +++ b/src/filters/accesslog.rs @@ -1,49 +1,81 @@ -use crate::{config::AccessLogConfig, error::ServiceError, FilterRequest, State}; -use futures::executor::block_on; +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use futures::Future; use log::error; -use std::{net::SocketAddr, ops::ControlFlow, time::SystemTime}; +use serde::Deserialize; +use std::{path::PathBuf, pin::Pin, sync::Arc, time::SystemTime}; use tokio::{ - fs::OpenOptions, + fs::{File, OpenOptions}, io::{AsyncWriteExt, BufWriter}, + sync::RwLock, }; -pub async fn access_log( - state: &State, - host: &str, - addr: SocketAddr, - config: &AccessLogConfig, - req: &FilterRequest, -) -> Result<ControlFlow<()>, ServiceError> { - let mut g = state.access_logs.write().await; +pub struct AccessLogKind; - let log = g.entry(host.to_owned()).or_insert_with(|| { - BufWriter::new( - // TODO aaahh dont block the runtime and dont panic in any case.... - block_on( - OpenOptions::new() - .append(true) - .create(true) - .open(&config.file), - ) - .unwrap(), - ) - }); +#[derive(Deserialize)] +struct AccessLogConfig { + file: PathBuf, + #[serde(default)] + flush: bool, + #[serde(default)] + reject_on_fail: bool, + next: DynNode, +} - let method = req.method().as_str(); - let time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros(); - let mut res = log - .write_all(format!("{time}\t{addr}\t{method}\t{:?}\n", req.uri()).as_bytes()) - .await; +struct AccessLog { + config: AccessLogConfig, + file: RwLock<Option<BufWriter<File>>>, +} - if config.flush && res.is_ok() { - res = log.flush().await; +impl NodeKind for AccessLogKind { + fn name(&self) -> &'static str { + "access_log" } - - if config.reject_on_fail { - res? - } else if let Err(e) = res { - error!("failed to write log: {e:?}") + fn instanciate(&self, config: serde_yaml::Value) -> anyhow::Result<Arc<dyn Node>> { + Ok(Arc::new(AccessLog { + config: serde_yaml::from_value::<AccessLogConfig>(config)?, + file: Default::default(), + })) } +} + +impl Node for AccessLog { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + let mut g = self.file.write().await; + let log = match g.as_mut() { + Some(r) => r, + None => g.insert(BufWriter::new( + OpenOptions::new() + .append(true) + .create(true) + .open(&self.config.file) + .await?, + )), + }; - Ok(ControlFlow::Continue(())) + let method = request.method().as_str(); + let time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros(); + let addr = context.addr; + let mut res = log + .write_all(format!("{time}\t{addr}\t{method}\t{:?}\n", request.uri()).as_bytes()) + .await; + + if self.config.flush && res.is_ok() { + res = log.flush().await; + } + + if self.config.reject_on_fail { + res? + } else if let Err(e) = res { + error!("failed to write log: {e:?}") + } + + self.config.next.handle(context, request).await + }) + } } |