/*
    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
    Copyright (C) 2023 metamuffin
*/
use crate::{AssetLocationExt, CONF};
use anyhow::{anyhow, Context};
use base64::Engine;
use bincode::{Decode, Encode};
use jellycommon::AssetLocation;
use log::{info, warn};
use rand::random;
use std::{
    any::Any,
    collections::{BTreeMap, HashMap},
    fs::rename,
    future::Future,
    io::Seek,
    path::PathBuf,
    str::FromStr,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, LazyLock, RwLock,
    },
    time::Instant,
};
use tokio::sync::Mutex;

/// Derives a deterministic cache location from `seed` by hashing all seed
/// strings with SHA-512. Also returns a bucket index used to pick a generation lock.
pub fn cache_location(seed: &[&str]) -> (usize, AssetLocation) {
    use sha2::Digest;
    let mut d = sha2::Sha512::new();
    for s in seed {
        d.update(s.as_bytes());
        d.update(b"\0");
    }
    let d = d.finalize();
    // the first four hash bytes form the bucket index
    let n = d[0] as usize | (d[1] as usize) << 8 | (d[2] as usize) << 16 | (d[3] as usize) << 24;
    let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
    let fname = &fname[..22];
    let fname = format!("{}-{}", seed[0], fname); // about 128 bits
    (n, AssetLocation::Cache(fname.into()))
}

const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
/// Per-bucket locks that serialize cache file generation, so two callers never
/// write the same entry at the same time.
pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
    LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));

/// Returns the cache file for `seed`, running `generate` to create it if it does
/// not exist yet. The content is written to a temporary file first and only
/// renamed into place on success.
pub async fn async_cache_file<Fun, Fut>(
    seed: &[&str],
    generate: Fun,
) -> Result<AssetLocation, anyhow::Error>
where
    Fun: FnOnce(tokio::fs::File) -> Fut,
    Fut: Future<Output = Result<(), anyhow::Error>>,
{
    let (bucket, location) = cache_location(seed);
    // we need the lock even if the file exists, since somebody might still be in the process of writing it.
    let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
        .lock()
        .await;
    let exists = tokio::fs::try_exists(location.path())
        .await
        .context("unable to test for cache file existence")?;
    if !exists {
        let temp_path =
            AssetLocation::Cache(PathBuf::from_str(&format!("temp-{:x}", random::<u64>()))?)
                .path();
        let f = tokio::fs::File::create(&temp_path)
            .await
            .context("creating new cache file")?;
        match generate(f).await {
            Ok(()) => (),
            Err(e) => {
                warn!("cache generation failed, unlinking entry");
                tokio::fs::remove_file(temp_path).await?;
                return Err(e);
            }
        }
        tokio::fs::rename(temp_path, location.path())
            .await
            .context("rename cache")?;
    }
    drop(_guard);
    Ok(location)
}

/// Blocking counterpart of [`async_cache_file`].
pub fn cache_file<Fun>(seed: &[&str], mut generate: Fun) -> Result<AssetLocation, anyhow::Error>
where
    Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
{
    let (bucket, location) = cache_location(seed);
    // we need the lock even if the file exists, since somebody might still be in the process of writing it.
    let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock();
    let exists = location.path().exists();
    if !exists {
        let temp_path =
            AssetLocation::Cache(PathBuf::from_str(&format!("temp-{:x}", random::<u64>()))?)
                .path();
        let f = std::fs::File::create(&temp_path).context("creating new cache file")?;
        match generate(f) {
            Ok(()) => (),
            Err(e) => {
                warn!("cache generation failed, unlinking entry");
                std::fs::remove_file(temp_path)?;
                return Err(e);
            }
        }
        rename(temp_path, location.path()).context("rename cache")?;
    }
    drop(_guard);
    Ok(location)
}

/// An object kept in the in-memory cache, with bookkeeping data for eviction.
pub struct InMemoryCacheEntry {
    size: usize,
    last_access: Instant,
    object: Arc<dyn Any + Send + Sync>,
}

pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<AssetLocation, InMemoryCacheEntry>>> =
    LazyLock::new(|| RwLock::new(HashMap::new()));
/// Approximate total size of all objects in [`CACHE_IN_MEMORY_OBJECTS`], in bytes.
pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);

/// Like [`cache_file`], but additionally keeps the bincode-decoded object in an
/// in-memory cache that is evicted by [`cleanup_cache`].
pub fn cache_memory<T, Fun>(seed: &[&str], mut generate: Fun) -> Result<Arc<T>, anyhow::Error>
where
    Fun: FnMut() -> Result<T, anyhow::Error>,
    T: Encode + Decode + Send + Sync + 'static,
{
    let (_, location) = cache_location(seed);
    {
        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
        if let Some(entry) = g.get_mut(&location) {
            entry.last_access = Instant::now();
            let object = entry
                .object
                .clone()
                .downcast::<T>()
                .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
            return Ok(object);
        }
    }
    let location = cache_file(seed, move |mut file| {
        let object = generate()?;
        bincode::encode_into_std_write(&object, &mut file, bincode::config::standard())
            .context("encoding cache object")?;
        Ok(())
    })?;
    let mut file = std::fs::File::open(location.path())?;
    let object = bincode::decode_from_std_read::<T, _, _>(&mut file, bincode::config::standard())
        .context("decoding cache object")?;
    let object = Arc::new(object);
    let size = file.stream_position()? as usize; // only an approximation, mainly because bincode uses varint encoding
    {
        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
        g.insert(
            location,
            InMemoryCacheEntry {
                size,
                last_access: Instant::now(),
                object: object.clone(),
            },
        );
        CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
    }
    cleanup_cache();
    Ok(object)
}

/// Evicts roughly the least recently used half of the in-memory cache once it
/// grows beyond the configured size limit.
pub fn cleanup_cache() {
    let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
    if current_size < CONF.max_in_memory_cache_size {
        return;
    }
    info!("running cache eviction");
    let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
    // TODO: if two entries have *exactly* the same last-access age, only one of them will be removed; this is fine for now
    let mut k = BTreeMap::new();
    for (loc, entry) in g.iter() {
        k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
    }
    let mut reduction = 0;
    for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
        g.remove(loc);
        reduction += size;
    }
    CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
    drop(g);
    info!("done");
}
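
// Usage sketch (illustration only, not part of the module): the names `item_id`,
// `render_thumbnail`, `library_id`, `NodeIndex` and `build_node_index` below are
// hypothetical; everything else is the API defined above. Assuming a
// bincode-encodable `NodeIndex`, callers would combine the disk cache and the
// in-memory cache roughly like this:
//
//     // Disk-backed cache: the closure only runs when the entry is missing,
//     // and writes the generated data into the provided temporary file.
//     let location = cache_file(&["thumbnail", item_id], |mut file| {
//         render_thumbnail(item_id, &mut file)
//     })?;
//
//     // In-memory + disk cache: the value is bincode-encoded on disk and the
//     // decoded `Arc<NodeIndex>` stays in CACHE_IN_MEMORY_OBJECTS until
//     // cleanup_cache() evicts it.
//     let index: Arc<NodeIndex> = cache_memory(&["node-index", library_id], || {
//         build_node_index(library_id)
//     })?;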