author | metamuffin <metamuffin@disroot.org> | 2025-04-28 00:48:52 +0200
---|---|---
committer | metamuffin <metamuffin@disroot.org> | 2025-04-28 00:48:52 +0200
commit | 80d28b764c95891551e28c395783f5ff9d065743 | (patch)
tree | f25898b1c939a939c63236ca4e8e843e81069947 | /cache
parent | 335ba978dbaf203f3603a815147fd75dbf205723 | (diff)
download | jellything-80d28b764c95891551e28c395783f5ff9d065743.tar, jellything-80d28b764c95891551e28c395783f5ff9d065743.tar.bz2, jellything-80d28b764c95891551e28c395783f5ff9d065743.tar.zst |
start with splitting server
Diffstat (limited to 'cache')
-rw-r--r-- | cache/Cargo.toml | 15
-rw-r--r-- | cache/src/lib.rs | 319
2 files changed, 334 insertions, 0 deletions
diff --git a/cache/Cargo.toml b/cache/Cargo.toml
new file mode 100644
index 0000000..42f5e00
--- /dev/null
+++ b/cache/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "jellycache"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+base64 = "0.22.1"
+bincode = "2.0.0-rc.3"
+humansize = "2.1.3"
+anyhow = "1.0.95"
+log = { workspace = true }
+tokio = { workspace = true }
+sha2 = "0.10.8"
+rand = "0.9.1"
+serde = "1.0.217"
diff --git a/cache/src/lib.rs b/cache/src/lib.rs
new file mode 100644
index 0000000..2d2cfa3
--- /dev/null
+++ b/cache/src/lib.rs
@@ -0,0 +1,319 @@
+/*
+    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+    Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use anyhow::{Context, anyhow};
+use base64::Engine;
+use bincode::{Decode, Encode};
+use log::{info, warn};
+use rand::random;
+use serde::{Deserialize, Serialize};
+use sha2::Sha512;
+use std::{
+    any::Any,
+    collections::{BTreeMap, HashMap},
+    fs::rename,
+    future::Future,
+    hash::{Hash, Hasher},
+    io::{Seek, Write},
+    path::PathBuf,
+    sync::{
+        Arc, LazyLock, RwLock,
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+    },
+    time::Instant,
+};
+use tokio::{
+    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
+    sync::Mutex,
+};
+
+#[derive(Debug, Deserialize)]
+pub struct Config {
+    path: PathBuf,
+    max_in_memory_cache_size: usize,
+}
+
+static CONF: LazyLock<Config> = LazyLock::new(|| {
+    CONF_PRELOAD
+        .blocking_lock()
+        .take()
+        .expect("cache config not preloaded. logic error")
+});
+static CONF_PRELOAD: Mutex<Option<Config>> = Mutex::const_new(None);
+
+#[derive(Debug, Encode, Decode, Serialize, Clone)]
+pub struct CachePath(pub PathBuf);
+impl CachePath {
+    pub fn abs(&self) -> PathBuf {
+        CONF.path.join(&self.0)
+    }
+}
+
+pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) {
+    use sha2::Digest;
+    struct ShaHasher(Sha512);
+    impl Hasher for ShaHasher {
+        fn finish(&self) -> u64 {
+            unreachable!()
+        }
+        fn write(&mut self, bytes: &[u8]) {
+            self.0.update(bytes);
+        }
+    }
+    let mut d = ShaHasher(sha2::Sha512::new());
+    d.0.update(kind);
+    d.0.update(b"\0");
+    key.hash(&mut d);
+
+    let d = d.0.finalize();
+    let n =
+        d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24);
+    let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
+    let fname = &fname[..30]; // 180 bits
+    let fname = format!("{}/{}", kind, fname);
+    (n, CachePath(fname.into()))
+}
+
+const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
+pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
+    LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
+
+pub async fn async_cache_file<Fun, Fut>(
+    kind: &str,
+    key: impl Hash,
+    generate: Fun,
+) -> Result<CachePath, anyhow::Error>
+where
+    Fun: FnOnce(tokio::fs::File) -> Fut,
+    Fut: Future<Output = Result<(), anyhow::Error>>,
+{
+    let (bucket, location) = cache_location(kind, key);
+    let loc_abs = location.abs();
+    // we need a lock even if it exists since somebody might still be in the process of writing.
+    let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
+        .lock()
+        .await;
+    let exists = tokio::fs::try_exists(&loc_abs)
+        .await
+        .context("unable to test for cache file existence")?;
+    if !exists {
+        let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
+        let f = tokio::fs::File::create(&temp_path)
+            .await
+            .context("creating new cache file")?;
+        match generate(f).await {
+            Ok(()) => (),
+            Err(e) => {
+                warn!("cache generation failed, unlinking entry");
+                tokio::fs::remove_file(temp_path).await?;
+                return Err(e);
+            }
+        }
+        tokio::fs::create_dir_all(loc_abs.parent().unwrap())
+            .await
+            .context("create kind dir")?;
+        tokio::fs::rename(temp_path, &loc_abs)
+            .await
+            .context("rename cache")?;
+    }
+    drop(_guard);
+    Ok(location)
+}
+
+thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
+
+pub fn cache_file<Fun>(
+    kind: &str,
+    key: impl Hash,
+    mut generate: Fun,
+) -> Result<CachePath, anyhow::Error>
+where
+    Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
+{
+    let (bucket, location) = cache_location(kind, key);
+    let loc_abs = location.abs();
+    // we need a lock even if it exists since somebody might still be in the process of writing.
+    let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
+    let _guard = if already_within {
+        // TODO stupid hack to avoid deadlock for nested cache_file. proper solution needed
+        CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
+            .try_lock()
+            .ok()
+    } else {
+        Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock())
+    };
+    if !loc_abs.exists() {
+        let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
+        let f = std::fs::File::create(&temp_path).context("creating new cache file")?;
+        match generate(f) {
+            Ok(()) => (),
+            Err(e) => {
+                warn!("cache generation failed, unlinking entry");
+                std::fs::remove_file(temp_path)?;
+                return Err(e);
+            }
+        }
+        std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?;
+        rename(temp_path, loc_abs).context("rename cache")?;
+    }
+    if !already_within {
+        WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
+    }
+    drop(_guard);
+    Ok(location)
+}
+
+pub struct InMemoryCacheEntry {
+    size: usize,
+    last_access: Instant,
+    object: Arc<dyn Any + Send + Sync + 'static>,
+}
+pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> =
+    LazyLock::new(|| RwLock::new(HashMap::new()));
+pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
+
+pub fn cache_memory<Fun, T>(
+    kind: &str,
+    key: impl Hash,
+    mut generate: Fun,
+) -> Result<Arc<T>, anyhow::Error>
+where
+    Fun: FnMut() -> Result<T, anyhow::Error>,
+    T: Encode + Decode + Send + Sync + 'static,
+{
+    let (_, location) = cache_location(kind, &key);
+    {
+        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+        if let Some(entry) = g.get_mut(&location.abs()) {
+            entry.last_access = Instant::now();
+            let object = entry
+                .object
+                .clone()
+                .downcast::<T>()
+                .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
+            return Ok(object);
+        }
+    }
+
+    let location = cache_file(kind, &key, move |file| {
+        let object = generate()?;
+        let mut file = std::io::BufWriter::new(file);
+        bincode::encode_into_std_write(&object, &mut file, bincode::config::standard())
+            .context("encoding cache object")?;
+        file.flush()?;
+        Ok(())
+    })?;
+    let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?);
+    let object = bincode::decode_from_std_read::<T, _, _>(&mut file, bincode::config::standard())
+        .context("decoding cache object")?;
+    let object = Arc::new(object);
+    let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode
+
+    {
+        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+        g.insert(
+            location.abs(),
+            InMemoryCacheEntry {
+                size,
+                last_access: Instant::now(),
+                object: object.clone(),
+            },
+        );
+        CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
+    }
+
+    cleanup_cache();
+
+    Ok(object)
+}
+
+pub async fn async_cache_memory<Fun, Fut, T>(
+    kind: &str,
+    key: impl Hash,
+    generate: Fun,
+) -> Result<Arc<T>, anyhow::Error>
+where
+    Fun: FnOnce() -> Fut,
+    Fut: Future<Output = Result<T, anyhow::Error>>,
+    T: Encode + Decode + Send + Sync + 'static,
+{
+    let (_, location) = cache_location(kind, &key);
+    {
+        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+        if let Some(entry) = g.get_mut(&location.abs()) {
+            entry.last_access = Instant::now();
+            let object = entry
+                .object
+                .clone()
+                .downcast::<T>()
+                .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
+            return Ok(object);
+        }
+    }
+
+    let location = async_cache_file(kind, &key, move |mut file| async move {
+        let object = generate().await?;
+        let data = bincode::encode_to_vec(&object, bincode::config::standard())
+            .context("encoding cache object")?;
+
+        file.write_all(&data).await?;
+
+        Ok(())
+    })
+    .await?;
+    let mut file = tokio::fs::File::open(&location.abs()).await?;
+    let mut data = Vec::new();
+    file.read_to_end(&mut data)
+        .await
+        .context("reading cache object")?;
+    let (object, _) = bincode::decode_from_slice::<T, _>(&data, bincode::config::standard())
+        .context("decoding cache object")?;
+    let object = Arc::new(object);
+    let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode
+
+    {
+        let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+        g.insert(
+            location.abs(),
+            InMemoryCacheEntry {
+                size,
+                last_access: Instant::now(),
+                object: object.clone(),
+            },
+        );
+        CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
+    }
+
+    cleanup_cache();
+
+    Ok(object)
+}
+
+pub fn cleanup_cache() {
+    let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
+    if current_size < CONF.max_in_memory_cache_size {
+        return;
+    }
+    info!("running cache eviction");
+    let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+
+    // TODO: if two entries have *exactly* the same last-access age, only one of them will be removed; this is fine for now
+    let mut k = BTreeMap::new();
+    for (loc, entry) in g.iter() {
+        k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
+    }
+    let mut reduction = 0;
+    for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
+        g.remove(loc);
+        reduction += size;
+    }
+    CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
+    drop(g);
+
+    info!(
+        "done, {} freed",
+        humansize::format_size(reduction, humansize::DECIMAL)
+    );
}
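
For reference, a minimal usage sketch of the new API (not part of the commit): the `render_thumbnail` helper, the `"thumbnail"`/`"thumb-dims"` kind strings, and the keys below are invented, and a cache `Config` is assumed to have been preloaded into `CONF_PRELOAD` by the server before first use.

```rust
use anyhow::Context;
use std::{io::Write, sync::Arc};

// Hypothetical expensive generator standing in for real work.
fn render_thumbnail(node_id: &str) -> anyhow::Result<Vec<u8>> {
    Ok(format!("thumbnail for {node_id}").into_bytes())
}

fn example(node_id: &str) -> anyhow::Result<()> {
    // File-backed cache: `generate` runs only on a miss; it writes into a
    // temp file which is then renamed into its final location.
    let path = jellycache::cache_file("thumbnail", ("v1", node_id), |mut file| {
        let bytes = render_thumbnail(node_id)?;
        file.write_all(&bytes).context("writing thumbnail")?;
        Ok(())
    })?;
    println!("cached at {:?}", path.abs());

    // In-memory cache: the value is bincode-encoded to disk once, then kept
    // in CACHE_IN_MEMORY_OBJECTS until cleanup_cache() evicts it.
    let dims: Arc<(u32, u32)> =
        jellycache::cache_memory("thumb-dims", node_id, || Ok((320, 180)))?;
    println!("dimensions: {}x{}", dims.0, dims.1);
    Ok(())
}
```

Internally, `cache_location` feeds the kind string, a NUL separator, and the hashed key through SHA-512; the first four digest bytes select one of the 1024 generation locks and the first 30 URL-safe base64 characters (180 bits) of the digest become the file name under `<cache root>/<kind>/`, so identical inputs always map to the same entry.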
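The async variants follow the same shape; another sketch under the same assumptions (`fetch_metadata` and the `"metadata"` kind are invented). Note that the sync `cache_file` takes its bucket lock with `blocking_lock`, which tokio panics on when called from a runtime worker thread, which is presumably why these `async_*` twins exist.

```rust
use anyhow::Result;

// Hypothetical async generator.
async fn fetch_metadata(id: u64) -> Result<String> {
    Ok(format!("metadata for {id}"))
}

async fn example_async(id: u64) -> Result<()> {
    // On a miss the generator runs while the bucket lock is held, so
    // concurrent requests for the same entry wait instead of recomputing.
    let meta = jellycache::async_cache_memory("metadata", id, || async move {
        fetch_metadata(id).await
    })
    .await?;
    println!("{meta}");
    Ok(())
}
```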