about | summary | refs | log | tree | commit | diff
path: root/cache/src/lib.rs
diff options
context:
space:
mode:
author metamuffin <metamuffin@disroot.org> 2025-11-30 12:32:44 +0100
committer metamuffin <metamuffin@disroot.org> 2025-11-30 12:32:44 +0100
commit 8174d129fbabd2d39323678d11d868893ddb429a (patch)
tree 7979a528114cd5fb827f748f678a916e8e8eeddc /cache/src/lib.rs
parent 5db15c323d76dca9ae71b0204d63dcb09fbbcbc5 (diff)
download jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar
jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.bz2
jellything-8174d129fbabd2d39323678d11d868893ddb429a.tar.zst
new sync cache
Diffstat (limited to 'cache/src/lib.rs')
-rw-r--r-- cache/src/lib.rs 250
1 files changed, 53 insertions, 197 deletions
diff --git a/cache/src/lib.rs b/cache/src/lib.rs
index 115741c..fbda2cf 100644
--- a/cache/src/lib.rs
+++ b/cache/src/lib.rs
@@ -3,37 +3,38 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use anyhow::{Context, anyhow};
-use base64::Engine;
-use log::{info, warn};
-use rand::random;
+pub mod backends;
+pub mod key;
+
+use crate::backends::{CacheStorage, filesystem::Filesystem};
+use anyhow::{Context, Result, anyhow};
+pub use key::*;
+use log::info;
use serde::{Deserialize, Serialize};
-use sha2::Sha512;
use std::{
any::Any,
collections::{BTreeMap, HashMap},
- fs::rename,
- future::Future,
- hash::{Hash, Hasher},
- io::{Seek, Write},
path::PathBuf,
sync::{
- Arc, LazyLock, RwLock,
+ Arc, LazyLock, Mutex, OnceLock, RwLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Instant,
};
-use tokio::{
- io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
- sync::Mutex,
-};
#[derive(Debug, Deserialize)]
pub struct Config {
path: PathBuf,
max_in_memory_cache_size: usize,
+ secret: String,
}
+const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
+pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
+ LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
+
+thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
+
pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None);
static CONF: LazyLock<Config> = LazyLock::new(|| {
CONF_PRELOAD
@@ -43,126 +44,50 @@ static CONF: LazyLock<Config> = LazyLock::new(|| {
.expect("cache config not preloaded. logic error")
});
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-pub struct CachePath(pub PathBuf);
-impl CachePath {
- pub fn abs(&self) -> PathBuf {
- CONF.path.join(&self.0)
- }
-}
-
-pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) {
- use sha2::Digest;
- struct ShaHasher(Sha512);
- impl Hasher for ShaHasher {
- fn finish(&self) -> u64 {
- unreachable!()
- }
- fn write(&mut self, bytes: &[u8]) {
- self.0.update(bytes);
- }
- }
- let mut d = ShaHasher(sha2::Sha512::new());
- d.0.update(kind);
- d.0.update(b"\0");
- key.hash(&mut d);
-
- let d = d.0.finalize();
- let n =
- d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24);
- let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
- let fname = &fname[..30]; // 180 bits
- let fname = format!("{}/{}", kind, fname);
- (n, CachePath(fname.into()))
-}
-
-const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
-pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
- LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
+static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new();
-pub async fn async_cache_file<Fun, Fut>(
- kind: &str,
- key: impl Hash,
- generate: Fun,
-) -> Result<CachePath, anyhow::Error>
-where
- Fun: FnOnce(tokio::fs::File) -> Fut,
- Fut: Future<Output = Result<(), anyhow::Error>>,
-{
- let (bucket, location) = cache_location(kind, key);
- let loc_abs = location.abs();
- // we need a lock even if it exists since somebody might be still in the process of writing.
- let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
- .lock()
- .await;
- let exists = tokio::fs::try_exists(&loc_abs)
- .await
- .context("unable to test for cache file existance")?;
- if !exists {
- let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
- let f = tokio::fs::File::create(&temp_path)
- .await
- .context("creating new cache file")?;
- match generate(f).await {
- Ok(()) => (),
- Err(e) => {
- warn!("cache generation failed, unlinking entry");
- tokio::fs::remove_file(temp_path).await?;
- return Err(e);
- }
- }
- tokio::fs::create_dir_all(loc_abs.parent().unwrap())
- .await
- .context("create kind dir")?;
- tokio::fs::rename(temp_path, &loc_abs)
- .await
- .context("rename cache")?;
- }
- drop(_guard);
- Ok(location)
+pub fn init_cache() -> Result<()> {
+ CACHE_STORE
+ .set(Box::new(Filesystem::new(&CONF)))
+ .map_err(|_| ())
+ .unwrap();
+ Ok(())
}
-thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
-
-pub fn cache_file<Fun>(
- kind: &str,
- key: impl Hash,
- generate: Fun,
-) -> Result<CachePath, anyhow::Error>
-where
- Fun: FnOnce(std::fs::File) -> Result<(), anyhow::Error>,
-{
- let (bucket, location) = cache_location(kind, key);
- let loc_abs = location.abs();
+pub fn cache(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> {
// we need a lock even if it exists since somebody might be still in the process of writing.
let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
let _guard = if already_within {
- // TODO stupid hack to avoid deadlock for nested cache_file. proper solution needed
- CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
- .try_lock()
- .ok()
+ // TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating
+ CACHE_GENERATION_LOCKS[key.bucket()].try_lock().ok()
} else {
- Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock())
+ CACHE_GENERATION_LOCKS[key.bucket()].lock().ok()
};
- if !loc_abs.exists() {
- let temp_path = CONF.path.join(format!("temp-{:x}", random::<u128>()));
- let f = std::fs::File::create(&temp_path).context("creating new cache file")?;
- match generate(f) {
- Ok(()) => (),
- Err(e) => {
- warn!("cache generation failed, unlinking entry");
- std::fs::remove_file(temp_path)?;
- return Err(e);
- }
+
+ let store = CACHE_STORE.get().unwrap();
+
+ let out = match store.read(key)? {
+ Some(x) => x,
+ None => {
+ let value = generate()?;
+ store.store(key, &value)?;
+ value
}
- std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?;
- rename(temp_path, loc_abs).context("rename cache")?;
- }
+ };
+
if !already_within {
WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
}
drop(_guard);
- Ok(location)
+ Ok(out)
+}
+
+pub fn cache_read(key: CacheKey) -> Result<Option<Vec<u8>>> {
+ CACHE_STORE.get().unwrap().read(key)
+}
+pub fn cache_store(key: CacheKey, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<CacheKey> {
+ cache(key, generate)?;
+ Ok(key)
}
pub struct InMemoryCacheEntry {
@@ -170,23 +95,18 @@ pub struct InMemoryCacheEntry {
last_access: Instant,
object: Arc<dyn Any + Send + Sync + 'static>,
}
-pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> =
+pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<CacheKey, InMemoryCacheEntry>>> =
LazyLock::new(|| RwLock::new(HashMap::new()));
pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
-pub fn cache_memory<Fun, T>(
- kind: &str,
- key: impl Hash,
- mut generate: Fun,
-) -> Result<Arc<T>, anyhow::Error>
+pub fn cache_memory<Fun, T>(key: CacheKey, mut generate: Fun) -> Result<Arc<T>, anyhow::Error>
where
Fun: FnMut() -> Result<T, anyhow::Error>,
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
- let (_, location) = cache_location(kind, &key);
{
let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- if let Some(entry) = g.get_mut(&location.abs()) {
+ if let Some(entry) = g.get_mut(&key) {
entry.last_access = Instant::now();
let object = entry
.object
@@ -197,82 +117,18 @@ where
}
}
- let location = cache_file(kind, &key, move |file| {
+ let data = cache(key, move || {
let object = generate()?;
- let mut file = std::io::BufWriter::new(file);
- serde_json::to_writer(&mut file, &object).context("encoding cache object")?;
- file.flush()?;
- Ok(())
+ Ok(serde_json::to_vec(&object)?)
})?;
- let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?);
- let object = serde_json::from_reader::<_, T>(&mut file).context("decoding cache object")?;
- let object = Arc::new(object);
- let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode
-
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- g.insert(
- location.abs(),
- InMemoryCacheEntry {
- size,
- last_access: Instant::now(),
- object: object.clone(),
- },
- );
- CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
- }
-
- cleanup_cache();
-
- Ok(object)
-}
-
-pub async fn async_cache_memory<Fun, Fut, T>(
- kind: &str,
- key: impl Hash,
- generate: Fun,
-) -> Result<Arc<T>, anyhow::Error>
-where
- Fun: FnOnce() -> Fut,
- Fut: Future<Output = Result<T, anyhow::Error>>,
- T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
-{
- let (_, location) = cache_location(kind, &key);
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- if let Some(entry) = g.get_mut(&location.abs()) {
- entry.last_access = Instant::now();
- let object = entry
- .object
- .clone()
- .downcast::<T>()
- .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
- return Ok(object);
- }
- }
-
- let location = async_cache_file(kind, &key, move |mut file| async move {
- let object = generate().await?;
- let data = serde_json::to_vec(&object).context("encoding cache object")?;
-
- file.write_all(&data).await?;
-
- Ok(())
- })
- .await?;
- let mut file = tokio::fs::File::open(&location.abs()).await?;
- let mut data = Vec::new();
- file.read_to_end(&mut data)
- .await
- .context("reading cache object")?;
+ let size = data.len();
let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?;
let object = Arc::new(object);
- let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode
{
let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
g.insert(
- location.abs(),
+ key,
InMemoryCacheEntry {
size,
last_access: Instant::now(),