From 532cc431d1c5ca1ffcf429a4ccb94edc7848fe7a Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 30 May 2024 00:09:11 +0200 Subject: rename filters dir --- src/modules/accesslog.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/modules/accesslog.rs (limited to 'src/modules/accesslog.rs') diff --git a/src/modules/accesslog.rs b/src/modules/accesslog.rs new file mode 100644 index 0000000..1da6e5d --- /dev/null +++ b/src/modules/accesslog.rs @@ -0,0 +1,81 @@ +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{config::DynNode, error::ServiceError}; +use futures::Future; +use log::error; +use serde::Deserialize; +use std::{path::PathBuf, pin::Pin, sync::Arc, time::SystemTime}; +use tokio::{ + fs::{File, OpenOptions}, + io::{AsyncWriteExt, BufWriter}, + sync::RwLock, +}; + +pub struct AccessLogKind; + +#[derive(Deserialize)] +struct AccessLogConfig { + file: PathBuf, + #[serde(default)] + flush: bool, + #[serde(default)] + reject_on_fail: bool, + next: DynNode, +} + +struct AccessLog { + config: AccessLogConfig, + file: RwLock>>, +} + +impl NodeKind for AccessLogKind { + fn name(&self) -> &'static str { + "access_log" + } + fn instanciate(&self, config: serde_yaml::Value) -> anyhow::Result> { + Ok(Arc::new(AccessLog { + config: serde_yaml::from_value::(config)?, + file: Default::default(), + })) + } +} + +impl Node for AccessLog { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + request: NodeRequest, + ) -> Pin> + Send + Sync + 'a>> { + Box::pin(async move { + let mut g = self.file.write().await; + let log = match g.as_mut() { + Some(r) => r, + None => g.insert(BufWriter::new( + OpenOptions::new() + .append(true) + .create(true) + .open(&self.config.file) + .await?, + )), + }; + + let method = request.method().as_str(); + let time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros(); + let addr = context.addr; + let mut res = log + .write_all(format!("{time}\t{addr}\t{method}\t{:?}\n", request.uri()).as_bytes()) + .await; + + if self.config.flush && res.is_ok() { + res = log.flush().await; + } + + if self.config.reject_on_fail { + res? + } else if let Err(e) = res { + error!("failed to write log: {e:?}") + } + + self.config.next.handle(context, request).await + }) + } +} -- cgit v1.2.3-70-g09d2