/*
This file is part of jellything (https://codeberg.org/metamuffin/jellything)
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
mod backends;
mod helper;
use crate::backends::{CacheStorage, init_backend};
use anyhow::{Context, Result, anyhow};
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::{
any::Any,
collections::{BTreeMap, HashMap},
hash::{DefaultHasher, Hash, Hasher},
path::PathBuf,
sync::{
Arc, LazyLock, Mutex, OnceLock, RwLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Instant,
};
pub use helper::{EscapeKey, HashKey};
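/// Cache configuration: backend driver name, on-disk storage path, and the
/// size threshold (in bytes of serialized objects) for the in-memory cache.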
#[derive(Debug, Deserialize)]
pub struct Config {
driver: String,
path: PathBuf,
max_in_memory_cache_size: usize,
}
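/// Number of lock buckets used to shard cache generation by key hash.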
const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
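/// Per-bucket locks that serialize generation of cache entries, so concurrent
/// callers whose keys hash to the same bucket do not generate a value twice.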
pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
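/// Set while the current thread is inside `cache()`, so nested calls can avoid
/// deadlocking on the same bucket lock.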
thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
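/// The configuration is placed here by the caller before first use; `CONF`
/// takes it out lazily on first access.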
pub static CONF_PRELOAD: std::sync::Mutex<Option<Config>> = std::sync::Mutex::new(None);
static CONF: LazyLock<Config> = LazyLock::new(|| {
CONF_PRELOAD
.lock()
.unwrap()
.take()
.expect("cache config not preloaded. logic error")
});
static CACHE_STORE: OnceLock<Box<dyn CacheStorage>> = OnceLock::new();
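/// Initializes the global cache backend. Must be called once before any other
/// cache function; calling it a second time panics.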
pub fn init_cache() -> Result<()> {
CACHE_STORE
.set(init_backend().context("cache backend")?)
.map_err(|_| ())
.unwrap();
Ok(())
}
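/// Maps a key to its generation-lock bucket by hashing.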
fn bucket(key: &str) -> usize {
let mut h = DefaultHasher::new();
key.hash(&mut h);
h.finish() as usize % CACHE_GENERATION_BUCKET_COUNT
}
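/// Returns the cached bytes for `key`, invoking `generate` and storing the
/// result on a miss. Generation is serialized per key bucket.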
pub fn cache(key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> {
// We need the lock even if the entry already exists, since another caller might still be in the process of writing it.
let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
let _guard = if already_within {
// TODO: hack to avoid a deadlock on nested calls; skipping the lock is safe but may generate the value twice.
CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok()
} else {
CACHE_GENERATION_LOCKS[bucket(key)].lock().ok()
};
let store = CACHE_STORE.get().unwrap();
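// Check the backend; on a miss, generate the value and store it under the key.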
let out = match store.read(key)? {
Some(x) => x,
None => {
let value = generate()?;
store.store(key.to_owned(), &value)?;
value
}
};
if !already_within {
WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
}
drop(_guard);
Ok(out)
}
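/// Reads a cached value without generating it on a miss.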
pub fn cache_read(key: &str) -> Result<Option<Vec<u8>>> {
CACHE_STORE.get().unwrap().read(key)
}
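/// Like [`cache`], but discards the value and returns the key, for callers that
/// only need the entry to exist.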
pub fn cache_store(key: String, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<String> {
cache(&key, generate)?;
Ok(key)
}
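/// A deserialized object held in the in-memory cache, with bookkeeping for eviction.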
pub struct InMemoryCacheEntry {
size: usize,
last_access: Instant,
object: Arc<dyn Any + Send + Sync + 'static>,
}
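/// Deserialized objects kept in memory, keyed by cache key.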
pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<String, InMemoryCacheEntry>>> =
LazyLock::new(|| RwLock::new(HashMap::new()));
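/// Approximate total size (in serialized bytes) of all in-memory entries.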
pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0);
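/// Returns a shared, deserialized object for `key`. Lookup order: in-memory
/// cache, then the persistent cache (stored as JSON), then `generate`. The
/// result is kept in memory for later calls; oversized caches are trimmed afterwards.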
pub fn cache_memory<Fun, T>(key: &str, mut generate: Fun) -> Result<Arc<T>, anyhow::Error>
where
Fun: FnMut() -> Result<T, anyhow::Error>,
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
if !key.ends_with(".json") {
warn!("cache_memory key not ending in .json: {key:?}")
}
{
let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
if let Some(entry) = g.get_mut(key) {
entry.last_access = Instant::now();
let object = entry
.object
.clone()
.downcast::<T>()
.map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
return Ok(object);
}
}
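// Not in memory: fall back to the persistent cache, generating and serializing the object as JSON if it is missing there too.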
let data = cache(key, move || {
let object = generate()?;
Ok(serde_json::to_vec(&object)?)
})?;
let size = data.len();
let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?;
let object = Arc::new(object);
{
let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
g.insert(
key.to_owned(),
InMemoryCacheEntry {
size,
last_access: Instant::now(),
object: object.clone(),
},
);
CACHE_IN_MEMORY_SIZE.fetch_add(size, Ordering::Relaxed);
}
cleanup_cache();
Ok(object)
}
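/// Evicts roughly the least recently used half of the in-memory object cache
/// once its total size exceeds the configured limit.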
pub fn cleanup_cache() {
let current_size = CACHE_IN_MEMORY_SIZE.load(Ordering::Relaxed);
if current_size < CONF.max_in_memory_cache_size {
return;
}
info!("running cache eviction");
let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap();
// TODO: if two entries have *exactly* the same last-access time, one overwrites the other in the candidate map and only one of them is evicted; this is fine for now
let mut k = BTreeMap::new();
for (loc, entry) in g.iter() {
k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
}
let mut reduction = 0;
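// Remove the half of the entries that were accessed longest ago (largest elapsed time).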
for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
g.remove(loc);
reduction += size;
}
CACHE_IN_MEMORY_SIZE.fetch_sub(reduction, Ordering::Relaxed);
drop(g);
info!(
"done, {} freed",
humansize::format_size(reduction, humansize::DECIMAL)
);
}