/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ use anyhow::{Context, anyhow}; use base64::Engine; use bincode::{Decode, Encode}; use log::{info, warn}; use rand::random; use serde::{Deserialize, Serialize}; use sha2::Sha512; use std::{ any::Any, collections::{BTreeMap, HashMap}, fs::rename, future::Future, hash::{Hash, Hasher}, io::{Seek, Write}, path::PathBuf, sync::{ Arc, LazyLock, RwLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Instant, }; use tokio::{ io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, sync::Mutex, }; #[derive(Debug, Deserialize)] pub struct Config { path: PathBuf, max_in_memory_cache_size: usize, } pub static CONF_PRELOAD: std::sync::Mutex> = std::sync::Mutex::new(None); static CONF: LazyLock = LazyLock::new(|| { CONF_PRELOAD .lock() .unwrap() .take() .expect("cache config not preloaded. logic error") }); #[derive(Debug, Encode, Decode, Serialize, Clone, PartialEq, Eq)] pub struct CachePath(pub PathBuf); impl CachePath { pub fn abs(&self) -> PathBuf { CONF.path.join(&self.0) } } pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) { use sha2::Digest; struct ShaHasher(Sha512); impl Hasher for ShaHasher { fn finish(&self) -> u64 { unreachable!() } fn write(&mut self, bytes: &[u8]) { self.0.update(bytes); } } let mut d = ShaHasher(sha2::Sha512::new()); d.0.update(kind); d.0.update(b"\0"); key.hash(&mut d); let d = d.0.finalize(); let n = d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24); let fname = base64::engine::general_purpose::URL_SAFE.encode(d); let fname = &fname[..30]; // 180 bits let fname = format!("{}/{}", kind, fname); (n, CachePath(fname.into())) } const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); pub async fn async_cache_file( kind: &str, key: impl Hash, generate: Fun, ) -> Result where Fun: FnOnce(tokio::fs::File) -> Fut, Fut: Future>, { let (bucket, location) = cache_location(kind, key); let loc_abs = location.abs(); // we need a lock even if it exists since somebody might be still in the process of writing. let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] .lock() .await; let exists = tokio::fs::try_exists(&loc_abs) .await .context("unable to test for cache file existance")?; if !exists { let temp_path = CONF.path.join(format!("temp-{:x}", random::())); let f = tokio::fs::File::create(&temp_path) .await .context("creating new cache file")?; match generate(f).await { Ok(()) => (), Err(e) => { warn!("cache generation failed, unlinking entry"); tokio::fs::remove_file(temp_path).await?; return Err(e); } } tokio::fs::create_dir_all(loc_abs.parent().unwrap()) .await .context("create kind dir")?; tokio::fs::rename(temp_path, &loc_abs) .await .context("rename cache")?; } drop(_guard); Ok(location) } thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } pub fn cache_file( kind: &str, key: impl Hash, mut generate: Fun, ) -> Result where Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, { let (bucket, location) = cache_location(kind, key); let loc_abs = location.abs(); // we need a lock even if it exists since somebody might be still in the process of writing. let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); let _guard = if already_within { // TODO stupid hack to avoid deadlock for nested cache_file. proper solution needed CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] .try_lock() .ok() } else { Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock()) }; if !loc_abs.exists() { let temp_path = CONF.path.join(format!("temp-{:x}", random::())); let f = std::fs::File::create(&temp_path).context("creating new cache file")?; match generate(f) { Ok(()) => (), Err(e) => { warn!("cache generation failed, unlinking entry"); std::fs::remove_file(temp_path)?; return Err(e); } } std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?; rename(temp_path, loc_abs).context("rename cache")?; } if !already_within { WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); } drop(_guard); Ok(location) } pub struct InMemoryCacheEntry { size: usize, last_access: Instant, object: Arc, } pub static CACHE_IN_MEMORY_OBJECTS: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); pub fn cache_memory( kind: &str, key: impl Hash, mut generate: Fun, ) -> Result, anyhow::Error> where Fun: FnMut() -> Result, T: Encode + Decode + Send + Sync + 'static, { let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { entry.last_access = Instant::now(); let object = entry .object .clone() .downcast::() .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; return Ok(object); } } let location = cache_file(kind, &key, move |file| { let object = generate()?; let mut file = std::io::BufWriter::new(file); bincode::encode_into_std_write(&object, &mut file, bincode::config::standard()) .context("encoding cache object")?; file.flush()?; Ok(()) })?; let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?); let object = bincode::decode_from_std_read::(&mut file, bincode::config::standard()) .context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( location.abs(), InMemoryCacheEntry { size, last_access: Instant::now(), object: object.clone(), }, ); CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); } cleanup_cache(); Ok(object) } pub async fn async_cache_memory( kind: &str, key: impl Hash, generate: Fun, ) -> Result, anyhow::Error> where Fun: FnOnce() -> Fut, Fut: Future>, T: Encode + Decode + Send + Sync + 'static, { let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { entry.last_access = Instant::now(); let object = entry .object .clone() .downcast::() .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; return Ok(object); } } let location = async_cache_file(kind, &key, move |mut file| async move { let object = generate().await?; let data = bincode::encode_to_vec(&object, bincode::config::standard()) .context("encoding cache object")?; file.write_all(&data).await?; Ok(()) }) .await?; let mut file = tokio::fs::File::open(&location.abs()).await?; let mut data = Vec::new(); file.read_to_end(&mut data) .await .context("reading cache object")?; let (object, _) = bincode::decode_from_slice::(&data, bincode::config::standard()) .context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( location.abs(), InMemoryCacheEntry { size, last_access: Instant::now(), object: object.clone(), }, ); CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); } cleanup_cache(); Ok(object) } pub fn cleanup_cache() { let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed); if current_size < CONF.max_in_memory_cache_size { return; } info!("running cache eviction"); let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now let mut k = BTreeMap::new(); for (loc, entry) in g.iter() { k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); } let mut reduction = 0; for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { g.remove(loc); reduction += size; } CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed); drop(g); info!( "done, {} freed", humansize::format_size(reduction, humansize::DECIMAL) ); }