diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-02 17:47:15 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-02 17:47:17 +0200 |
commit | 631f621e31d209daf5cfe3a286d2aed5efb33cdc (patch) | |
tree | 23dac0689dd0d28671e822cfe563ef276cc9ba0d | |
parent | 8f6f8004361a2f597d16baf4d4e2d32ff477cd75 (diff) | |
download | gnix-631f621e31d209daf5cfe3a286d2aed5efb33cdc.tar gnix-631f621e31d209daf5cfe3a286d2aed5efb33cdc.tar.bz2 gnix-631f621e31d209daf5cfe3a286d2aed5efb33cdc.tar.zst |
add a new (better) logging module
-rw-r--r-- | src/main.rs | 3 | ||||
-rw-r--r-- | src/modules/log.rs | 261 | ||||
-rw-r--r-- | src/modules/mod.rs | 2 |
3 files changed, 265 insertions, 1 deletions
diff --git a/src/main.rs b/src/main.rs
index 9f395e2..f40f716 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,7 +8,8 @@
     slice_split_once,
     iterator_try_collect,
     path_add_extension,
-    never_type
+    never_type,
+    string_from_utf8_lossy_owned
 )]
 
 pub mod certs;
diff --git a/src/modules/log.rs b/src/modules/log.rs
new file mode 100644
index 0000000..6122624
--- /dev/null
+++ b/src/modules/log.rs
@@ -0,0 +1,286 @@
+/*
+    This file is part of gnix (https://codeberg.org/metamuffin/gnix)
+    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+    Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{config::DynNode, error::ServiceError};
+use futures::Future;
+use http::{
+    header::{HOST, USER_AGENT},
+    Version,
+};
+use log::info;
+use serde::{Deserialize, Serialize};
+use std::{
+    fmt::Write as _,
+    io::{Cursor, Write as _},
+    net::SocketAddr,
+    path::PathBuf,
+    pin::Pin,
+    sync::Arc,
+    time::{Duration, Instant, SystemTime},
+};
+use tokio::{
+    fs::{rename, File, OpenOptions},
+    io::{AsyncWriteExt, BufWriter},
+    sync::RwLock,
+};
+
+/// Node kind registered under the name `log`.
+pub struct LogKind;
+
+/// Configuration of a `log` node.
+#[derive(Deserialize)]
+struct LogConfig {
+    /// File that entries are appended to.
+    output_temp: PathBuf,
+    /// Directory a leftover `output_temp` file is moved into; the moved file is named
+    /// after the Unix time (in seconds) of the move.
+    output_done: PathBuf,
+    /// Age in seconds after which the open file is flushed and closed.
+    file_duration: u64,
+    /// Columns to record for each request.
+    columns: Vec<Column>,
+    /// Encoding of an entry; TSV when not configured.
+    #[serde(default)]
+    format: Format,
+    /// Flush the buffered writer after every entry.
+    #[serde(default)]
+    flush: bool,
+    /// Node the request is passed to after the entry was written.
+    next: DynNode,
+}
+
+struct Log {
+    config: LogConfig,
+    /// Writer for `output_temp` and the instant it was opened; `None` while closed.
+    file: RwLock<Option<(BufWriter<File>, Instant)>>,
+}
+#[derive(Deserialize, Default)]
+#[serde(rename_all = "snake_case")]
+enum Format {
+    #[default]
+    Tsv,
+    Json,
+}
+#[derive(Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum Column {
+    Time,
+    Address,
+    Method,
+    Host,
+    Path,
+    PathQuery,
+    UserAgent,
+    HttpVersion,
+    Secure,
+}
+
+impl NodeKind for LogKind {
+    fn name(&self) -> &'static str {
+        "log"
+    }
+    fn instanciate(&self, config: serde_yml::Value) -> anyhow::Result<Arc<dyn Node>> {
+        Ok(Arc::new(Log {
+            config: serde_yml::from_value::<LogConfig>(config)?,
+            file: Default::default(),
+        }))
+    }
+}
+
+impl Node for Log {
+    /// Writes one log entry for `request`, then passes the request on to `next`.
+    ///
+    /// The entry is appended to `output_temp`. The file is opened on first use and
+    /// again after it was closed for being older than `file_duration` seconds; a file
+    /// already present at `output_temp` is moved into `output_done` before opening.
+    ///
+    /// # Errors
+    /// I/O errors from flushing, moving, opening or writing the log file are returned
+    /// without calling `next`.
+    fn handle<'a>(
+        &'a self,
+        context: &'a mut NodeContext,
+        request: NodeRequest,
+    ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+        Box::pin(async move {
+            let line = match self.config.format {
+                Format::Tsv => {
+                    let mut out = String::new();
+                    for col in &self.config.columns {
+                        match col {
+                            Column::Time => write!(
+                                out,
+                                "{}\t",
+                                SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos()
+                            ),
+                            Column::Address => write!(out, "{}\t", context.addr),
+                            Column::Method => write!(out, "{}\t", request.method().as_str()),
+                            Column::Host => write!(
+                                out,
+                                "{}\t",
+                                request
+                                    .headers()
+                                    .get(HOST)
+                                    .and_then(|x| x.to_str().ok())
+                                    .unwrap_or_default()
+                            ),
+                            Column::Path => write!(out, "{}\t", request.uri().path()),
+                            Column::PathQuery => {
+                                write!(
+                                    out,
+                                    "{}\t",
+                                    request
+                                        .uri()
+                                        .path_and_query()
+                                        .map(|pq| pq.as_str())
+                                        .unwrap_or_default()
+                                )
+                            }
+                            Column::UserAgent => write!(
+                                out,
+                                "{}\t",
+                                request
+                                    .headers()
+                                    .get(USER_AGENT)
+                                    .and_then(|x| x.to_str().ok())
+                                    .unwrap_or_default()
+                            ),
+                            Column::HttpVersion => {
+                                write!(out, "{:?}\t", request.version())
+                            }
+                            Column::Secure => write!(out, "{}\t", context.secure),
+                        }
+                        .unwrap();
+                    }
+                    write!(out, "\n").unwrap();
+                    out
+                }
+                Format::Json => {
+                    #[derive(Serialize, Default)]
+                    struct Entry {
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        time: Option<u128>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        method: Option<String>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        address: Option<SocketAddr>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        user_agent: Option<String>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        path: Option<String>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        path_query: Option<String>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        host: Option<String>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        http_version: Option<u8>,
+                        #[serde(skip_serializing_if = "Option::is_none")]
+                        secure: Option<bool>,
+                    }
+                    let mut entry = Entry::default();
+                    for col in &self.config.columns {
+                        match col {
+                            Column::Time => {
+                                entry.time =
+                                    Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos())
+                            }
+                            Column::Address => entry.address = Some(context.addr),
+                            Column::Method => {
+                                entry.method = Some(request.method().as_str().to_string())
+                            }
+                            Column::Host => {
+                                entry.host = request
+                                    .headers()
+                                    .get(HOST)
+                                    .and_then(|x| x.to_str().ok())
+                                    .map(|s| s.to_string())
+                            }
+                            Column::Path => entry.path = Some(request.uri().path().to_string()),
+                            Column::PathQuery => {
+                                entry.path_query = request
+                                    .uri()
+                                    .path_and_query()
+                                    .map(|pq| pq.as_str().to_string())
+                            }
+                            Column::UserAgent => {
+                                entry.user_agent = request
+                                    .headers()
+                                    .get(USER_AGENT)
+                                    .and_then(|x| x.to_str().ok())
+                                    .map(|s| s.to_string())
+                            }
+                            Column::HttpVersion => {
+                                entry.http_version = Some({
+                                    let v = request.version();
+                                    if v == Version::HTTP_3 {
+                                        3
+                                    } else if v == Version::HTTP_2 {
+                                        2
+                                    } else {
+                                        1
+                                    }
+                                })
+                            }
+                            Column::Secure => entry.secure = Some(context.secure),
+                        }
+                    }
+
+                    let mut out = Cursor::new(Vec::new());
+                    serde_json::to_writer(&mut out, &entry).unwrap();
+                    write!(out, "\n").unwrap();
+                    String::from_utf8_lossy_owned(out.into_inner())
+                }
+            };
+
+            // The write guard lives only inside this block. Keeping it until the end of
+            // the function would hold the lock while `next` handles the request, so
+            // requests passing through this node could only be served one at a time.
+            {
+                let mut g = self.file.write().await;
+                if let Some((log, otime)) = g.as_mut() {
+                    if otime.elapsed() > Duration::from_secs(self.config.file_duration) {
+                        info!("closing log file (age)");
+                        log.flush().await?;
+                        *g = None;
+                    }
+                }
+                if g.is_none() {
+                    if self.config.output_temp.exists() {
+                        info!("moving log file");
+                        rename(
+                            &self.config.output_temp,
+                            self.config.output_done.join(
+                                SystemTime::UNIX_EPOCH
+                                    .elapsed()
+                                    .unwrap()
+                                    .as_secs()
+                                    .to_string(),
+                            ),
+                        )
+                        .await?;
+                    }
+                    info!("opening new log file");
+                    *g = Some((
+                        BufWriter::new(
+                            OpenOptions::new()
+                                .append(true)
+                                .create(true)
+                                .open(&self.config.output_temp)
+                                .await?,
+                        ),
+                        Instant::now(),
+                    ));
+                }
+                let log = &mut g.as_mut().unwrap().0;
+                log.write_all(line.as_bytes()).await?;
+                if self.config.flush {
+                    log.flush().await?;
+                }
+            }
+            self.config.next.handle(context, request).await
+        })
+    }
+}
diff --git a/src/modules/mod.rs b/src/modules/mod.rs
index 7a58807..65e9666 100644
--- a/src/modules/mod.rs
+++ b/src/modules/mod.rs
@@ -27,6 +27,7 @@ mod hosts;
 mod inspect;
 mod limits;
 mod loadbalance;
+mod log;
 mod paths;
 mod proxy;
 mod ratelimit;
@@ -61,6 +62,7 @@ pub static MODULES: &[&dyn NodeKind] = &[
     &limits::LimitsKind,
     &delay::DelayKind,
     &ratelimit::RatelimitKind,
+    &log::LogKind,
 ];
 
 pub struct NodeContext {