aboutsummaryrefslogtreecommitdiff
path: root/cache
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2026-01-24 04:31:48 +0100
committermetamuffin <metamuffin@disroot.org>2026-01-24 04:31:48 +0100
commitb2e88a8beabf04adc28947cf82996e8692a68b71 (patch)
tree23d66c8672b69cce7835ffabae4092669062ada8 /cache
parent774f64c0789529884dd7a5232f190e347ad29532 (diff)
downloadjellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar
jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.bz2
jellything-b2e88a8beabf04adc28947cf82996e8692a68b71.tar.zst
move things around; kv crate
Diffstat (limited to 'cache')
-rw-r--r--cache/Cargo.toml2
-rw-r--r--cache/src/backends/dummy.rs18
-rw-r--r--cache/src/backends/filesystem.rs50
-rw-r--r--cache/src/backends/mod.rs28
-rw-r--r--cache/src/backends/rocksdb.rs26
-rw-r--r--cache/src/lib.rs260
6 files changed, 123 insertions, 261 deletions
diff --git a/cache/Cargo.toml b/cache/Cargo.toml
index c545fd2..cd891b8 100644
--- a/cache/Cargo.toml
+++ b/cache/Cargo.toml
@@ -13,4 +13,4 @@ rand = "0.9.2"
serde = "1.0.228"
serde_json = "1.0.145"
percent-encoding = "2.3.2"
-rocksdb = { version = "0.24.0", features = ["multi-threaded-cf"] }
+jellykv = { path = "../kv" }
diff --git a/cache/src/backends/dummy.rs b/cache/src/backends/dummy.rs
deleted file mode 100644
index 7b0efa5..0000000
--- a/cache/src/backends/dummy.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-
-use crate::backends::CacheStorage;
-use anyhow::Result;
-
-pub struct Dummy;
-impl CacheStorage for Dummy {
- fn store(&self, _key: String, _value: &[u8]) -> Result<()> {
- Ok(())
- }
- fn read(&self, _key: &str) -> Result<Option<Vec<u8>>> {
- Ok(None) // sorry forgot
- }
-}
diff --git a/cache/src/backends/filesystem.rs b/cache/src/backends/filesystem.rs
deleted file mode 100644
index f1bbdf9..0000000
--- a/cache/src/backends/filesystem.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-
-use crate::{Config, backends::CacheStorage};
-use anyhow::{Result, bail};
-use rand::random;
-use std::{
- fs::{File, create_dir_all, rename},
- io::{ErrorKind, Read, Write},
- path::PathBuf,
-};
-
-pub struct Filesystem(PathBuf);
-
-impl Filesystem {
- pub fn new(config: &Config) -> Self {
- Self(config.path.clone())
- }
- fn temp_path(&self) -> PathBuf {
- self.0.join(format!("temp-{:016x}", random::<u128>()))
- }
-}
-
-impl CacheStorage for Filesystem {
- fn store(&self, key: String, value: &[u8]) -> Result<()> {
- let temp = self.temp_path();
- let out = self.0.join(&key);
- create_dir_all(out.parent().unwrap())?;
- File::create(&temp)?.write_all(value)?;
- rename(temp, out)?;
- Ok(())
- }
- fn read(&self, key: &str) -> Result<Option<Vec<u8>>> {
- if key.contains("..") || key.starts_with("/") {
- bail!("invalid key")
- }
- match File::open(self.0.join(key)) {
- Ok(mut f) => {
- let mut data = Vec::new();
- f.read_to_end(&mut data)?;
- Ok(Some(data))
- }
- Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
- Err(e) => Err(e.into()),
- }
- }
-}
diff --git a/cache/src/backends/mod.rs b/cache/src/backends/mod.rs
deleted file mode 100644
index 52a954b..0000000
--- a/cache/src/backends/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-pub mod dummy;
-pub mod filesystem;
-pub mod rocksdb;
-
-use crate::{
- CONF,
- backends::{dummy::Dummy, filesystem::Filesystem, rocksdb::Rocksdb},
-};
-use anyhow::{Result, bail};
-
-pub(crate) trait CacheStorage: Send + Sync + 'static {
- fn store(&self, key: String, value: &[u8]) -> Result<()>;
- fn read(&self, key: &str) -> Result<Option<Vec<u8>>>;
-}
-
-pub fn init_backend() -> Result<Box<dyn CacheStorage>> {
- Ok(match CONF.driver.as_str() {
- "filesystem" => Box::new(Filesystem::new(&CONF)),
- "rocksdb" => Box::new(Rocksdb::new(&CONF)?),
- "dummy" => Box::new(Dummy),
- _ => bail!("unknown driver"),
- })
-}
diff --git a/cache/src/backends/rocksdb.rs b/cache/src/backends/rocksdb.rs
deleted file mode 100644
index 9db86dd..0000000
--- a/cache/src/backends/rocksdb.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-
-use crate::{Config, backends::CacheStorage};
-use anyhow::Result;
-use rocksdb::DB;
-
-pub struct Rocksdb(DB);
-
-impl Rocksdb {
- pub fn new(config: &Config) -> Result<Self> {
- Ok(Self(rocksdb::DB::open_default(config.path.clone())?))
- }
-}
-
-impl CacheStorage for Rocksdb {
- fn store(&self, key: String, value: &[u8]) -> Result<()> {
- Ok(self.0.put(key, value)?)
- }
- fn read(&self, key: &str) -> Result<Option<Vec<u8>>> {
- Ok(self.0.get(key)?)
- }
-}
diff --git a/cache/src/lib.rs b/cache/src/lib.rs
index 9559fbc..be2b331 100644
--- a/cache/src/lib.rs
+++ b/cache/src/lib.rs
@@ -3,65 +3,41 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2026 metamuffin <metamuffin.org>
*/
-mod backends;
mod helper;
-use crate::backends::{CacheStorage, init_backend};
use anyhow::{Context, Result, anyhow};
+pub use helper::{EscapeKey, HashKey};
+use jellykv::BlobStorage;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::{
any::Any,
collections::{BTreeMap, HashMap},
hash::{DefaultHasher, Hash, Hasher},
- path::PathBuf,
sync::{
- Arc, LazyLock, Mutex, OnceLock, RwLock,
+ Arc, LazyLock, Mutex, RwLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Instant,
};
-pub use helper::{EscapeKey, HashKey};
-
-#[derive(Debug, Deserialize)]
-pub struct Config {
- driver: String,
- path: PathBuf,
- max_in_memory_cache_size: usize,
-}
-
const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
-pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None);
-static CONF: LazyLock<Config> = LazyLock::new(|| {
- CONF_PRELOAD
- .lock()
- .unwrap()
- .take()
- .expect("cache config not preloaded. logic error")
-});
-
-static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new();
-
-pub fn init_cache() -> Result<()> {
- CACHE_STORE
- .set(init_backend().context("cache backend")?)
- .map_err(|_| ())
- .unwrap();
- Ok(())
+pub struct Cache {
+ storage: Box<dyn BlobStorage>,
+ memory_cache: RwLock<HashMap<String, InMemoryCacheEntry>>,
+ memory_cache_size: AtomicUsize,
+ max_memory_cache_size: usize,
}
-pub fn init_cache_dummy() -> Result<()> {
- *CONF_PRELOAD.lock().unwrap() = Some(Config {
- driver: "dummy".to_string(),
- path: PathBuf::default(),
- max_in_memory_cache_size: 0,
- });
- init_cache()
+
+pub struct InMemoryCacheEntry {
+ size: usize,
+ last_access: Instant,
+ object: Arc<dyn Any + Send + Sync + 'static>,
}
fn bucket(key: &str) -> usize {
@@ -70,122 +46,130 @@ fn bucket(key: &str) -> usize {
h.finish() as usize % CACHE_GENERATION_BUCKET_COUNT
}
-pub fn cache(key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> {
- // we need a lock even if it exists since somebody might be still in the process of writing.
- let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
- let _guard = if already_within {
- // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating
- CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok()
- } else {
- CACHE_GENERATION_LOCKS[bucket(key)].lock().ok()
- };
-
- let store = CACHE_STORE.get().unwrap();
-
- let out = match store.read(&key)? {
- Some(x) => x,
- None => {
- let value = generate()?;
- store.store(key.to_owned(), &value)?;
- value
+impl Cache {
+ pub fn new(storage: Box<dyn BlobStorage>, max_memory_cache_size: usize) -> Self {
+ Self {
+ max_memory_cache_size,
+ storage,
+ memory_cache: HashMap::new().into(),
+ memory_cache_size: AtomicUsize::new(0),
}
- };
-
- if !already_within {
- WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
}
- drop(_guard);
- Ok(out)
-}
+ pub fn cache(&self, key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> {
+ // we need a lock even if it exists since somebody might be still in the process of writing.
+ let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
+ let _guard = if already_within {
+ // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating
+ CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok()
+ } else {
+ CACHE_GENERATION_LOCKS[bucket(key)].lock().ok()
+ };
-pub fn cache_read(key: &str) -> Result<Option<Vec<u8>>> {
- CACHE_STORE.get().unwrap().read(key)
-}
-pub fn cache_store(key: String, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<String> {
- cache(&key, generate)?;
- Ok(key)
-}
+ let out = match self.storage.read(&key)? {
+ Some(x) => x,
+ None => {
+ let value = generate()?;
+ self.storage.store(key, &value)?;
+ value
+ }
+ };
-pub struct InMemoryCacheEntry {
- size: usize,
- last_access: Instant,
- object: Arc<dyn Any + Send + Sync + 'static>,
-}
-pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<String, InMemoryCacheEntry>>> =
- LazyLock::new(|| RwLock::new(HashMap::new()));
-pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
+ if !already_within {
+ WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
+ }
+ drop(_guard);
+ Ok(out)
+ }
-pub fn cache_memory<Fun, T>(key: &str, mut generate: Fun) -> Result<Arc<T>, anyhow::Error>
-where
- Fun: FnMut() -> Result<T, anyhow::Error>,
- T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
-{
- if !key.ends_with(".json") {
- warn!("cache_memory key not ending in .json: {key:?}")
+ pub fn cache_read(&self, key: &str) -> Result<Option<Vec<u8>>> {
+ self.storage.read(key)
+ }
+ pub fn cache_store(
+ &self,
+ key: String,
+ generate: impl FnOnce() -> Result<Vec<u8>>,
+ ) -> Result<String> {
+ self.cache(&key, generate)?;
+ Ok(key)
}
+ pub fn cache_memory<Fun, T>(
+ &self,
+ key: &str,
+ mut generate: Fun,
+ ) -> Result<Arc<T>, anyhow::Error>
+ where
+ Fun: FnMut() -> Result<T, anyhow::Error>,
+ T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- if let Some(entry) = g.get_mut(key) {
- entry.last_access = Instant::now();
- let object = entry
- .object
- .clone()
- .downcast::<T>()
- .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
- return Ok(object);
+ if !key.ends_with(".json") {
+ warn!("cache_memory key not ending in .json: {key:?}")
}
- }
- let data = cache(&key, move || {
- let object = generate()?;
- Ok(serde_json::to_vec(&object)?)
- })?;
- let size = data.len();
- let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?;
- let object = Arc::new(object);
+ {
+ let mut g = self.memory_cache.write().unwrap();
+ if let Some(entry) = g.get_mut(key) {
+ entry.last_access = Instant::now();
+ let object = entry
+ .object
+ .clone()
+ .downcast::<T>()
+ .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
+ return Ok(object);
+ }
+ }
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- g.insert(
- key.to_owned(),
- InMemoryCacheEntry {
- size,
- last_access: Instant::now(),
- object: object.clone(),
- },
- );
- CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
- }
+ let data = self.cache(&key, move || {
+ let object = generate()?;
+ Ok(serde_json::to_vec(&object)?)
+ })?;
+ let size = data.len();
+ let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?;
+ let object = Arc::new(object);
- cleanup_cache();
+ {
+ let mut g = self.memory_cache.write().unwrap();
+ g.insert(
+ key.to_owned(),
+ InMemoryCacheEntry {
+ size,
+ last_access: Instant::now(),
+ object: object.clone(),
+ },
+ );
+ self.memory_cache_size.fetch_add(size, Ordering::Relaxed);
+ }
- Ok(object)
-}
+ self.cleanup_cache();
-pub fn cleanup_cache() {
- let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
- if current_size < CONF.max_in_memory_cache_size {
- return;
+ Ok(object)
}
- info!("running cache eviction");
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- // TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now
- let mut k = BTreeMap::new();
- for (loc, entry) in g.iter() {
- k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
- }
- let mut reduction = 0;
- for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
- g.remove(loc);
- reduction += size;
- }
- CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
- drop(g);
+ fn cleanup_cache(&self) {
+ let current_size = self.memory_cache_size.load(Ordering::Relaxed);
+ if current_size < self.max_memory_cache_size {
+ return;
+ }
+ info!("running cache eviction");
+ let mut g = self.memory_cache.write().unwrap();
- info!(
- "done, {} freed",
- humansize::format_size(reduction, humansize::DECIMAL)
- );
+        // TODO: if two entries have *exactly* the same size, only one of them will be removed; this is fine for now
+ let mut k = BTreeMap::new();
+ for (loc, entry) in g.iter() {
+ k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
+ }
+ let mut reduction = 0;
+ for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
+ g.remove(loc);
+ reduction += size;
+ }
+ self.memory_cache_size
+ .fetch_sub(reduction, Ordering::Relaxed);
+ drop(g);
+
+ info!(
+ "done, {} freed",
+ humansize::format_size(reduction, humansize::DECIMAL)
+ );
+ }
}