1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
/*
This file is part of jellything (https://codeberg.org/metamuffin/jellything)
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2026 metamuffin <metamuffin.org>
*/
mod helper;
use anyhow::{Context, Result, anyhow};
pub use helper::{EscapeKey, HashKey};
use jellykv::BlobStorage;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::{
any::Any,
collections::{BTreeMap, HashMap},
hash::{DefaultHasher, Hash, Hasher},
sync::{
Arc, LazyLock, Mutex, RwLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Instant,
};
/// Number of shards for the generation locks; keys are mapped to a shard by
/// hash (see `bucket`).
const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
/// Per-shard locks that serialize blob generation, so two threads generating
/// the same key (or two keys hashing to the same shard) do not run concurrently.
pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
    LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
// Flags that the current thread is already inside `Cache::cache`, so nested
// calls can avoid self-deadlocking on the shard lock. Being thread-local, the
// atomic is only ever touched by its own thread; it is effectively a Cell.
thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; }
/// Two-level cache: serialized blobs persisted in `storage`, plus a bounded
/// in-memory cache of already-decoded objects.
pub struct Cache {
    // Persistent backing store for serialized blobs.
    storage: Box<dyn BlobStorage>,
    // Decoded objects by key, for `cache_memory`.
    memory_cache: RwLock<HashMap<String, InMemoryCacheEntry>>,
    // Running total of the `size` fields of all `memory_cache` entries
    // (bytes of the serialized form, not the decoded objects).
    memory_cache_size: AtomicUsize,
    // Budget for `memory_cache_size` above which `cleanup_cache` evicts.
    max_memory_cache_size: usize,
}
/// One decoded object held in the in-memory cache.
pub struct InMemoryCacheEntry {
    // Size in bytes of the serialized representation this entry was decoded from.
    size: usize,
    // Time of the last lookup; used for least-recently-used eviction.
    last_access: Instant,
    // The decoded object, type-erased so entries of different types can share one map.
    object: Arc<dyn Any + Send + Sync + 'static>,
}
/// Maps a cache key to the index of its generation-lock shard by hashing the
/// key and reducing the digest modulo the shard count.
fn bucket(key: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    let digest = hasher.finish() as usize;
    digest % CACHE_GENERATION_BUCKET_COUNT
}
impl Cache {
pub fn new(storage: Box<dyn BlobStorage>, max_memory_cache_size: usize) -> Self {
Self {
max_memory_cache_size,
storage,
memory_cache: HashMap::new().into(),
memory_cache_size: AtomicUsize::new(0),
}
}
pub fn cache(&self, key: &str, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<Vec<u8>> {
// we need a lock even if it exists since somebody might be still in the process of writing.
let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed));
let _guard = if already_within {
// TODO stupid hack to avoid deadlock for nested calls; not locking is fine but might cause double-generating
CACHE_GENERATION_LOCKS[bucket(key)].try_lock().ok()
} else {
CACHE_GENERATION_LOCKS[bucket(key)].lock().ok()
};
let out = match self.storage.read(&key)? {
Some(x) => x,
None => {
let value = generate()?;
self.storage.store(key, &value)?;
value
}
};
if !already_within {
WITHIN_CACHE_FILE.with(|a| a.swap(false, Ordering::Relaxed));
}
drop(_guard);
Ok(out)
}
pub fn read(&self, key: &str) -> Result<Option<Vec<u8>>> {
self.storage.read(key)
}
pub fn store(&self, key: String, generate: impl FnOnce() -> Result<Vec<u8>>) -> Result<String> {
self.cache(&key, generate)?;
Ok(key)
}
pub fn cache_memory<Fun, T>(
&self,
key: &str,
mut generate: Fun,
) -> Result<Arc<T>, anyhow::Error>
where
Fun: FnMut() -> Result<T, anyhow::Error>,
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
if !key.ends_with(".json") {
warn!("cache_memory key not ending in .json: {key:?}")
}
{
let mut g = self.memory_cache.write().unwrap();
if let Some(entry) = g.get_mut(key) {
entry.last_access = Instant::now();
let object = entry
.object
.clone()
.downcast::<T>()
.map_err(|_| anyhow!("inconsistent types for in-memory cache"))?;
return Ok(object);
}
}
let data = self.cache(&key, move || {
let object = generate()?;
Ok(serde_json::to_vec(&object)?)
})?;
let size = data.len();
let object = serde_json::from_slice::<T>(&data).context("decoding cache object")?;
let object = Arc::new(object);
{
let mut g = self.memory_cache.write().unwrap();
g.insert(
key.to_owned(),
InMemoryCacheEntry {
size,
last_access: Instant::now(),
object: object.clone(),
},
);
self.memory_cache_size.fetch_add(size, Ordering::Relaxed);
}
self.cleanup_cache();
Ok(object)
}
fn cleanup_cache(&self) {
let current_size = self.memory_cache_size.load(Ordering::Relaxed);
if current_size < self.max_memory_cache_size {
return;
}
info!("running cache eviction");
let mut g = self.memory_cache.write().unwrap();
// TODO: if two entries have *exactly* the same size, only one of the will be remove; this is fine for now
let mut k = BTreeMap::new();
for (loc, entry) in g.iter() {
k.insert(entry.last_access.elapsed(), (loc.to_owned(), entry.size));
}
let mut reduction = 0;
for (loc, size) in k.values().rev().take(k.len().div_ceil(2)) {
g.remove(loc);
reduction += size;
}
self.memory_cache_size
.fetch_sub(reduction, Ordering::Relaxed);
drop(g);
info!(
"done, {} freed",
humansize::format_size(reduction, humansize::DECIMAL)
);
}
}
|