/* This file is part of gnix (https://codeberg.org/metamuffin/gnix) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; use crate::{config::DynNode, error::ServiceError}; use futures::Future; use http::{ header::{HOST, USER_AGENT}, Version, }; use log::info; use serde::{Deserialize, Serialize}; use std::{ fmt::Write as _, io::{Cursor, Write as _}, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, time::{Duration, Instant, SystemTime}, }; use tokio::{ fs::{rename, File, OpenOptions}, io::{AsyncWriteExt, BufWriter}, sync::RwLock, }; pub struct LogKind; #[derive(Deserialize)] struct LogConfig { output_temp: PathBuf, output_done: PathBuf, file_duration: u64, columns: Vec, #[serde(default)] format: Format, #[serde(default)] flush: bool, next: DynNode, } struct Log { config: LogConfig, file: RwLock, Instant)>>, } #[derive(Deserialize, Default)] #[serde(rename_all = "snake_case")] enum Format { #[default] Tsv, Json, } #[derive(Deserialize)] #[serde(rename_all = "snake_case")] enum Column { Time, Address, Method, Host, Path, PathQuery, UserAgent, HttpVersion, Secure, } impl NodeKind for LogKind { fn name(&self) -> &'static str { "log" } fn instanciate(&self, config: serde_yml::Value) -> anyhow::Result> { Ok(Arc::new(Log { config: serde_yml::from_value::(config)?, file: Default::default(), })) } } impl Node for Log { fn handle<'a>( &'a self, context: &'a mut NodeContext, request: NodeRequest, ) -> Pin> + Send + Sync + 'a>> { Box::pin(async move { let line = match self.config.format { Format::Tsv => { let mut out = String::new(); for col in &self.config.columns { match col { Column::Time => write!( out, "{}\t", SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos() ), Column::Address => write!(out, "{}\t", context.addr), Column::Method => write!(out, "{}\t", request.method().as_str()), Column::Host => write!( out, "{}\t", request .headers() .get(HOST) .and_then(|x| x.to_str().ok()) .unwrap_or_default() ), Column::Path => write!(out, "{}\t", request.uri().path()), Column::PathQuery => { write!( out, "{}\t", request .uri() .path_and_query() .map(|pq| pq.as_str()) .unwrap_or_default() ) } Column::UserAgent => write!( out, "{}\t", request .headers() .get(USER_AGENT) .and_then(|x| x.to_str().ok()) .unwrap_or_default() ), Column::HttpVersion => { write!(out, "{:?}\t", request.version()) } Column::Secure => write!(out, "{}\t", context.secure), } .unwrap(); } write!(out, "\n").unwrap(); out } Format::Json => { #[derive(Serialize, Default)] struct Entry { #[serde(skip_serializing_if = "Option::is_none")] time: Option, #[serde(skip_serializing_if = "Option::is_none")] method: Option, #[serde(skip_serializing_if = "Option::is_none")] address: Option, #[serde(skip_serializing_if = "Option::is_none")] user_agent: Option, #[serde(skip_serializing_if = "Option::is_none")] path: Option, #[serde(skip_serializing_if = "Option::is_none")] path_query: Option, #[serde(skip_serializing_if = "Option::is_none")] host: Option, #[serde(skip_serializing_if = "Option::is_none")] http_version: Option, #[serde(skip_serializing_if = "Option::is_none")] secure: Option, } let mut entry = Entry::default(); for col in &self.config.columns { match col { Column::Time => { entry.time = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos()) } Column::Address => entry.address = Some(context.addr), Column::Method => { entry.method = Some(request.method().as_str().to_string()) } Column::Host => { entry.host = request .headers() .get(HOST) .and_then(|x| x.to_str().ok()) .map(|s| s.to_string()) } Column::Path => entry.path = Some(request.uri().path().to_string()), Column::PathQuery => { entry.path_query = request .uri() .path_and_query() .map(|pq| pq.as_str().to_string()) } Column::UserAgent => { entry.user_agent = request .headers() .get(USER_AGENT) .and_then(|x| x.to_str().ok()) .map(|s| s.to_string()) } Column::HttpVersion => { entry.http_version = Some({ let v = request.version(); if v == Version::HTTP_3 { 3 } else if v == Version::HTTP_2 { 2 } else { 1 } }) } Column::Secure => entry.secure = Some(context.secure), } } let mut out = Cursor::new(Vec::new()); serde_json::to_writer(&mut out, &entry).unwrap(); write!(out, "\n").unwrap(); String::from_utf8_lossy_owned(out.into_inner()) } }; { let mut g = self.file.write().await; if let Some((log, otime)) = g.as_mut() { if otime.elapsed() > Duration::from_secs(self.config.file_duration) { info!("closing log file"); log.flush().await?; *g = None; } } if g.is_none() { if self.config.output_temp.exists() { info!("moving log file"); rename( &self.config.output_temp, self.config.output_done.join( SystemTime::UNIX_EPOCH .elapsed() .unwrap() .as_secs() .to_string(), ), ) .await?; } info!("opening new log file"); *g = Some(( BufWriter::new( OpenOptions::new() .append(true) .create(true) .open(&self.config.output_temp) .await?, ), Instant::now(), )); } let log = &mut g.as_mut().unwrap().0; log.write_all(line.as_bytes()).await?; if self.config.flush { log.flush().await?; } } self.config.next.handle(context, request).await }) } }