aboutsummaryrefslogtreecommitdiff
path: root/kv/src/rocksdb.rs
blob: 50e7e74441d89d81f8f8ddf4822b5fcb6df2c7a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/*
    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
    Copyright (C) 2026 metamuffin <metamuffin.org>
*/

use crate::{BlobStorage, Store, Transaction};
use anyhow::Result;
use humansize::{DECIMAL, SizeFormatter};
use rocksdb::{Direction, ErrorKind, IteratorMode, OptimisticTransactionDB};
use std::{
    path::Path,
    sync::atomic::{AtomicU64, Ordering},
};

/// Process-wide count of transactions that committed successfully.
///
/// Incremented once per successful `Store::transaction` call in this file.
pub static NUM_TXN_COMMIT: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of transaction attempts, including retries.
///
/// Incremented once per iteration of the retry loop in `Store::transaction`,
/// so it is always at least as large as the number of commits.
pub static NUM_TXN_ATTEMPT: AtomicU64 = AtomicU64::new(0);

/// Opens the optimistic-transaction RocksDB database located at `path`
/// using the library's default options.
///
/// # Errors
/// Propagates any error reported by `OptimisticTransactionDB::open_default`.
pub fn new(path: &Path) -> Result<OptimisticTransactionDB> {
    let db = OptimisticTransactionDB::open_default(path)?;
    Ok(db)
}

impl Store for OptimisticTransactionDB {
    /// Runs `f` inside an optimistic transaction and commits it, retrying the
    /// whole closure whenever the commit fails with a retryable status.
    ///
    /// `f` may therefore be invoked more than once and must tolerate being
    /// re-run from scratch. Every attempt bumps [`NUM_TXN_ATTEMPT`]; a
    /// successful commit bumps [`NUM_TXN_COMMIT`] exactly once.
    ///
    /// # Errors
    /// Returns the error produced by `f` (the transaction is dropped without
    /// committing), or any non-retryable commit error.
    fn transaction(&self, f: &mut dyn FnMut(&mut dyn Transaction) -> Result<()>) -> Result<()> {
        loop {
            // Zero-argument call: this resolves to the inherent
            // `OptimisticTransactionDB::transaction`, not this trait method.
            let mut txn = self.transaction();
            NUM_TXN_ATTEMPT.fetch_add(1, Ordering::Relaxed);
            f(&mut txn)?;
            match txn.commit() {
                Ok(()) => break,
                // `Busy` signals a write conflict; `TryAgain` is reported when
                // the conflict check itself could not be performed. Both are
                // transient, so start over with a fresh transaction.
                Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::TryAgain) => continue,
                Err(e) => return Err(e.into()),
            }
        }
        NUM_TXN_COMMIT.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Builds a human-readable report with the transaction counters, the
    /// attempts-per-commit ratio and the approximate database size.
    ///
    /// The size is queried for the key range between the one-byte keys `0x00`
    /// and `0xff`.
    fn debug_info(&self) -> Result<String> {
        let size = self.get_approximate_sizes(&[rocksdb::Range::new(&[0x00], &[0xff])])[0];
        let attempts = NUM_TXN_ATTEMPT.load(Ordering::Relaxed);
        let commits = NUM_TXN_COMMIT.load(Ordering::Relaxed);
        // Without any commit the ratio is undefined (0/0 is NaN, n/0 is inf);
        // report 0 instead of printing a non-finite number.
        let retry_factor = if commits == 0 {
            0.0
        } else {
            attempts as f64 / commits as f64
        };
        Ok(format!(
            "transactions attempted:    {attempts:>12}\n\
             transactions commited:     {commits:>12}\n\
             transaction retry factor:  {retry_factor:>12.03}\n\
             approximate size on disk:  {:>12}\n",
            SizeFormatter::new(size, DECIMAL).to_string()
        ))
    }
}

impl Transaction for rocksdb::Transaction<'_, OptimisticTransactionDB> {
    fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        Ok(self.put(key, value)?)
    }

    fn del(&mut self, key: &[u8]) -> Result<()> {
        Ok(self.delete(key)?)
    }
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self.get(key)?)
    }

    fn iter<'a>(
        &'a self,
        key: &[u8],
        reverse: bool,
    ) -> Result<Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a>> {
        let mut iter = self.iterator(IteratorMode::Start);
        iter.set_mode(IteratorMode::From(
            key,
            if reverse {
                Direction::Reverse
            } else {
                Direction::Forward
            },
        ));
        Ok(Box::new(iter.map(|e| {
            e.map(|(k, _)| k.into_vec()).map_err(|e| e.into())
        })))
    }
}

impl BlobStorage for OptimisticTransactionDB {
    /// Stores `value` under the string key `key`, bypassing transactions.
    fn store(&self, key: &str, value: &[u8]) -> Result<()> {
        self.put(key, value)?;
        Ok(())
    }

    /// Fetches the blob stored under `key`, or `None` if there is none.
    fn read(&self, key: &str) -> Result<Option<Vec<u8>>> {
        let blob = self.get(key)?;
        Ok(blob)
    }

    /// Reports the approximate database size for the key range between the
    /// one-byte keys `0x00` and `0xff`, formatted with decimal units.
    fn debug_info(&self) -> Result<String> {
        let probe_ranges = [rocksdb::Range::new(&[0x00], &[0xff])];
        let approx_bytes = self.get_approximate_sizes(&probe_ranges)[0];
        let human_size = SizeFormatter::new(approx_bytes, DECIMAL).to_string();
        Ok(format!(
            "approximate size on disk:  {:>12}\n",
            human_size
        ))
    }
}