diff options
| author | metamuffin <metamuffin@disroot.org> | 2026-01-24 04:31:48 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2026-01-24 04:31:48 +0100 |
| commit | b2e88a8beabf04adc28947cf82996e8692a68b71 (patch) | |
| tree | 23d66c8672b69cce7835ffabae4092669062ada8 /cache/src/lib.rs | |
| parent | 774f64c0789529884dd7a5232f190e347ad29532 (diff) | |
| download | jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.bz2 jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.zst | |
move things around; kv crate
Diffstat (limited to 'cache/src/lib.rs')
| -rw-r--r-- | cache/src/lib.rs | 260 |
1 files changed, 122 insertions, 138 deletions
diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 9559fbc..be2b331 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -3,65 +3,41 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin <metamuffin.org> */ -mod backends; mod helper; -use crate::backends::{CacheStorage, init_backend}; use anyhow::{Context, Result, anyhow}; +pub use helper::{EscapeKey, HashKey}; +use jellykv::BlobStorage; use log::{info, warn}; use serde::{Deserialize, Serialize}; use std::{ any::Any, collections::{BTreeMap, HashMap}, hash::{DefaultHasher, Hash, Hasher}, - path::PathBuf, sync::{ - Arc, LazyLock, Mutex, OnceLock, RwLock, + Arc, LazyLock, Mutex, RwLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Instant, }; -pub use helper::{EscapeKey, HashKey}; - -#[derive(Debug, Deserialize)] -pub struct Config { - driver: String, - path: PathBuf, - max_in_memory_cache_size: usize, -} - const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } -pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None); -static CONF: LazyLock<Config> = LazyLock::new(|| { - CONF_PRELOAD - .lock() - .unwrap() - .take() - .expect("cache config not preloaded. logic error") -}); - -static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new(); - -pub fn init_cache() -> Result<()> { - CACHE_STORE - .set(init_backend().context("cache backend")?) - .map_err(|_| ()) - .unwrap(); - Ok(()) +pub struct Cache { + storage: Box<dyn BlobStorage>, + memory_cache: RwLock<HashMap<String, InMemoryCacheEntry>>, + memory_cache_size: AtomicUsize, + max_memory_cache_size: usize, } -pub fn init_cache_dummy() -> Result<()> { - *CONF_PRELOAD.lock().unwrap() = Some(Config { - driver: "dummy".to_string(), - path: PathBuf::default(), - max_in_memory_cache_size: 0, - }); - init_cache() + +pub struct InMemoryCacheEntry { + size: usize, + last_access: Instant, + object: Arc<dyn Any + Send + Sync + 'static>, } fn bucket(key: &str) -> usize { @@ -70,122 +46,130 @@ fn bucket(key: &str) -> usize { h.finish() as usize % CACHE_GENERATION_BUCKET_COUNT } -pub fn cache(key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { - // we need a lock even if it exists since somebody might be still in the process of writing. - let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); - let _guard = if already_within { - // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating - CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok() - } else { - CACHE_GENERATION_LOCKS[bucket(key)].lock().ok() - }; - - let store = CACHE_STORE.get().unwrap(); - - let out = match store.read(&key)? { - Some(x) => x, - None => { - let value = generate()?; - store.store(key.to_owned(), &value)?; - value +impl Cache { + pub fn new(storage: Box<dyn BlobStorage>, max_memory_cache_size: usize) -> Self { + Self { + max_memory_cache_size, + storage, + memory_cache: HashMap::new().into(), + memory_cache_size: AtomicUsize::new(0), } - }; - - if !already_within { - WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); } - drop(_guard); - Ok(out) -} + pub fn cache(&self, key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { + // we need a lock even if it exists since somebody might be still in the process of writing. + let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); + let _guard = if already_within { + // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating + CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok() + } else { + CACHE_GENERATION_LOCKS[bucket(key)].lock().ok() + }; -pub fn cache_read(key: &str) -> Result<Option<Vec<u8>>> { - CACHE_STORE.get().unwrap().read(key) -} -pub fn cache_store(key: String, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<String> { - cache(&key, generate)?; - Ok(key) -} + let out = match self.storage.read(&key)? { + Some(x) => x, + None => { + let value = generate()?; + self.storage.store(key, &value)?; + value + } + }; -pub struct InMemoryCacheEntry { - size: usize, - last_access: Instant, - object: Arc<dyn Any + Send + Sync + 'static>, -} -pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<String, InMemoryCacheEntry>>> = - LazyLock::new(|| RwLock::new(HashMap::new())); -pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); + if !already_within { + WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); + } + drop(_guard); + Ok(out) + } -pub fn cache_memory<Fun, T>(key: &str, mut generate: Fun) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnMut() -> Result<T, anyhow::Error>, - T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, -{ - if !key.ends_with(".json") { - warn!("cache_memory key not ending in .json: {key:?}") + pub fn cache_read(&self, key: &str) -> Result<Option<Vec<u8>>> { + self.storage.read(key) + } + pub fn cache_store( + &self, + key: String, + generate: impl FnOnce() -> Result<Vec<u8>>, + ) -> Result<String> { + self.cache(&key, generate)?; + Ok(key) } + pub fn cache_memory<Fun, T>( + &self, + key: &str, + mut generate: Fun, + ) -> Result<Arc<T>, anyhow::Error> + where + Fun: FnMut() -> Result<T, anyhow::Error>, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(key) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); + if !key.ends_with(".json") { + warn!("cache_memory key not ending in .json: {key:?}") } - } - let data = cache(&key, move || { - let object = generate()?; - Ok(serde_json::to_vec(&object)?) - })?; - let size = data.len(); - let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; - let object = Arc::new(object); + { + let mut g = self.memory_cache.write().unwrap(); + if let Some(entry) = g.get_mut(key) { + entry.last_access = Instant::now(); + let object = entry + .object + .clone() + .downcast::<T>() + .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; + return Ok(object); + } + } - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - key.to_owned(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } + let data = self.cache(&key, move || { + let object = generate()?; + Ok(serde_json::to_vec(&object)?) + })?; + let size = data.len(); + let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; + let object = Arc::new(object); - cleanup_cache(); + { + let mut g = self.memory_cache.write().unwrap(); + g.insert( + key.to_owned(), + InMemoryCacheEntry { + size, + last_access: Instant::now(), + object: object.clone(), + }, + ); + self.memory_cache_size.fetch_add(size, Ordering::Relaxed); + } - Ok(object) -} + self.cleanup_cache(); -pub fn cleanup_cache() { - let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed); - if current_size < CONF.max_in_memory_cache_size { - return; + Ok(object) } - info!("running cache eviction"); - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now - let mut k = BTreeMap::new(); - for (loc, entry) in g.iter() { - k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); - } - let mut reduction = 0; - for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { - g.remove(loc); - reduction += size; - } - CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed); - drop(g); + fn cleanup_cache(&self) { + let current_size = self.memory_cache_size.load(Ordering::Relaxed); + if current_size < self.max_memory_cache_size { + return; + } + info!("running cache eviction"); + let mut g = self.memory_cache.write().unwrap(); - info!( - "done, {} freed", - humansize::format_size(reduction, humansize::DECIMAL) - ); + // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now + let mut k = BTreeMap::new(); + for (loc, entry) in g.iter() { + k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); + } + let mut reduction = 0; + for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { + g.remove(loc); + reduction += size; + } + self.memory_cache_size + .fetch_sub(reduction, Ordering::Relaxed); + drop(g); + + info!( + "done, {} freed", + humansize::format_size(reduction, humansize::DECIMAL) + ); + } } |