diff options
| author | metamuffin <metamuffin@disroot.org> | 2026-01-24 04:31:48 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2026-01-24 04:31:48 +0100 |
| commit | b2e88a8beabf04adc28947cf82996e8692a68b71 (patch) | |
| tree | 23d66c8672b69cce7835ffabae4092669062ada8 /cache | |
| parent | 774f64c0789529884dd7a5232f190e347ad29532 (diff) | |
| download | jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.bz2 jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.zst | |
move things around; kv crate
Diffstat (limited to 'cache')
| -rw-r--r-- | cache/Cargo.toml | 2 | ||||
| -rw-r--r-- | cache/src/backends/dummy.rs | 18 | ||||
| -rw-r--r-- | cache/src/backends/filesystem.rs | 50 | ||||
| -rw-r--r-- | cache/src/backends/mod.rs | 28 | ||||
| -rw-r--r-- | cache/src/backends/rocksdb.rs | 26 | ||||
| -rw-r--r-- | cache/src/lib.rs | 260 |
6 files changed, 123 insertions, 261 deletions
diff --git a/cache/Cargo.toml b/cache/Cargo.toml index c545fd2..cd891b8 100644 --- a/cache/Cargo.toml +++ b/cache/Cargo.toml @@ -13,4 +13,4 @@ rand = "0.9.2" serde = "1.0.228" serde_json = "1.0.145" percent-encoding = "2.3.2" -rocksdb = { version = "0.24.0", features = ["multi-threaded-cf"] } +jellykv = { path = "../kv" } diff --git a/cache/src/backends/dummy.rs b/cache/src/backends/dummy.rs deleted file mode 100644 index 7b0efa5..0000000 --- a/cache/src/backends/dummy.rs +++ /dev/null @@ -1,18 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ - -use crate::backends::CacheStorage; -use anyhow::Result; - -pub struct Dummy; -impl CacheStorage for Dummy { - fn store(&self, _key: String, _value: &[u8]) -> Result<()> { - Ok(()) - } - fn read(&self, _key: &str) -> Result<Option<Vec<u8>>> { - Ok(None) // sorry forgot - } -} diff --git a/cache/src/backends/filesystem.rs b/cache/src/backends/filesystem.rs deleted file mode 100644 index f1bbdf9..0000000 --- a/cache/src/backends/filesystem.rs +++ /dev/null @@ -1,50 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ - -use crate::{Config, backends::CacheStorage}; -use anyhow::{Result, bail}; -use rand::random; -use std::{ - fs::{File, create_dir_all, rename}, - io::{ErrorKind, Read, Write}, - path::PathBuf, -}; - -pub struct Filesystem(PathBuf); - -impl Filesystem { - pub fn new(config: &Config) -> Self { - Self(config.path.clone()) - } - fn temp_path(&self) -> PathBuf { - self.0.join(format!("temp-{:016x}", random::<u128>())) - } -} - -impl CacheStorage for Filesystem { - fn store(&self, key: String, value: &[u8]) -> Result<()> { - let temp = self.temp_path(); - let out = self.0.join(&key); - create_dir_all(out.parent().unwrap())?; - File::create(&temp)?.write_all(value)?; - rename(temp, out)?; - Ok(()) - } - fn read(&self, key: &str) -> Result<Option<Vec<u8>>> { - if key.contains("..") || key.starts_with("/") { - bail!("invalid key") - } - match File::open(self.0.join(key)) { - Ok(mut f) => { - let mut data = Vec::new(); - f.read_to_end(&mut data)?; - Ok(Some(data)) - } - Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), - Err(e) => Err(e.into()), - } - } -} diff --git a/cache/src/backends/mod.rs b/cache/src/backends/mod.rs deleted file mode 100644 index 52a954b..0000000 --- a/cache/src/backends/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ -pub mod dummy; -pub mod filesystem; -pub mod rocksdb; - -use crate::{ - CONF, - backends::{dummy::Dummy, filesystem::Filesystem, rocksdb::Rocksdb}, -}; -use anyhow::{Result, bail}; - -pub(crate) trait CacheStorage: Send + Sync + 'static { - fn store(&self, key: String, value: &[u8]) -> Result<()>; - fn read(&self, key: &str) -> Result<Option<Vec<u8>>>; -} - -pub fn init_backend() -> Result<Box<dyn CacheStorage>> { - Ok(match CONF.driver.as_str() { - "filesystem" => Box::new(Filesystem::new(&CONF)), - "rocksdb" => Box::new(Rocksdb::new(&CONF)?), - "dummy" => Box::new(Dummy), - _ => bail!("unknown driver"), - }) -} diff --git a/cache/src/backends/rocksdb.rs b/cache/src/backends/rocksdb.rs deleted file mode 100644 index 9db86dd..0000000 --- a/cache/src/backends/rocksdb.rs +++ /dev/null @@ -1,26 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ - -use crate::{Config, backends::CacheStorage}; -use anyhow::Result; -use rocksdb::DB; - -pub struct Rocksdb(DB); - -impl Rocksdb { - pub fn new(config: &Config) -> Result<Self> { - Ok(Self(rocksdb::DB::open_default(config.path.clone())?)) - } -} - -impl CacheStorage for Rocksdb { - fn store(&self, key: String, value: &[u8]) -> Result<()> { - Ok(self.0.put(key, value)?) - } - fn read(&self, key: &str) -> Result<Option<Vec<u8>>> { - Ok(self.0.get(key)?) - } -} diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 9559fbc..be2b331 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -3,65 +3,41 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin <metamuffin.org> */ -mod backends; mod helper; -use crate::backends::{CacheStorage, init_backend}; use anyhow::{Context, Result, anyhow}; +pub use helper::{EscapeKey, HashKey}; +use jellykv::BlobStorage; use log::{info, warn}; use serde::{Deserialize, Serialize}; use std::{ any::Any, collections::{BTreeMap, HashMap}, hash::{DefaultHasher, Hash, Hasher}, - path::PathBuf, sync::{ - Arc, LazyLock, Mutex, OnceLock, RwLock, + Arc, LazyLock, Mutex, RwLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Instant, }; -pub use helper::{EscapeKey, HashKey}; - -#[derive(Debug, Deserialize)] -pub struct Config { - driver: String, - path: PathBuf, - max_in_memory_cache_size: usize, -} - const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } -pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None); -static CONF: LazyLock<Config> = LazyLock::new(|| { - CONF_PRELOAD - .lock() - .unwrap() - .take() - .expect("cache config not preloaded. logic error") -}); - -static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new(); - -pub fn init_cache() -> Result<()> { - CACHE_STORE - .set(init_backend().context("cache backend")?) - .map_err(|_| ()) - .unwrap(); - Ok(()) +pub struct Cache { + storage: Box<dyn BlobStorage>, + memory_cache: RwLock<HashMap<String, InMemoryCacheEntry>>, + memory_cache_size: AtomicUsize, + max_memory_cache_size: usize, } -pub fn init_cache_dummy() -> Result<()> { - *CONF_PRELOAD.lock().unwrap() = Some(Config { - driver: "dummy".to_string(), - path: PathBuf::default(), - max_in_memory_cache_size: 0, - }); - init_cache() + +pub struct InMemoryCacheEntry { + size: usize, + last_access: Instant, + object: Arc<dyn Any + Send + Sync + 'static>, } fn bucket(key: &str) -> usize { @@ -70,122 +46,130 @@ fn bucket(key: &str) -> usize { h.finish() as usize % CACHE_GENERATION_BUCKET_COUNT } -pub fn cache(key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { - // we need a lock even if it exists since somebody might be still in the process of writing. - let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); - let _guard = if already_within { - // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating - CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok() - } else { - CACHE_GENERATION_LOCKS[bucket(key)].lock().ok() - }; - - let store = CACHE_STORE.get().unwrap(); - - let out = match store.read(&key)? { - Some(x) => x, - None => { - let value = generate()?; - store.store(key.to_owned(), &value)?; - value +impl Cache { + pub fn new(storage: Box<dyn BlobStorage>, max_memory_cache_size: usize) -> Self { + Self { + max_memory_cache_size, + storage, + memory_cache: HashMap::new().into(), + memory_cache_size: AtomicUsize::new(0), } - }; - - if !already_within { - WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); } - drop(_guard); - Ok(out) -} + pub fn cache(&self, key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { + // we need a lock even if it exists since somebody might be still in the process of writing. + let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); + let _guard = if already_within { + // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating + CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok() + } else { + CACHE_GENERATION_LOCKS[bucket(key)].lock().ok() + }; -pub fn cache_read(key: &str) -> Result<Option<Vec<u8>>> { - CACHE_STORE.get().unwrap().read(key) -} -pub fn cache_store(key: String, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<String> { - cache(&key, generate)?; - Ok(key) -} + let out = match self.storage.read(&key)? { + Some(x) => x, + None => { + let value = generate()?; + self.storage.store(key, &value)?; + value + } + }; -pub struct InMemoryCacheEntry { - size: usize, - last_access: Instant, - object: Arc<dyn Any + Send + Sync + 'static>, -} -pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<String, InMemoryCacheEntry>>> = - LazyLock::new(|| RwLock::new(HashMap::new())); -pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); + if !already_within { + WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); + } + drop(_guard); + Ok(out) + } -pub fn cache_memory<Fun, T>(key: &str, mut generate: Fun) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnMut() -> Result<T, anyhow::Error>, - T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, -{ - if !key.ends_with(".json") { - warn!("cache_memory key not ending in .json: {key:?}") + pub fn cache_read(&self, key: &str) -> Result<Option<Vec<u8>>> { + self.storage.read(key) + } + pub fn cache_store( + &self, + key: String, + generate: impl FnOnce() -> Result<Vec<u8>>, + ) -> Result<String> { + self.cache(&key, generate)?; + Ok(key) } + pub fn cache_memory<Fun, T>( + &self, + key: &str, + mut generate: Fun, + ) -> Result<Arc<T>, anyhow::Error> + where + Fun: FnMut() -> Result<T, anyhow::Error>, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(key) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); + if !key.ends_with(".json") { + warn!("cache_memory key not ending in .json: {key:?}") } - } - let data = cache(&key, move || { - let object = generate()?; - Ok(serde_json::to_vec(&object)?) - })?; - let size = data.len(); - let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; - let object = Arc::new(object); + { + let mut g = self.memory_cache.write().unwrap(); + if let Some(entry) = g.get_mut(key) { + entry.last_access = Instant::now(); + let object = entry + .object + .clone() + .downcast::<T>() + .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; + return Ok(object); + } + } - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - key.to_owned(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } + let data = self.cache(&key, move || { + let object = generate()?; + Ok(serde_json::to_vec(&object)?) + })?; + let size = data.len(); + let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; + let object = Arc::new(object); - cleanup_cache(); + { + let mut g = self.memory_cache.write().unwrap(); + g.insert( + key.to_owned(), + InMemoryCacheEntry { + size, + last_access: Instant::now(), + object: object.clone(), + }, + ); + self.memory_cache_size.fetch_add(size, Ordering::Relaxed); + } - Ok(object) -} + self.cleanup_cache(); -pub fn cleanup_cache() { - let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed); - if current_size < CONF.max_in_memory_cache_size { - return; + Ok(object) } - info!("running cache eviction"); - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now - let mut k = BTreeMap::new(); - for (loc, entry) in g.iter() { - k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); - } - let mut reduction = 0; - for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { - g.remove(loc); - reduction += size; - } - CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed); - drop(g); + fn cleanup_cache(&self) { + let current_size = self.memory_cache_size.load(Ordering::Relaxed); + if current_size < self.max_memory_cache_size { + return; + } + info!("running cache eviction"); + let mut g = self.memory_cache.write().unwrap(); - info!( - "done, {} freed", - humansize::format_size(reduction, humansize::DECIMAL) - ); + // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now + let mut k = BTreeMap::new(); + for (loc, entry) in g.iter() { + k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size)); + } + let mut reduction = 0; + for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) { + g.remove(loc); + reduction += size; + } + self.memory_cache_size + .fetch_sub(reduction, Ordering::Relaxed); + drop(g); + + info!( + "done, {} freed", + humansize::format_size(reduction, humansize::DECIMAL) + ); + } } |