diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-11-30 12:32:44 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-11-30 12:32:44 +0100 |
| commit | 8174d129fbabd2d39323678d11d868893ddb429a (patch) | |
| tree | 7979a528114cd5fb827f748f678a916e8e8eeddc /cache/src/lib.rs | |
| parent | 5db15c323d76dca9ae71b0204d63dcb09fbbcbc5 (diff) | |
| download | jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.bz2 jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.zst | |
new sync cache
Diffstat (limited to 'cache/src/lib.rs')
| -rw-r--r-- | cache/src/lib.rs | 250 |
1 files changed, 53 insertions, 197 deletions
diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 115741c..fbda2cf 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -3,37 +3,38 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use anyhow::{Context, anyhow}; -use base64::Engine; -use log::{info, warn}; -use rand::random; +pub mod backends; +pub mod key; + +use crate::backends::{CacheStorage, filesystem::Filesystem}; +use anyhow::{Context, Result, anyhow}; +pub use key::*; +use log::info; use serde::{Deserialize, Serialize}; -use sha2::Sha512; use std::{ any::Any, collections::{BTreeMap, HashMap}, - fs::rename, - future::Future, - hash::{Hash, Hasher}, - io::{Seek, Write}, path::PathBuf, sync::{ - Arc, LazyLock, RwLock, + Arc, LazyLock, Mutex, OnceLock, RwLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Instant, }; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex, -}; #[derive(Debug, Deserialize)] pub struct Config { path: PathBuf, max_in_memory_cache_size: usize, + secret: String, } +const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; +pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = + LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); + +thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } + pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None); static CONF: LazyLock<Config> = LazyLock::new(|| { CONF_PRELOAD @@ -43,126 +44,50 @@ static CONF: LazyLock<Config> = LazyLock::new(|| { .expect("cache config not preloaded. logic error") }); -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct CachePath(pub PathBuf); -impl CachePath { - pub fn abs(&self) -> PathBuf { - CONF.path.join(&self.0) - } -} - -pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) { - use sha2::Digest; - struct ShaHasher(Sha512); - impl Hasher for ShaHasher { - fn finish(&self) -> u64 { - unreachable!() - } - fn write(&mut self, bytes: &[u8]) { - self.0.update(bytes); - } - } - let mut d = ShaHasher(sha2::Sha512::new()); - d.0.update(kind); - d.0.update(b"\0"); - key.hash(&mut d); - - let d = d.0.finalize(); - let n = - d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24); - let fname = base64::engine::general_purpose::URL_SAFE.encode(d); - let fname = &fname[..30]; // 180 bits - let fname = format!("{}/{}", kind, fname); - (n, CachePath(fname.into())) -} - -const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; -pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = - LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); +static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new(); -pub async fn async_cache_file<Fun, Fut>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnOnce(tokio::fs::File) -> Fut, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); - // we need a lock even if it exists since somebody might be still in the process of writing. - let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .lock() - .await; - let exists = tokio::fs::try_exists(&loc_abs) - .await - .context("unable to test for cache file existance")?; - if !exists { - let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>())); - let f = tokio::fs::File::create(&temp_path) - .await - .context("creating new cache file")?; - match generate(f).await { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - tokio::fs::remove_file(temp_path).await?; - return Err(e); - } - } - tokio::fs::create_dir_all(loc_abs.parent().unwrap()) - .await - .context("create kind dir")?; - tokio::fs::rename(temp_path, &loc_abs) - .await - .context("rename cache")?; - } - drop(_guard); - Ok(location) +pub fn init_cache() -> Result<()> { + CACHE_STORE + .set(Box::new(Filesystem::new(&CONF))) + .map_err(|_| ()) + .unwrap(); + Ok(()) } -thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } - -pub fn cache_file<Fun>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<CachePath, anyhow::Error> -where - Fun: FnOnce(std::fs::File) -> Result<(), anyhow::Error>, -{ - let (bucket, location) = cache_location(kind, key); - let loc_abs = location.abs(); +pub fn cache(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> { // we need a lock even if it exists since somebody might be still in the process of writing. let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); let _guard = if already_within { - // TODO stupid hack to avoid deadlock for nested cache_file. proper solution needed - CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] - .try_lock() - .ok() + // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating + CACHE_GENERATION_LOCKS[key.bucket()].try_lock().ok() } else { - Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock()) + CACHE_GENERATION_LOCKS[key.bucket()].lock().ok() }; - if !loc_abs.exists() { - let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>())); - let f = std::fs::File::create(&temp_path).context("creating new cache file")?; - match generate(f) { - Ok(()) => (), - Err(e) => { - warn!("cache generation failed, unlinking entry"); - std::fs::remove_file(temp_path)?; - return Err(e); - } + + let store = CACHE_STORE.get().unwrap(); + + let out = match store.read(key)? { + Some(x) => x, + None => { + let value = generate()?; + store.store(key, &value)?; + value } - std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?; - rename(temp_path, loc_abs).context("rename cache")?; - } + }; + if !already_within { WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed)); } drop(_guard); - Ok(location) + Ok(out) +} + +pub fn cache_read(key: CacheKey) -> Result<Option<Vec<u8>>> { + CACHE_STORE.get().unwrap().read(key) +} +pub fn cache_store(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<CacheKey> { + cache(key, generate)?; + Ok(key) } pub struct InMemoryCacheEntry { @@ -170,23 +95,18 @@ pub struct InMemoryCacheEntry { last_access: Instant, object: Arc<dyn Any + Send + Sync + 'static>, } -pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> = +pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<CacheKey, InMemoryCacheEntry>>> = LazyLock::new(|| RwLock::new(HashMap::new())); pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); -pub fn cache_memory<Fun, T>( - kind: &str, - key: impl Hash, - mut generate: Fun, -) -> Result<Arc<T>, anyhow::Error> +pub fn cache_memory<Fun, T>(key: CacheKey, mut generate: Fun) -> Result<Arc<T>, anyhow::Error> where Fun: FnMut() -> Result<T, anyhow::Error>, T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { - let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { + if let Some(entry) = g.get_mut(&key) { entry.last_access = Instant::now(); let object = entry .object @@ -197,82 +117,18 @@ where } } - let location = cache_file(kind, &key, move |file| { + let data = cache(key, move || { let object = generate()?; - let mut file = std::io::BufWriter::new(file); - serde_json::to_writer(&mut file, &object).context("encoding cache object")?; - file.flush()?; - Ok(()) + Ok(serde_json::to_vec(&object)?) })?; - let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?); - let object = serde_json::from_reader::<_, T>(&mut file).context("decoding cache object")?; - let object = Arc::new(object); - let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode - - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - g.insert( - location.abs(), - InMemoryCacheEntry { - size, - last_access: Instant::now(), - object: object.clone(), - }, - ); - CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed); - } - - cleanup_cache(); - - Ok(object) -} - -pub async fn async_cache_memory<Fun, Fut, T>( - kind: &str, - key: impl Hash, - generate: Fun, -) -> Result<Arc<T>, anyhow::Error> -where - Fun: FnOnce() -> Fut, - Fut: Future<Output = Result<T, anyhow::Error>>, - T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, -{ - let (_, location) = cache_location(kind, &key); - { - let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); - if let Some(entry) = g.get_mut(&location.abs()) { - entry.last_access = Instant::now(); - let object = entry - .object - .clone() - .downcast::<T>() - .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?; - return Ok(object); - } - } - - let location = async_cache_file(kind, &key, move |mut file| async move { - let object = generate().await?; - let data = serde_json::to_vec(&object).context("encoding cache object")?; - - file.write_all(&data).await?; - - Ok(()) - }) - .await?; - let mut file = tokio::fs::File::open(&location.abs()).await?; - let mut data = Vec::new(); - file.read_to_end(&mut data) - .await - .context("reading cache object")?; + let size = data.len(); let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?; let object = Arc::new(object); - let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); g.insert( - location.abs(), + key, InMemoryCacheEntry { size, last_access: Instant::now(), |