diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-11-30 12:32:44 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-11-30 12:32:44 +0100 |
| commit | 8174d129fbabd2d39323678d11d868893ddb429a (patch) | |
| tree | 7979a528114cd5fb827f748f678a916e8e8eeddc /cache | |
| parent | 5db15c323d76dca9ae71b0204d63dcb09fbbcbc5 (diff) | |
| download | jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.bz2 jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.zst | |
new sync cache
Diffstat (limited to 'cache')
| -rw-r--r-- | cache/Cargo.toml | 1 | ||||
| -rw-r--r-- | cache/src/backends/filesystem.rs | 51 | ||||
| -rw-r--r-- | cache/src/backends/mod.rs | 14 | ||||
| -rw-r--r-- | cache/src/key.rs | 83 | ||||
| -rw-r--r-- | cache/src/lib.rs | 250 |
5 files changed, 201 insertions, 198 deletions
diff --git a/cache/Cargo.toml b/cache/Cargo.toml index c171147..412b1f2 100644 --- a/cache/Cargo.toml +++ b/cache/Cargo.toml @@ -8,7 +8,6 @@ base64 = "0.22.1" humansize = "2.1.3" anyhow = "1.0.100" log = { workspace = true } -tokio = { workspace = true } sha2 = "0.10.9" rand = "0.9.2" serde = "1.0.228" diff --git a/cache/src/backends/filesystem.rs b/cache/src/backends/filesystem.rs new file mode 100644 index 0000000..39fb7a2 --- /dev/null +++ b/cache/src/backends/filesystem.rs @@ -0,0 +1,51 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ + +use crate::{CacheKey, Config, backends::CacheStorage}; +use anyhow::Result; +use base64::Engine; +use rand::random; +use std::{ + fs::{File, rename}, + io::{ErrorKind, Read, Write}, + path::PathBuf, +}; + +pub struct Filesystem(PathBuf); + +impl Filesystem { + pub fn new(config: &Config) -> Self { + Self(config.path.clone()) + } + fn path(&self, key: CacheKey) -> PathBuf { + let filename = base64::engine::general_purpose::URL_SAFE.encode(key.0); + let filename = &filename[..30]; // 180 bits + self.0.join(filename) + } + fn temp_path(&self) -> PathBuf { + self.0.join(format!("temp-{:016x}", random::<u128>())) + } +} + +impl CacheStorage for Filesystem { + fn store(&self, key: CacheKey, value: &[u8]) -> Result<()> { + let temp = self.temp_path(); + File::create(&temp)?.write_all(value)?; + rename(temp, self.path(key))?; + Ok(()) + } + fn read(&self, key: CacheKey) -> Result<Option<Vec<u8>>> { + match File::open(self.path(key)) { + Ok(mut f) => { + let mut data = Vec::new(); + f.read_to_end(&mut data)?; + Ok(Some(data)) + } + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e.into()), + } + } +} diff --git a/cache/src/backends/mod.rs b/cache/src/backends/mod.rs new file mode 100644 index 0000000..370c5ab --- /dev/null +++ b/cache/src/backends/mod.rs @@ -0,0 +1,14 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +pub mod filesystem; + +use crate::CacheKey; +use anyhow::Result; + +pub(crate) trait CacheStorage: Send + Sync + 'static { + fn store(&self, key: CacheKey, value: &[u8]) -> Result<()>; + fn read(&self, key: CacheKey) -> Result<Option<Vec<u8>>>; +} diff --git a/cache/src/key.rs b/cache/src/key.rs new file mode 100644 index 0000000..d8ca510 --- /dev/null +++ b/cache/src/key.rs @@ -0,0 +1,83 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use crate::{CACHE_GENERATION_BUCKET_COUNT, CONF}; +use anyhow::bail; +use base64::{Engine, prelude::BASE64_URL_SAFE}; +use sha2::Sha256; +use std::{ + fmt::Display, + hash::{Hash, Hasher}, + str::FromStr, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct CacheKey(pub [u8; 32]); + +impl CacheKey { + pub fn new(ty: CacheContentType, seed: impl Hash) -> Self { + use sha2::Digest; + struct ShaHasher(Sha256); + impl Hasher for ShaHasher { + fn finish(&self) -> u64 { + unreachable!() + } + fn write(&mut self, bytes: &[u8]) { + self.0.update(bytes); + } + } + let mut d = ShaHasher(sha2::Sha256::new()); + d.0.update(CONF.secret.as_bytes()); + seed.hash(&mut d); + let d = d.0.finalize(); + let mut key: [u8; 32] = d.as_slice().try_into().unwrap(); + key[0] = ty as u8; + Self(key) + } + pub fn new_json(seed: impl Hash) -> Self { + Self::new(CacheContentType::Json, seed) + } + pub fn new_image(seed: impl Hash) -> Self { + Self::new(CacheContentType::Image, seed) + } + pub fn content_type(&self) -> CacheContentType { + match self.0[0] { + 1 => CacheContentType::Image, + 2 => CacheContentType::Json, + _ => CacheContentType::Unknown, + } + } + pub(super) fn bucket(&self) -> usize { + (self.0[1] as usize + | ((self.0[2] as usize) << 8) + | ((self.0[3] as usize) << 16) + | ((self.0[4] as usize) << 24)) + % CACHE_GENERATION_BUCKET_COUNT + } +} + +impl Display for CacheKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&BASE64_URL_SAFE.encode(self.0)) + } +} +impl FromStr for CacheKey { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let mut out = [0; 32]; + let size = BASE64_URL_SAFE.decode_slice(s, &mut out)?; + if size != out.len() { + bail!("cache key parse invalid size") + } + Ok(Self(out)) + } +} + +#[repr(u8)] +pub enum CacheContentType { + Unknown = 0, + Image = 1, + Json = 2, +} diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 115741c..fbda2cf 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -3,37 +3,38 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use anyhow::{Context, anyhow}; -use base64::Engine; -use log::{info, warn}; -use rand::random; +pub mod backends; +pub mod key; + +use crate::backends::{CacheStorage, filesystem::Filesystem}; +use anyhow::{Context, Result, anyhow}; +pub use key::*; +use log::info; use serde::{Deserialize, Serialize}; -use sha2::Sha512; use std::{ any::Any, collections::{BTreeMap, HashMap}, - fs::rename, - future::Future, - hash::{Hash, Hasher}, - io::{Seek, Write}, path::PathBuf, sync::{ - Arc, LazyLock, RwLock, + Arc, LazyLock, Mutex, OnceLock, RwLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Instant, }; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex, -}; #[derive(Debug, Deserialize)] pub struct Config { path: PathBuf, max_in_memory_cache_size: usize, + secret: String, } +const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; +pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = + LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); + +thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } + pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None); static CONF: LazyLock<Config> = LazyLock::new(|| { CONF_PRELOAD @@ -43,126 +44,50 @@ static CONF: LazyLock<Config> = LazyLock::new(|| { .expect("cache config not preloaded. logic error") }); -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct CachePath(pub PathBuf); -impl CachePath { - pub fn abs(&self) -> PathBuf { - CONF.path.join(&self.0) - } -} - -pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) { - use sha2::Digest; - struct ShaHasher(Sha512); - impl Hasher for ShaHasher { - fn finish(&self) -> u64 { - unreachable!() - } - fn write(&mut self, bytes: &[u8]) { - self.0.update(bytes); - } - } - let mut d = ShaHasher(sha2::Sha512::new()); - d.0.update(kind); - d.0.update(b"\0"); - key.hash(&mut d); - - let d = d.0.finalize(); - let n = - d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24); - let fname = base64::engine::general_purpose::URL_SAFE.encode(d); - let fname = &fname[..30]; // 180 bits - let fname = format!("{}/{}", kind, fname); - (n, CachePath(fname.into())) -} - -const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; -pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = - LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); +static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new(); -pub async fn async_cache_file<Fun, Fut>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnOnce(tokio::fs::File) -> Fut, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); - // we need a lock even if it exists since somebody might be still in the process of writing. - let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .lock() - .await; - let exists = tokio::fs::try_exists(&loc_abs) - .await - .context("unable to test for cache file existance")?; - if !exists { - let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>())); - let f = tokio::fs::File::create(&temp_path) - .await - .context("creating new cache file")?; - match generate(f).await { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - tokio::fs::remove_file(temp_path).await?; - return Err(e); - } - } - tokio::fs::create_dir_all(loc_abs.parent().unwrap()) - .await - .context("create kind dir")?; - tokio::fs::rename(temp_path, &loc_abs) - .await - .context("rename cache")?; - } - drop(_guard); - Ok(location) +pub fn init_cache() -> Result<()> { + CACHE_STORE + .set(Box::new(Filesystem::new(&CONF))) + .map_err(|_| ()) + .unwrap(); + Ok(()) } -thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } - -pub fn cache_file<Fun>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnOnce(std::fs::File) -> Result<(), anyhow::Error>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); +pub fn cache(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { // we need a lock even if it exists since somebody might be still in the process of writing. let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); let _guard = if already_within { - // TODO stupid hack to avoid deadlock for nested cache_file. proper solution needed - CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .try_lock() - .ok() + // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating + CACHE_GENERATION_LOCKS[key.bucket()].try_lock().ok() } else { - Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock()) + CACHE_GENERATION_LOCKS[key.bucket()].lock().ok() }; - if !loc_abs.exists() { - let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>())); - let f = std::fs::File::create(&temp_path).context("creating new cache file")?; - match generate(f) { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - std::fs::remove_file(temp_path)?; - return Err(e); - } + + let store = CACHE_STORE.get().unwrap(); + + let out = match store.read(key)? { + Some(x) => x, + None => { + let value = generate()?; + store.store(key, &value)?; + value } - std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?; - rename(temp_path, loc_abs).context("rename cache")?; - } + }; + if !already_within { WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); } drop(_guard); - Ok(location) + Ok(out) +} + +pub fn cache_read(key: CacheKey) -> Result<Option<Vec<u8>>> { + CACHE_STORE.get().unwrap().read(key) +} +pub fn cache_store(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<CacheKey> { + cache(key, generate)?; + Ok(key) } pub struct InMemoryCacheEntry { @@ -170,23 +95,18 @@ pub struct InMemoryCacheEntry { last_access: Instant, object: Arc<dyn Any + Send + Sync + 'static>, } -pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> = +pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<CacheKey, InMemoryCacheEntry>>> = LazyLock::new(|| RwLock::new(HashMap::new())); pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); -pub fn cache_memory<Fun, T>( - kind: &str, - key: impl Hash, - mut generate: Fun, -) -> Result<Arc<T>, anyhow::Error> +pub fn cache_memory<Fun, T>(key: CacheKey, mut generate: Fun) -> Result<Arc<T>, anyhow::Error> where Fun: FnMut() -> Result<T, anyhow::Error>, T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { - let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { + if let Some(entry) = g.get_mut(&key) { entry.last_access = Instant::now(); let object = entry .object @@ -197,82 +117,18 @@ where } } - let location = cache_file(kind, &key, move |file| { + let data = cache(key, move || { let object = generate()?; - let mut file = std::io::BufWriter::new(file); - serde_json::to_writer(&mut file, &object).context("encoding cache object")?; - file.flush()?; - Ok(()) + Ok(serde_json::to_vec(&object)?) })?; - let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?); - let object = serde_json::from_reader::<_, T>(&mut file).context("decoding cache object")?; - let object = Arc::new(object); - let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode - - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - location.abs(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } - - cleanup_cache(); - - Ok(object) -} - -pub async fn async_cache_memory<Fun, Fut, T>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnOnce() -> Fut, - Fut: Future<Output = Result<T, anyhow::Error>>, - T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, -{ - let (_, location) = cache_location(kind, &key); - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); - } - } - - let location = async_cache_file(kind, &key, move |mut file| async move { - let object = generate().await?; - let data = serde_json::to_vec(&object).context("encoding cache object")?; - - file.write_all(&data).await?; - - Ok(()) - }) - .await?; - let mut file = tokio::fs::File::open(&location.abs()).await?; - let mut data = Vec::new(); - file.read_to_end(&mut data) - .await - .context("reading cache object")?; + let size = data.len(); let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; let object = Arc::new(object); - let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( - location.abs(), + key, InMemoryCacheEntry { size, last_access: Instant::now(), |