/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ use crate::{BlobStorage, Store, Transaction}; use anyhow::Result; use humansize::{DECIMAL, SizeFormatter}; use rocksdb::{Direction, ErrorKind, IteratorMode, OptimisticTransactionDB}; use std::{ path::Path, sync::atomic::{AtomicU64, Ordering}, }; pub static NUM_TXN_COMMIT: AtomicU64 = AtomicU64::new(0); pub static NUM_TXN_ATTEMPT: AtomicU64 = AtomicU64::new(0); pub fn new(path: &Path) -> Result { Ok(OptimisticTransactionDB::open_default(path)?) } impl Store for OptimisticTransactionDB { fn transaction(&self, f: &mut dyn FnMut(&mut dyn Transaction) -> Result<()>) -> Result<()> { loop { let mut txn = self.transaction(); NUM_TXN_ATTEMPT.fetch_add(1, Ordering::Relaxed); f(&mut txn)?; match txn.commit() { Ok(()) => break, Err(e) if e.kind() == ErrorKind::Busy => continue, Err(e) => return Err(e.into()), } } NUM_TXN_COMMIT.fetch_add(1, Ordering::Relaxed); Ok(()) } fn debug_info(&self) -> Result { let size = self.get_approximate_sizes(&[rocksdb::Range::new(&[0x00], &[0xff])])[0]; let attempts = NUM_TXN_ATTEMPT.load(Ordering::Relaxed); let commits = NUM_TXN_COMMIT.load(Ordering::Relaxed); let retry_factor = attempts as f64 / commits as f64; Ok(format!( "transactions attempted: {attempts:>12}\n\ transactions commited: {commits:>12}\n\ transaction retry factor: {retry_factor:>12.03}\n\ approximate size on disk: {:>12}\n", SizeFormatter::new(size, DECIMAL).to_string() )) } } impl Transaction for rocksdb::Transaction<'_, OptimisticTransactionDB> { fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> { Ok(self.put(key, value)?) } fn del(&mut self, key: &[u8]) -> Result<()> { Ok(self.delete(key)?) } fn get(&self, key: &[u8]) -> Result>> { Ok(self.get(key)?) } fn iter<'a>( &'a self, key: &[u8], reverse: bool, ) -> Result>> + 'a>> { let mut iter = self.iterator(IteratorMode::Start); iter.set_mode(IteratorMode::From( key, if reverse { Direction::Reverse } else { Direction::Forward }, )); Ok(Box::new(iter.map(|e| { e.map(|(k, _)| k.into_vec()).map_err(|e| e.into()) }))) } } impl BlobStorage for OptimisticTransactionDB { fn store(&self, key: &str, value: &[u8]) -> Result<()> { Ok(self.put(key, value)?) } fn read(&self, key: &str) -> Result>> { Ok(self.get(key)?) } fn debug_info(&self) -> Result { let size = self.get_approximate_sizes(&[rocksdb::Range::new(&[0x00], &[0xff])])[0]; Ok(format!( "approximate size on disk: {:>12}\n", SizeFormatter::new(size, DECIMAL).to_string() )) } }