Diffstat (limited to 'base/src/cache.rs')
-rw-r--r--  base/src/cache.rs  306
1 file changed, 0 insertions(+), 306 deletions(-)
diff --git a/base/src/cache.rs b/base/src/cache.rs
deleted file mode 100644
index 02c42c8..0000000
--- a/base/src/cache.rs
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2025 metamuffin <metamuffin.org>
-*/
-use crate::CONF;
-use anyhow::{anyhow, Context};
-use base64::Engine;
-use bincode::{Decode, Encode};
-use log::{info, warn};
-use rand::random;
-use serde::Serialize;
-use sha2::Sha512;
-use std::{
- any::Any,
- collections::{BTreeMap, HashMap},
- fs::rename,
- future::Future,
- hash::{Hash, Hasher},
- io::{Seek, Write},
- path::PathBuf,
- sync::{
- atomic::{AtomicBool, AtomicUsize, Ordering},
- Arc, LazyLock, RwLock,
- },
- time::Instant,
-};
-use tokio::{
- io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
- sync::Mutex,
-};
-
-#[derive(Debug, Encode, Decode, Serialize)]
-pub struct CachePath(pub PathBuf);
-impl CachePath {
- pub fn abs(&self) -> PathBuf {
- CONF.cache_path.join(&self.0)
- }
-}
-
-pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) {
- use sha2::Digest;
- struct ShaHasher(Sha512);
- impl Hasher for ShaHasher {
- fn finish(&self) -> u64 {
- unreachable!() // finish() is never called; cache_location reads the SHA-512 digest directly
- }
- fn write(&mut self, bytes: &[u8]) {
- self.0.update(bytes);
- }
- }
- let mut d = ShaHasher(sha2::Sha512::new());
- d.0.update(kind);
- d.0.update(b"\0");
- key.hash(&mut d);
-
- let d = d.0.finalize();
- let n =
- d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24);
- let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
- let fname = &fname[..30]; // 180 bits
- let fname = format!("{}/{}", kind, fname);
- (n, CachePath(fname.into()))
-}
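// Example sketch for cache_location (the "thumbnail" kind and the tuple key
// are hypothetical): any key implementing Hash works, because ShaHasher
// feeds the Hash byte stream straight into SHA-512.
let node_id = 42u64; // hypothetical key component
let (bucket, path) = cache_location("thumbnail", &(node_id, 256u32));
// `bucket` selects a generation lock; `path` is relative, e.g.
// "thumbnail/<30 url-safe base64 chars>"; path.abs() resolves it
// against CONF.cache_path.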
-
-const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
-pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
- LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
-
-pub async fn async_cache_file<Fun, Fut>(
- kind: &str,
- key: impl Hash,
- generate: Fun,
-) -> Result<CachePath, anyhow::Error>
-where
- Fun: FnOnce(tokio::fs::File) -> Fut,
- Fut: Future<Output = Result<(), anyhow::Error>>,
-{
- let (bucket, location) = cache_location(kind, key);
- let loc_abs = location.abs();
- // we need the lock even if the file exists, since somebody might still be in the process of writing it.
- let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
- .lock()
- .await;
- let exists = tokio::fs::try_exists(&loc_abs)
- .await
- .context("unable to test for cache file existance")?;
- if !exists {
- let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::<u128>()));
- let f = tokio::fs::File::create(&temp_path)
- .await
- .context("creating new cache file")?;
- match generate(f).await {
- Ok(()) => (),
- Err(e) => {
- warn!("cache generation failed, unlinking entry");
- tokio::fs::remove_file(temp_path).await?;
- return Err(e);
- }
- }
- tokio::fs::create_dir_all(loc_abs.parent().unwrap())
- .await
- .context("create kind dir")?;
- tokio::fs::rename(temp_path, &loc_abs)
- .await
- .context("rename cache")?;
- }
- drop(_guard);
- Ok(location)
-}
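// Example sketch for async_cache_file (fetch_poster and media_id are
// hypothetical): concurrent callers for the same key serialize on the
// bucket mutex, so generate runs at most once per key.
let media_id = 7u64; // hypothetical Copy key
let path = async_cache_file("poster", &media_id, |mut file| async move {
    let bytes = fetch_poster(media_id).await?; // hypothetical fetcher
    file.write_all(&bytes).await?;
    Ok(())
})
.await?;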
-
-thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
-
-pub fn cache_file<Fun>(
- kind: &str,
- key: impl Hash,
- mut generate: Fun,
-) -> Result<CachePath, anyhow::Error>
-where
- Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
-{
- let (bucket, location) = cache_location(kind, key);
- let loc_abs = location.abs();
- // we need the lock even if the file exists, since somebody might still be in the process of writing it.
- let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
- let _guard = if already_within {
- // TODO: stupid hack to avoid deadlock for nested cache_file calls; a proper solution is needed
- CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT]
- .try_lock()
- .ok()
- } else {
- Some(CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].blocking_lock())
- };
- if !loc_abs.exists() {
- let temp_path = CONF.cache_path.join(format!("temp-{:x}", random::<u128>()));
- let f = std::fs::File::create(&temp_path).context("creating new cache file")?;
- match generate(f) {
- Ok(()) => (),
- Err(e) => {
- warn!("cache generation failed, unlinking entry");
- std::fs::remove_file(temp_path)?;
- return Err(e);
- }
- }
- std::fs::create_dir_all(loc_abs.parent().unwrap()).context("create kind dir")?;
- rename(temp_path, loc_abs).context("rename cache")?;
- }
- if !already_within {
- WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
- }
- drop(_guard);
- Ok(location)
-}
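// Example sketch for the blocking variant (extract_subtitles, item_id and
// lang are hypothetical): a nested cache_file call on the same thread
// downgrades to try_lock via WITHIN_CACHE_FILE to avoid self-deadlock.
let (item_id, lang) = (7u64, "en"); // hypothetical key
let path = cache_file("subtitles", &(item_id, lang), |mut file| {
    let data = extract_subtitles(item_id, lang)?; // hypothetical extractor
    file.write_all(&data)?;
    Ok(())
})?;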
-
-pub struct InMemoryCacheEntry {
- size: usize,
- last_access: Instant,
- object: Arc<dyn Any + Send + Sync + 'static>,
-}
-pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCacheEntry>>> =
- LazyLock::new(|| RwLock::new(HashMap::new()));
-pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
-
-pub fn cache_memory<Fun, T>(
- kind: &str,
- key: impl Hash,
- mut generate: Fun,
-) -> Result<Arc<T>, anyhow::Error>
-where
- Fun: FnMut() -> Result<T, anyhow::Error>,
- T: Encode + Decode + Send + Sync + 'static,
-{
- let (_, location) = cache_location(kind, &key);
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- if let Some(entry) = g.get_mut(&location.abs()) {
- entry.last_access = Instant::now();
- let object = entry
- .object
- .clone()
- .downcast::<T>()
- .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
- return Ok(object);
- }
- }
-
- let location = cache_file(kind, &key, move |file| {
- let object = generate()?;
- let mut file = std::io::BufWriter::new(file);
- bincode::encode_into_std_write(&object, &mut file, bincode::config::standard())
- .context("encoding cache object")?;
- file.flush()?;
- Ok(())
- })?;
- let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?);
- let object = bincode::decode_from_std_read::<T, _, _>(&mut file, bincode::config::standard())
- .context("decoding cache object")?;
- let object = Arc::new(object);
- let size = file.stream_position()? as usize; // approximates the in-memory size, mainly because bincode uses varint encoding
-
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- g.insert(
- location.abs(),
- InMemoryCacheEntry {
- size,
- last_access: Instant::now(),
- object: object.clone(),
- },
- );
- CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
- }
-
- cleanup_cache();
-
- Ok(object)
-}
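// Example sketch for cache_memory (Index, library_id and scan_library are
// hypothetical): T must be Encode + Decode so a miss can round-trip through
// the bincode cache file before landing in CACHE_IN_MEMORY_OBJECTS.
#[derive(bincode::Encode, bincode::Decode)]
struct Index {
    entries: Vec<String>,
}
let library_id = 1u64; // hypothetical key
let index: Arc<Index> = cache_memory("index", &library_id, || {
    Ok(Index { entries: scan_library(library_id)? }) // hypothetical scanner
})?;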
-
-pub async fn async_cache_memory<Fun, Fut, T>(
- kind: &str,
- key: impl Hash,
- generate: Fun,
-) -> Result<Arc<T>, anyhow::Error>
-where
- Fun: FnOnce() -> Fut,
- Fut: Future<Output = Result<T, anyhow::Error>>,
- T: Encode + Decode + Send + Sync + 'static,
-{
- let (_, location) = cache_location(kind, &key);
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- if let Some(entry) = g.get_mut(&location.abs()) {
- entry.last_access = Instant::now();
- let object = entry
- .object
- .clone()
- .downcast::<T>()
- .map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
- return Ok(object);
- }
- }
-
- let location = async_cache_file(kind, &key, move |mut file| async move {
- let object = generate().await?;
- let data = bincode::encode_to_vec(&object, bincode::config::standard())
- .context("encoding cache object")?;
-
- file.write_all(&data).await?;
-
- Ok(())
- })
- .await?;
- let mut file = tokio::fs::File::open(&location.abs()).await?;
- let mut data = Vec::new();
- file.read_to_end(&mut data)
- .await
- .context("reading cache object")?;
- let (object, _) = bincode::decode_from_slice::<T, _>(&data, bincode::config::standard())
- .context("decoding cache object")?;
- let object = Arc::new(object);
- let size = file.stream_position().await? as usize; // approximates the in-memory size, mainly because bincode uses varint encoding
-
- {
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
- g.insert(
- location.abs(),
- InMemoryCacheEntry {
- size,
- last_access: Instant::now(),
- object: object.clone(),
- },
- );
- CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
- }
-
- cleanup_cache();
-
- Ok(object)
-}
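// Example sketch for the async variant (Meta and fetch_meta are
// hypothetical): generate is FnOnce and its Future is awaited while the
// bucket lock is held, so remote fetches also run at most once per key.
#[derive(bincode::Encode, bincode::Decode)]
struct Meta {
    title: String,
}
let id = 7u64; // hypothetical Copy key
let meta: Arc<Meta> = async_cache_memory("meta", &id, || async move {
    fetch_meta(id).await // hypothetical fetch returning Result<Meta, anyhow::Error>
})
.await?;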
-
-pub fn cleanup_cache() {
- let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
- if current_size < CONF.max_in_memory_cache_size {
- return;
- }
- info!("running cache eviction");
- let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
-
- // TODO: if two entries have *exactly* the same last-access age, the BTreeMap keeps only one of them, so only one will be removed; this is fine for now
- let mut k = BTreeMap::new();
- for (loc, entry) in g.iter() {
- k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
- }
- let mut reduction = 0;
- for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
- g.remove(loc);
- reduction += size;
- }
- CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
- drop(g);
-
- info!(
- "done, {} freed",
- humansize::format_size(reduction, humansize::DECIMAL)
- );
-}
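// Example sketch isolating the eviction ordering used above: a BTreeMap
// keyed by elapsed age sorts most-recent-first, so reverse iteration yields
// the least recently used entries first and div_ceil(2) drops at least half.
// As the TODO above notes, entries with identical ages collide on the key.
use std::collections::BTreeMap;
use std::time::Instant;

fn eviction_order(entries: &[(String, Instant)]) -> Vec<String> {
    let mut by_age = BTreeMap::new();
    for (name, t) in entries {
        by_age.insert(t.elapsed(), name.clone());
    }
    let half = by_age.len().div_ceil(2);
    by_age.into_values().rev().take(half).collect()
}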