diff options
Diffstat (limited to 'base/src/cache.rs')
-rw-r--r-- | base/src/cache.rs | 306 |
1 file changed, 0 insertions, 306 deletions
diff --git a/base/src/cache.rs b/base/src/cache.rs deleted file mode 100644 index 02c42c8..0000000 --- a/base/src/cache.rs +++ /dev/null @@ -1,306 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin <metamuffin.org> -*/ -use crate::CONF; -use anyhow::{anyhow, Context}; -use base64::Engine; -use bincode::{Decode, Encode}; -use log::{info, warn}; -use rand::random; -use serde::Serialize; -use sha2::Sha512; -use std::{ - any::Any, - collections::{BTreeMap, HashMap}, - fs::rename, - future::Future, - hash::{Hash, Hasher}, - io::{Seek, Write}, - path::PathBuf, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, LazyLock, RwLock, - }, - time::Instant, -}; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex, -}; - -#[derive(Debug, Encode, Decode, Serialize)] -pub struct CachePath(pub PathBuf); -impl CachePath { - pub fn abs(&self) -> PathBuf { - CONF.cache_path.join(&self.0) - } -} - -pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) { - use sha2::Digest; - struct ShaHasher(Sha512); - impl Hasher for ShaHasher { - fn finish(&self) -> u64 { - unreachable!() - } - fn write(&mut self, bytes: &[u8]) { - self.0.update(bytes); - } - } - let mut d = ShaHasher(sha2::Sha512::new()); - d.0.update(kind); - d.0.update(b"\0"); - key.hash(&mut d); - - let d = d.0.finalize(); - let n = - d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24); - let fname = base64::engine::general_purpose::URL_SAFE.encode(d); - let fname = &fname[..30]; // 180 bits - let fname = format!("{}/{}", kind, fname); - (n, CachePath(fname.into())) -} - -const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; -pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = - LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| 
Mutex::new(()))); - -pub async fn async_cache_file<Fun, Fut>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnOnce(tokio::fs::File) -> Fut, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); - // we need a lock even if it exists since somebody might be still in the process of writing. - let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .lock() - .await; - let exists = tokio::fs::try_exists(&loc_abs) - .await - .context("unable to test for cache file existance")?; - if !exists { - let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::<u128>())); - let f = tokio::fs::File::create(&temp_path) - .await - .context("creating new cache file")?; - match generate(f).await { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - tokio::fs::remove_file(temp_path).await?; - return Err(e); - } - } - tokio::fs::create_dir_all(loc_abs.parent().unwrap()) - .await - .context("create kind dir")?; - tokio::fs::rename(temp_path, &loc_abs) - .await - .context("rename cache")?; - } - drop(_guard); - Ok(location) -} - -thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } - -pub fn cache_file<Fun>( - kind: &str, - key: impl Hash, - mut generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); - // we need a lock even if it exists since somebody might be still in the process of writing. - let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); - let _guard = if already_within { - // TODO stupid hack to avoid deadlock for nested cache_file. 
proper solution needed - CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .try_lock() - .ok() - } else { - Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock()) - }; - if !loc_abs.exists() { - let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::<u128>())); - let f = std::fs::File::create(&temp_path).context("creating new cache file")?; - match generate(f) { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - std::fs::remove_file(temp_path)?; - return Err(e); - } - } - std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?; - rename(temp_path, loc_abs).context("rename cache")?; - } - if !already_within { - WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); - } - drop(_guard); - Ok(location) -} - -pub struct InMemoryCacheEntry { - size: usize, - last_access: Instant, - object: Arc<dyn Any + Send + Sync + 'static>, -} -pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> = - LazyLock::new(|| RwLock::new(HashMap::new())); -pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); - -pub fn cache_memory<Fun, T>( - kind: &str, - key: impl Hash, - mut generate: Fun, -) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnMut() -> Result<T, anyhow::Error>, - T: Encode + Decode + Send + Sync + 'static, -{ - let (_, location) = cache_location(kind, &key); - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); - } - } - - let location = cache_file(kind, &key, move |file| { - let object = generate()?; - let mut file = std::io::BufWriter::new(file); - bincode::encode_into_std_write(&object, &mut file, bincode::config::standard()) - .context("encoding cache object")?; 
- file.flush()?; - Ok(()) - })?; - let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?); - let object = bincode::decode_from_std_read::<T, _, _>(&mut file, bincode::config::standard()) - .context("decoding cache object")?; - let object = Arc::new(object); - let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode - - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - location.abs(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } - - cleanup_cache(); - - Ok(object) -} - -pub async fn async_cache_memory<Fun, Fut, T>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnOnce() -> Fut, - Fut: Future<Output = Result<T, anyhow::Error>>, - T: Encode + Decode + Send + Sync + 'static, -{ - let (_, location) = cache_location(kind, &key); - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); - } - } - - let location = async_cache_file(kind, &key, move |mut file| async move { - let object = generate().await?; - let data = bincode::encode_to_vec(&object, bincode::config::standard()) - .context("encoding cache object")?; - - file.write_all(&data).await?; - - Ok(()) - }) - .await?; - let mut file = tokio::fs::File::open(&location.abs()).await?; - let mut data = Vec::new(); - file.read_to_end(&mut data) - .await - .context("reading cache object")?; - let (object, _) = bincode::decode_from_slice::<T, _>(&data, bincode::config::standard()) - .context("decoding cache object")?; - let object = Arc::new(object); - let size = file.stream_position().await? 
as usize; // this is an approximation mainly since varint is used in bincode - - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - location.abs(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } - - cleanup_cache(); - - Ok(object) -} - -pub fn cleanup_cache() { - let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed); - if current_size < CONF.max_in_memory_cache_size { - return; - } - info!("running cache eviction"); - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - - // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now - let mut k = BTreeMap::new(); - for (loc, entry) in g.iter() { - k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); - } - let mut reduction = 0; - for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { - g.remove(loc); - reduction += size; - } - CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed); - drop(g); - - info!( - "done, {} freed", - humansize::format_size(reduction, humansize::DECIMAL) - ); -} |