From da985cc06e4caa7501222dbf54f212536fd42b0c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 18 Dec 2025 18:45:53 +0100 Subject: transaction interface --- database/src/backends/rocksdb.rs | 57 +++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 15 deletions(-) (limited to 'database/src/backends/rocksdb.rs') diff --git a/database/src/backends/rocksdb.rs b/database/src/backends/rocksdb.rs index 31229a9..cdcb60a 100644 --- a/database/src/backends/rocksdb.rs +++ b/database/src/backends/rocksdb.rs @@ -4,41 +4,68 @@ Copyright (C) 2025 metamuffin */ -use std::path::Path; - -use crate::backends::KV; +use crate::backends::{Db, ReadTransaction, WriteTransaction, WriteTxnFunction}; use anyhow::Result; -use rocksdb::DB; +use rocksdb::{ErrorKind, OptimisticTransactionDB}; +use std::path::Path; pub struct Rocksdb { - db: DB, + db: OptimisticTransactionDB, } impl Rocksdb { pub fn new(path: &Path) -> Result { Ok(Self { - db: DB::open_default(path)?, + db: OptimisticTransactionDB::open_default(path)?, }) } } -impl KV for Rocksdb { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - Ok(self.db.put(key, value)?) +impl Db for Rocksdb { + fn write_transaction(&self, f: &mut WriteTxnFunction) -> Result<()> { + loop { + let mut txn = self.db.transaction(); + f(&mut txn)?; + match txn.commit() { + Ok(()) => break Ok(()), + Err(e) if e.kind() == ErrorKind::Busy => continue, + Err(e) => return Err(e.into()), + } + } } - fn get(&self, key: &[u8]) -> Result>> { - Ok(self.db.get(key)?) + fn read_transaction(&self, f: &mut super::ReadTxnFunction) -> Result<()> { + loop { + let txn = self.db.transaction(); + f(&txn)?; + match txn.commit() { + Ok(()) => break Ok(()), + Err(e) if e.kind() == ErrorKind::Busy => continue, + Err(e) => return Err(e.into()), + } + } } - fn del(&self, key: &[u8]) -> Result<()> { - Ok(self.db.delete(key)?) +} +impl WriteTransaction for rocksdb::Transaction<'_, OptimisticTransactionDB> { + fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + Ok(self.put(key, value)?) + } + + fn del(&mut self, key: &[u8]) -> Result<()> { + Ok(self.delete(key)?) + } +} +impl ReadTransaction for rocksdb::Transaction<'_, OptimisticTransactionDB> { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self.get(key)?) } fn next(&self, key: &[u8]) -> Result>> { - let mut it = self.db.raw_iterator(); + let mut it = self.raw_iterator(); it.seek_for_prev(key); it.next(); Ok(it.key().map(Vec::from)) } + fn prev(&self, key: &[u8]) -> Result>> { - let mut it = self.db.raw_iterator(); + let mut it = self.raw_iterator(); it.seek(key); it.prev(); Ok(it.key().map(Vec::from)) -- cgit v1.3