/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2024 metamuffin */ use crate::CONF; use anyhow::{anyhow, Context}; use base64::Engine; use bincode::{Decode, Encode}; use log::{info, warn}; use rand::random; use serde::Serialize; use std::{ any::Any, collections::{BTreeMap, HashMap}, fs::rename, future::Future, io::Seek, path::PathBuf, sync::{ atomic::{AtomicUsize, Ordering}, Arc, LazyLock, RwLock, }, time::Instant, }; use tokio::{ io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, sync::Mutex, }; #[derive(Debug, Encode, Decode, Serialize)] pub struct CachePath(pub PathBuf); impl CachePath { pub fn abs(&self) -> PathBuf { CONF.cache_path.join(&self.0) } } pub fn cache_location(seed: &[&str]) -> (usize, CachePath) { use sha2::Digest; let mut d = sha2::Sha512::new(); for s in seed { d.update(s.as_bytes()); d.update(b"\0"); } let d = d.finalize(); let n = d[0] as usize | (d[1] as usize) << 8 | (d[2] as usize) << 16 | (d[3] as usize) << 24; let fname = base64::engine::general_purpose::URL_SAFE.encode(d); let fname = &fname[..22]; let fname = format!("{}-{}", seed[0], fname); // about 128 bits (n, CachePath(fname.into())) } const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); pub async fn async_cache_file( seed: &[&str], generate: Fun, ) -> Result where Fun: FnOnce(tokio::fs::File) -> Fut, Fut: Future>, { let (bucket, location) = cache_location(seed); // we need a lock even if it exists since somebody might be still in the process of writing. let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] .lock() .await; let exists = tokio::fs::try_exists(&location.abs()) .await .context("unable to test for cache file existance")?; if !exists { let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::())); let f = tokio::fs::File::create(&temp_path) .await .context("creating new cache file")?; match generate(f).await { Ok(()) => (), Err(e) => { warn!("cache generation failed, unlinking entry"); tokio::fs::remove_file(temp_path).await?; return Err(e); } } tokio::fs::rename(temp_path, &location.abs()) .await .context("rename cache")?; } drop(_guard); Ok(location) } pub fn cache_file(seed: &[&str], mut generate: Fun) -> Result where Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, { let (bucket, location) = cache_location(seed); // we need a lock even if it exists since somebody might be still in the process of writing. let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock(); if !location.abs().exists() { let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::())); let f = std::fs::File::create(&temp_path).context("creating new cache file")?; match generate(f) { Ok(()) => (), Err(e) => { warn!("cache generation failed, unlinking entry"); std::fs::remove_file(temp_path)?; return Err(e); } } rename(temp_path, &location.abs()).context("rename cache")?; } drop(_guard); Ok(location) } pub struct InMemoryCacheEntry { size: usize, last_access: Instant, object: Arc, } pub static CACHE_IN_MEMORY_OBJECTS: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); pub fn cache_memory(seed: &[&str], mut generate: Fun) -> Result, anyhow::Error> where Fun: FnMut() -> Result, T: Encode + Decode + Send + Sync + 'static, { let (_, location) = cache_location(seed); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { entry.last_access = Instant::now(); let object = entry .object .clone() .downcast::() .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; return Ok(object); } } let location = cache_file(seed, move |mut file| { let object = generate()?; bincode::encode_into_std_write(&object, &mut file, bincode::config::standard()) .context("encoding cache object")?; Ok(()) })?; let mut file = std::fs::File::open(&location.abs())?; let object = bincode::decode_from_std_read::(&mut file, bincode::config::standard()) .context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( location.abs(), InMemoryCacheEntry { size, last_access: Instant::now(), object: object.clone(), }, ); CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); } cleanup_cache(); Ok(object) } pub async fn async_cache_memory( seed: &[&str], mut generate: Fun, ) -> Result, anyhow::Error> where Fun: FnMut() -> Fut, Fut: Future>, T: Encode + Decode + Send + Sync + 'static, { let (_, location) = cache_location(seed); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { entry.last_access = Instant::now(); let object = entry .object .clone() .downcast::() .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; return Ok(object); } } let location = async_cache_file(seed, move |mut file| async move { let object = generate().await?; let data = bincode::encode_to_vec(&object, bincode::config::standard()) .context("encoding cache object")?; file.write_all(&data).await?; Ok(()) }) .await?; let mut file = tokio::fs::File::open(&location.abs()).await?; let mut data = Vec::new(); file.read_to_end(&mut data) .await .context("reading cache object")?; let (object, _) = bincode::decode_from_slice::(&data, bincode::config::standard()) .context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( location.abs(), InMemoryCacheEntry { size, last_access: Instant::now(), object: object.clone(), }, ); CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); } cleanup_cache(); Ok(object) } pub fn cleanup_cache() { let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed); if current_size < CONF.max_in_memory_cache_size { return; } info!("running cache eviction"); let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now let mut k = BTreeMap::new(); for (loc, entry) in g.iter() { k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); } let mut reduction = 0; for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { g.remove(loc); reduction += size; } CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed); drop(g); info!("done"); }