From da985cc06e4caa7501222dbf54f212536fd42b0c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 18 Dec 2025 18:45:53 +0100 Subject: transaction interface --- Cargo.lock | 15 +++++++++ database/Cargo.toml | 1 + database/src/backends/memory.rs | 59 +++++++++++++++++++++++++++-------- database/src/backends/mod.rs | 19 +++++++++--- database/src/backends/redb.rs | 67 +++++++++++++++++++++++++++++----------- database/src/backends/rocksdb.rs | 57 +++++++++++++++++++++++++--------- database/src/indices/mod.rs | 6 ++-- database/src/indices/order.rs | 6 ++-- database/src/lib.rs | 2 -- database/src/table.rs | 11 ++++--- database/src/transaction.rs | 42 ------------------------- 11 files changed, 180 insertions(+), 105 deletions(-) delete mode 100644 database/src/transaction.rs diff --git a/Cargo.lock b/Cargo.lock index a31cc5c..a50ba6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,6 +464,20 @@ name = "bytemuck" version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "byteorder" @@ -1834,6 +1848,7 @@ name = "jellydb" version = "0.1.0" dependencies = [ "anyhow", + "bytemuck", "jellycommon", "log", "redb", diff --git a/database/Cargo.toml b/database/Cargo.toml index 7e79b92..9354353 100644 --- a/database/Cargo.toml +++ b/database/Cargo.toml @@ -11,3 +11,4 @@ serde_json = "1.0.145" redb = "3.1.0" anyhow = "1.0.100" rocksdb = { version = "0.24.0", features = ["multi-threaded-cf"] } +bytemuck = { version = "1.24.0", features = ["derive"] } diff --git a/database/src/backends/memory.rs b/database/src/backends/memory.rs index f1952a3..3c2fdea 100644 --- a/database/src/backends/memory.rs +++ b/database/src/backends/memory.rs @@ -4,10 +4,14 @@ Copyright (C) 2025 metamuffin */ -use crate::backends::KV; +use crate::backends::{Db, ReadTransaction, ReadTxnFunction, WriteTransaction, WriteTxnFunction}; use anyhow::Result; -use std::{collections::BTreeMap, sync::RwLock}; +use std::{ + collections::BTreeMap, + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; +type Inner = BTreeMap, Vec>; pub struct Memory(RwLock, Vec>>); impl Memory { @@ -15,25 +19,54 @@ impl Memory { Self(RwLock::new(BTreeMap::new())) } } -impl KV for Memory { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.0.write().unwrap().insert(key.to_vec(), value.to_vec()); +impl Db for Memory { + fn write_transaction(&self, f: &mut WriteTxnFunction) -> Result<()> { + f(&mut self.0.write().unwrap()) + } + fn read_transaction(&self, f: &mut ReadTxnFunction) -> Result<()> { + f(&self.0.read().unwrap()) + } +} +impl WriteTransaction for RwLockWriteGuard<'_, Inner> { + fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + (**self).insert(key.to_vec(), value.to_vec()); + Ok(()) + } + fn del(&mut self, key: &[u8]) -> Result<()> { + (**self).remove(key); Ok(()) } +} +impl ReadTransaction for RwLockWriteGuard<'_, Inner> { fn get(&self, key: &[u8]) -> Result>> { - Ok(self.0.read().unwrap().get(key).cloned()) + Ok((**self).get(key).cloned()) } - fn del(&self, key: &[u8]) -> Result<()> { - self.0.write().unwrap().remove(key); - Ok(()) + fn next(&self, key: &[u8]) -> Result>> { + Ok((**self) + .range(key.to_vec()..) + .next() + .map(|(k, _)| k.to_owned())) + } + fn prev(&self, key: &[u8]) -> Result>> { + Ok((**self) + .range(..key.to_vec()) + .next_back() + .map(|(k, _)| k.to_owned())) + } +} +impl ReadTransaction for RwLockReadGuard<'_, Inner> { + fn get(&self, key: &[u8]) -> Result>> { + Ok((**self).get(key).cloned()) } fn next(&self, key: &[u8]) -> Result>> { - let r = self.0.read().unwrap(); - Ok(r.range(key.to_vec()..).next().map(|(k, _)| k.to_owned())) + Ok((**self) + .range(key.to_vec()..) + .next() + .map(|(k, _)| k.to_owned())) } fn prev(&self, key: &[u8]) -> Result>> { - let r = self.0.read().unwrap(); - Ok(r.range(..key.to_vec()) + Ok((**self) + .range(..key.to_vec()) .next_back() .map(|(k, _)| k.to_owned())) } diff --git a/database/src/backends/mod.rs b/database/src/backends/mod.rs index 814cc50..a95b00a 100644 --- a/database/src/backends/mod.rs +++ b/database/src/backends/mod.rs @@ -12,15 +12,24 @@ use crate::backends::{memory::Memory, redb::Redb, rocksdb::Rocksdb}; use anyhow::{Result, bail}; use std::{path::Path, sync::Arc}; -pub trait KV { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()>; - fn get<'a>(&'a self, key: &[u8]) -> Result>>; - fn del(&self, key: &[u8]) -> Result<()>; +pub type WriteTxnFunction = dyn FnMut(&mut dyn WriteTransaction) -> Result<()>; +pub type ReadTxnFunction = dyn FnMut(&dyn ReadTransaction) -> Result<()>; + +pub trait Db { + fn write_transaction(&self, f: &mut WriteTxnFunction) -> Result<()>; + fn read_transaction(&self, f: &mut ReadTxnFunction) -> Result<()>; +} +pub trait WriteTransaction: ReadTransaction { + fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()>; + fn del(&mut self, key: &[u8]) -> Result<()>; +} +pub trait ReadTransaction { + fn get(&self, key: &[u8]) -> Result>>; fn next(&self, key: &[u8]) -> Result>>; fn prev(&self, key: &[u8]) -> Result>>; } -pub fn create_backend(driver: &str, path: &Path) -> Result> { +pub fn create_backend(driver: &str, path: &Path) -> Result> { Ok(match driver { "rocksdb" => Arc::new(Rocksdb::new(path)?), "redb" => Arc::new(Redb::new(path)?), diff --git a/database/src/backends/redb.rs b/database/src/backends/redb.rs index 1b672b6..d2849de 100644 --- a/database/src/backends/redb.rs +++ b/database/src/backends/redb.rs @@ -4,11 +4,10 @@ Copyright (C) 2025 metamuffin */ -use std::path::Path; - -use crate::backends::KV; +use crate::backends::{Db, ReadTransaction, ReadTxnFunction, WriteTransaction, WriteTxnFunction}; use anyhow::Result; -use redb::{Database, ReadableDatabase, TableDefinition}; +use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition}; +use std::path::Path; pub struct Redb { db: Database, @@ -23,29 +22,62 @@ impl Redb { }) } } -impl KV for Redb { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - let txn = self.db.begin_write()?; - txn.open_table(TABLE)?.insert(key, value)?; +impl Db for Redb { + fn write_transaction(&self, f: &mut WriteTxnFunction) -> Result<()> { + let mut txn = self.db.begin_write()?; + f(&mut txn)?; txn.commit()?; Ok(()) } - fn del(&self, key: &[u8]) -> Result<()> { - let txn = self.db.begin_write()?; - txn.open_table(TABLE)?.remove(key)?; - txn.commit()?; + fn read_transaction(&self, f: &mut ReadTxnFunction) -> Result<()> { + let mut txn = self.db.begin_read()?; + f(&mut txn)?; Ok(()) } +} +impl WriteTransaction for redb::WriteTransaction { + fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + self.open_table(TABLE)?.insert(key, value)?; + Ok(()) + } + fn del(&mut self, key: &[u8]) -> Result<()> { + self.open_table(TABLE)?.remove(key)?; + Ok(()) + } +} +impl ReadTransaction for redb::WriteTransaction { + fn get<'a>(&'a self, key: &[u8]) -> Result>> { + match self.open_table(TABLE)?.get(key)? { + Some(v) => Ok(Some(v.value().to_vec())), + None => Ok(None), + } + } + fn next(&self, key: &[u8]) -> Result>> { + let table = self.open_table(TABLE)?; + let mut iter = table.range(key..)?; + match iter.next() { + Some(k) => Ok(Some(k?.0.value().to_vec())), + None => Ok(None), + } + } + fn prev(&self, key: &[u8]) -> Result>> { + let table = self.open_table(TABLE)?; + let mut iter = table.range(..key)?; + match iter.next_back() { + Some(k) => Ok(Some(k?.0.value().to_vec())), + None => Ok(None), + } + } +} +impl ReadTransaction for redb::ReadTransaction { fn get<'a>(&'a self, key: &[u8]) -> Result>> { - let txn = self.db.begin_read()?; - match txn.open_table(TABLE)?.get(key)? { + match self.open_table(TABLE)?.get(key)? { Some(v) => Ok(Some(v.value().to_vec())), None => Ok(None), } } fn next(&self, key: &[u8]) -> Result>> { - let txn = self.db.begin_read()?; - let table = txn.open_table(TABLE)?; + let table = self.open_table(TABLE)?; let mut iter = table.range(key..)?; match iter.next() { Some(k) => Ok(Some(k?.0.value().to_vec())), @@ -53,8 +85,7 @@ impl KV for Redb { } } fn prev(&self, key: &[u8]) -> Result>> { - let txn = self.db.begin_read()?; - let table = txn.open_table(TABLE)?; + let table = self.open_table(TABLE)?; let mut iter = table.range(..key)?; match iter.next_back() { Some(k) => Ok(Some(k?.0.value().to_vec())), diff --git a/database/src/backends/rocksdb.rs b/database/src/backends/rocksdb.rs index 31229a9..cdcb60a 100644 --- a/database/src/backends/rocksdb.rs +++ b/database/src/backends/rocksdb.rs @@ -4,41 +4,68 @@ Copyright (C) 2025 metamuffin */ -use std::path::Path; - -use crate::backends::KV; +use crate::backends::{Db, ReadTransaction, WriteTransaction, WriteTxnFunction}; use anyhow::Result; -use rocksdb::DB; +use rocksdb::{ErrorKind, OptimisticTransactionDB}; +use std::path::Path; pub struct Rocksdb { - db: DB, + db: OptimisticTransactionDB, } impl Rocksdb { pub fn new(path: &Path) -> Result { Ok(Self { - db: DB::open_default(path)?, + db: OptimisticTransactionDB::open_default(path)?, }) } } -impl KV for Rocksdb { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - Ok(self.db.put(key, value)?) +impl Db for Rocksdb { + fn write_transaction(&self, f: &mut WriteTxnFunction) -> Result<()> { + loop { + let mut txn = self.db.transaction(); + f(&mut txn)?; + match txn.commit() { + Ok(()) => break Ok(()), + Err(e) if e.kind() == ErrorKind::Busy => continue, + Err(e) => return Err(e.into()), + } + } } - fn get(&self, key: &[u8]) -> Result>> { - Ok(self.db.get(key)?) + fn read_transaction(&self, f: &mut super::ReadTxnFunction) -> Result<()> { + loop { + let txn = self.db.transaction(); + f(&txn)?; + match txn.commit() { + Ok(()) => break Ok(()), + Err(e) if e.kind() == ErrorKind::Busy => continue, + Err(e) => return Err(e.into()), + } + } } - fn del(&self, key: &[u8]) -> Result<()> { - Ok(self.db.delete(key)?) +} +impl WriteTransaction for rocksdb::Transaction<'_, OptimisticTransactionDB> { + fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + Ok(self.put(key, value)?) + } + + fn del(&mut self, key: &[u8]) -> Result<()> { + Ok(self.delete(key)?) + } +} +impl ReadTransaction for rocksdb::Transaction<'_, OptimisticTransactionDB> { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self.get(key)?) } fn next(&self, key: &[u8]) -> Result>> { - let mut it = self.db.raw_iterator(); + let mut it = self.raw_iterator(); it.seek_for_prev(key); it.next(); Ok(it.key().map(Vec::from)) } + fn prev(&self, key: &[u8]) -> Result>> { - let mut it = self.db.raw_iterator(); + let mut it = self.raw_iterator(); it.seek(key); it.prev(); Ok(it.key().map(Vec::from)) diff --git a/database/src/indices/mod.rs b/database/src/indices/mod.rs index 5856254..523235e 100644 --- a/database/src/indices/mod.rs +++ b/database/src/indices/mod.rs @@ -4,14 +4,14 @@ Copyright (C) 2025 metamuffin */ -use crate::{backends::KV, table::RowNum}; +use crate::{backends::WriteTransaction, table::RowNum}; use anyhow::Result; pub mod order; pub trait Index { - fn add(&self, db: &dyn KV, row: RowNum, val: &T) -> Result<()>; - fn remove(&self, db: &dyn KV, row: RowNum, val: &T) -> Result<()>; + fn add(&self, db: &mut dyn WriteTransaction, row: RowNum, val: &T) -> Result<()>; + fn remove(&self, db: &mut dyn WriteTransaction, row: RowNum, val: &T) -> Result<()>; fn compare(&self, before: &T, after: &T) -> bool { let _ = (before, after); true diff --git a/database/src/indices/order.rs b/database/src/indices/order.rs index 04fb975..5b7924b 100644 --- a/database/src/indices/order.rs +++ b/database/src/indices/order.rs @@ -4,7 +4,7 @@ Copyright (C) 2025 metamuffin */ -use crate::{backends::KV, indices::Index, table::Table}; +use crate::{backends::WriteTransaction, indices::Index, table::Table}; use anyhow::Result; pub struct OrderIndex { @@ -25,11 +25,11 @@ impl OrderIndex { } } impl Index for OrderIndex { - fn add(&self, db: &dyn KV, id: u64, val: &T) -> Result<()> { + fn add(&self, db: &mut dyn WriteTransaction, id: u64, val: &T) -> Result<()> { db.set(&self.key(id, val), &[])?; Ok(()) } - fn remove(&self, db: &dyn KV, id: u64, val: &T) -> Result<()> { + fn remove(&self, db: &mut dyn WriteTransaction, id: u64, val: &T) -> Result<()> { db.del(&self.key(id, val))?; Ok(()) } diff --git a/database/src/lib.rs b/database/src/lib.rs index 7618fe7..6c70e51 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -3,8 +3,6 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ - pub mod backends; pub mod indices; pub mod table; -pub mod transaction; diff --git a/database/src/table.rs b/database/src/table.rs index 8c6b724..e2f6196 100644 --- a/database/src/table.rs +++ b/database/src/table.rs @@ -4,7 +4,10 @@ Copyright (C) 2025 metamuffin */ -use crate::{backends::KV, indices::Index}; +use crate::{ + backends::{ReadTransaction, WriteTransaction}, + indices::Index, +}; use anyhow::Result; use serde::{Serialize, de::DeserializeOwned}; @@ -27,7 +30,7 @@ impl Table { key.extend(row.to_be_bytes()); key } - pub fn insert(&self, db: &dyn KV, entry: T) -> Result { + pub fn insert(&self, db: &mut dyn WriteTransaction, entry: T) -> Result { let mut id_counter = db .get(&self.id.to_be_bytes())? .map(|k| k.as_slice().try_into().map(RowNum::from_be_bytes).ok()) @@ -45,13 +48,13 @@ impl Table { Ok(id_counter) } - pub fn get(&self, db: &dyn KV, row: RowNum) -> Result> { + pub fn get(&self, db: &dyn ReadTransaction, row: RowNum) -> Result> { Ok(db .get(&self.key(row))? .map(|v| serde_json::from_slice(&v)) .transpose()?) } - pub fn remove(&self, db: &dyn KV, row: RowNum) -> Result { + pub fn remove(&self, db: &mut dyn WriteTransaction, row: RowNum) -> Result { let Some(entry) = self.get(db, row)? else { return Ok(false); }; diff --git a/database/src/transaction.rs b/database/src/transaction.rs deleted file mode 100644 index c0e89b7..0000000 --- a/database/src/transaction.rs +++ /dev/null @@ -1,42 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ - -use crate::backends::KV; -use anyhow::Result; -use std::sync::Arc; - -pub struct TransactionController { - db: Arc, -} -impl TransactionController { - pub fn transact(&self, tx: impl Fn(&mut Transaction) -> Result<()>) -> Result<()> { - tx(&mut Transaction { - db: self.db.clone(), - })?; - Ok(()) - } -} - -pub struct Transaction { - db: Arc, -} -impl KV for Transaction { - fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.db.set(key, value) - } - fn get(&self, key: &[u8]) -> Result>> { - self.db.get(key) - } - fn del(&self, key: &[u8]) -> Result<()> { - self.db.del(key) - } - fn next(&self, key: &[u8]) -> Result>> { - self.db.next(key) - } - fn prev(&self, key: &[u8]) -> Result>> { - self.db.prev(key) - } -} -- cgit v1.3