author     metamuffin <metamuffin@disroot.org>  2025-04-28 00:48:52 +0200
committer  metamuffin <metamuffin@disroot.org>  2025-04-28 00:48:52 +0200
commit     80d28b764c95891551e28c395783f5ff9d065743 (patch)
tree       f25898b1c939a939c63236ca4e8e843e81069947 /cache
parent     335ba978dbaf203f3603a815147fd75dbf205723 (diff)
start with splitting server
Diffstat (limited to 'cache')
-rw-r--r--  cache/Cargo.toml   15
-rw-r--r--  cache/src/lib.rs  319
2 files changed, 334 insertions, 0 deletions
diff --git a/cache/Cargo.toml b/cache/Cargo.toml
new file mode 100644
index 0000000..42f5e00
--- /dev/null
+++ b/cache/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "jellycache"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+base64 = "0.22.1"
+bincode = "2.0.0-rc.3"
+humansize = "2.1.3"
+anyhow = "1.0.95"
+log = { workspace = true }
+tokio = { workspace = true }
+sha2 = "0.10.8"
+rand = "0.9.1"
+serde = "1.0.217"
diff --git a/cache/src/lib.rs b/cache/src/lib.rs
new file mode 100644
index 0000000..2d2cfa3
--- /dev/null
+++ b/cache/src/lib.rs
@@ -0,0 +1,319 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use anyhow::{Context, anyhow};
+use base64::Engine;
+use bincode::{Decode, Encode};
+use log::{info, warn};
+use rand::random;
+use serde::{Deserialize, Serialize};
+use sha2::Sha512;
+use std::{
+ any::Any,
+ collections::{BTreeMap, HashMap},
+ fs::rename,
+ future::Future,
+ hash::{Hash, Hasher},
+ io::{Seek, Write},
+ path::PathBuf,
+ sync::{
+ Arc, LazyLock, RwLock,
+ atomic::{AtomicBool, AtomicUsize, Ordering},
+ },
+ time::Instant,
+};
+use tokio::{
+ io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
+ sync::Mutex,
+};
+
+#[derive(Debug, Deserialize)]
+pub struct Config {
+ path: PathBuf,
+ max_in_memory_cache_size: usize,
+}
+
+static CONF: LazyLock<Config> = LazyLock::new(|| {
+ CONF_PRELOAD
+ .blocking_lock()
+ .take()
+        .expect("cache config not preloaded (logic error)")
+});
+static CONF_PRELOAD: Mutex<Option<Config>> = Mutex::const_new(None);
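+// Note: nothing in this file ever fills CONF_PRELOAD, so a setter along these
+// lines is assumed to exist (hypothetical sketch; it would have to live in
+// this crate, since the static is private):
+//
+//     pub async fn preload_config(config: Config) {
+//         *CONF_PRELOAD.lock().await = Some(config);
+//     }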
+
+#[derive(Debug, Encode, Decode, Serialize, Clone)]
+pub struct CachePath(pub PathBuf);
+impl CachePath {
+ pub fn abs(&self) -> PathBuf {
+ CONF.path.join(&self.0)
+ }
+}
+
+pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) {
+ use sha2::Digest;
+ struct ShaHasher(Sha512);
+ impl Hasher for ShaHasher {
+        fn finish(&self) -> u64 {
+            // never called: Hash::hash only invokes write(); the digest is read via finalize() below
+            unreachable!()
+        }
+ fn write(&mut self, bytes: &[u8]) {
+ self.0.update(bytes);
+ }
+ }
+ let mut d = ShaHasher(sha2::Sha512::new());
+ d.0.update(kind);
+ d.0.update(b"\0");
+ key.hash(&mut d);
+
+ let d = d.0.finalize();
+ let n =
+ d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24);
+ let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
+ let fname = &fname[..30]; // 180 bits
+ let fname = format!("{}/{}", kind, fname);
+ (n, CachePath(fname.into()))
+}
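+// Illustrative only (hypothetical caller, not part of this commit): a call
+// like `cache_location("thumb", (id, 320u32))` hashes kind and key through
+// SHA-512, takes the digest's first four bytes as the lock-bucket index, and
+// yields a path of the form `thumb/<30 url-safe base64 chars>` relative to
+// the configured cache root.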
+
+const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
+pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
+ LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
+
+pub async fn async_cache_file<Fun, Fut>(
+ kind: &str,
+ key: impl Hash,
+ generate: Fun,
+) -> Result<CachePath, anyhow::Error>
+where
+ Fun: FnOnce(tokio::fs::File) -> Fut,
+ Fut: Future<Output = Result<(), anyhow::Error>>,
+{
+ let (bucket, location) = cache_location(kind, key);
+ let loc_abs = location.abs();
+    // We need the lock even if the file exists, since another task might still be in the process of writing it.
+ let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
+ .lock()
+ .await;
+ let exists = tokio::fs::try_exists(&loc_abs)
+ .await
+        .context("unable to test for cache file existence")?;
+ if !exists {
+ let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
+ let f = tokio::fs::File::create(&temp_path)
+ .await
+ .context("creating new cache file")?;
+ match generate(f).await {
+ Ok(()) => (),
+ Err(e) => {
+            warn!("cache generation failed, removing temporary file");
+ tokio::fs::remove_file(temp_path).await?;
+ return Err(e);
+ }
+ }
+ tokio::fs::create_dir_all(loc_abs.parent().unwrap())
+ .await
+ .context("create kind dir")?;
+ tokio::fs::rename(temp_path, &loc_abs)
+ .await
+ .context("rename cache")?;
+ }
+ drop(_guard);
+ Ok(location)
+}
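+// Sketch of intended usage, assuming a caller with `key` and `bytes: Vec<u8>`
+// in scope (illustrative names, not part of this commit):
+//
+//     let path = async_cache_file("subtitles", &key, |mut file| async move {
+//         file.write_all(&bytes).await?;
+//         Ok(())
+//     })
+//     .await?;
+//     // path.abs() now points at the finished cache entry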
+
+thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
+
+pub fn cache_file<Fun>(
+ kind: &str,
+ key: impl Hash,
+ mut generate: Fun,
+) -> Result<CachePath, anyhow::Error>
+where
+ Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
+{
+ let (bucket, location) = cache_location(kind, key);
+ let loc_abs = location.abs();
+    // We need the lock even if the file exists, since another thread might still be in the process of writing it.
+ let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
+ let _guard = if already_within {
+        // TODO: stopgap hack to avoid deadlocking on nested cache_file calls; a proper solution is needed
+ CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
+ .try_lock()
+ .ok()
+ } else {
+ Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock())
+ };
+ if !loc_abs.exists() {
+ let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
+ let f = std::fs::File::create(&temp_path).context("creating new cache file")?;
+ match generate(f) {
+ Ok(()) => (),
+        Err(e) => {
+            warn!("cache generation failed, removing temporary file");
+            // reset the re-entrancy flag on this early-return path too,
+            // otherwise it would stay set for the thread after a failure
+            if !already_within {
+                WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
+            }
+            std::fs::remove_file(temp_path)?;
+            return Err(e);
+        }
+ }
+ std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?;
+ rename(temp_path, loc_abs).context("rename cache")?;
+ }
+ if !already_within {
+ WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
+ }
+ drop(_guard);
+ Ok(location)
+}
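+// Synchronous counterpart of the sketch above (same illustrative names):
+//
+//     let path = cache_file("covers", &key, |mut file| {
+//         file.write_all(&bytes)?;
+//         Ok(())
+//     })?;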
+
+pub struct InMemoryCacheEntry {
+ size: usize,
+ last_access: Instant,
+ object: Arc<dyn Any + Send + Sync + 'static>,
+}
+pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> =
+ LazyLock::new(|| RwLock::new(HashMap::new()));
+pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
+
+pub fn cache_memory<Fun, T>(
+ kind: &str,
+ key: impl Hash,
+ mut generate: Fun,
+) -> Result<Arc<T>, anyhow::Error>
+where
+ Fun: FnMut() -> Result<T, anyhow::Error>,
+ T: Encode + Decode + Send + Sync + 'static,
+{
+ let (_, location) = cache_location(kind, &key);
+ {
+ let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+ if let Some(entry) = g.get_mut(&location.abs()) {
+ entry.last_access = Instant::now();
+ let object = entry
+ .object
+ .clone()
+ .downcast::<T>()
+ .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
+ return Ok(object);
+ }
+ }
+
+ let location = cache_file(kind, &key, move |file| {
+ let object = generate()?;
+ let mut file = std::io::BufWriter::new(file);
+ bincode::encode_into_std_write(&object, &mut file, bincode::config::standard())
+ .context("encoding cache object")?;
+ file.flush()?;
+ Ok(())
+ })?;
+ let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?);
+ let object = bincode::decode_from_std_read::<T, _, _>(&mut file, bincode::config::standard())
+ .context("decoding cache object")?;
+ let object = Arc::new(object);
+    let size = file.stream_position()? as usize; // only approximates the in-memory size, mainly because bincode uses varint encoding
+
+ {
+ let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+ g.insert(
+ location.abs(),
+ InMemoryCacheEntry {
+ size,
+ last_access: Instant::now(),
+ object: object.clone(),
+ },
+ );
+ CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
+ }
+
+ cleanup_cache();
+
+ Ok(object)
+}
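+// Sketch of a hypothetical caller (NodeMeta and load_meta are assumed, not
+// part of this commit); the closure only runs when both the in-memory and
+// on-disk tiers miss:
+//
+//     let meta: Arc<NodeMeta> = cache_memory("meta", &key, || load_meta(&key))?;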
+
+pub async fn async_cache_memory<Fun, Fut, T>(
+ kind: &str,
+ key: impl Hash,
+ generate: Fun,
+) -> Result<Arc<T>, anyhow::Error>
+where
+ Fun: FnOnce() -> Fut,
+ Fut: Future<Output = Result<T, anyhow::Error>>,
+ T: Encode + Decode + Send + Sync + 'static,
+{
+ let (_, location) = cache_location(kind, &key);
+ {
+ let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+ if let Some(entry) = g.get_mut(&location.abs()) {
+ entry.last_access = Instant::now();
+ let object = entry
+ .object
+ .clone()
+ .downcast::<T>()
+ .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
+ return Ok(object);
+ }
+ }
+
+ let location = async_cache_file(kind, &key, move |mut file| async move {
+ let object = generate().await?;
+ let data = bincode::encode_to_vec(&object, bincode::config::standard())
+ .context("encoding cache object")?;
+
+ file.write_all(&data).await?;
+
+ Ok(())
+ })
+ .await?;
+ let mut file = tokio::fs::File::open(&location.abs()).await?;
+ let mut data = Vec::new();
+ file.read_to_end(&mut data)
+ .await
+ .context("reading cache object")?;
+ let (object, _) = bincode::decode_from_slice::<T, _>(&data, bincode::config::standard())
+ .context("decoding cache object")?;
+ let object = Arc::new(object);
+    let size = file.stream_position().await? as usize; // only approximates the in-memory size, mainly because bincode uses varint encoding
+
+ {
+ let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+ g.insert(
+ location.abs(),
+ InMemoryCacheEntry {
+ size,
+ last_access: Instant::now(),
+ object: object.clone(),
+ },
+ );
+ CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
+ }
+
+ cleanup_cache();
+
+ Ok(object)
+}
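+// Like cache_memory, even a freshly generated object is round-tripped through
+// disk before being kept in memory, so the cached copy always matches what a
+// later cold start would decode.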
+
+pub fn cleanup_cache() {
+ let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
+ if current_size < CONF.max_in_memory_cache_size {
+ return;
+ }
+ info!("running cache eviction");
+ let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
+
+    // TODO: if two entries have *exactly* the same last-access age, the map key collides and only one of them will be removed; this is fine for now
+ let mut k = BTreeMap::new();
+ for (loc, entry) in g.iter() {
+ k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
+ }
+ let mut reduction = 0;
+ for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
+ g.remove(loc);
+ reduction += size;
+ }
+ CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
+ drop(g);
+
+ info!(
+ "done, {} freed",
+ humansize::format_size(reduction, humansize::DECIMAL)
+ );
+}
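+// Eviction is deliberately coarse: once the configured budget is exceeded,
+// the least recently used half of the entries is dropped in a single sweep,
+// trading precision for one cheap pass over the map.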