/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ pub mod binning; pub mod helpers; pub mod index; pub mod index_key; pub mod merge_iterator; pub mod prefix_iterator; pub mod sort; use std::borrow::Cow; use crate::{ Database, Query, RowNum, Transaction, kv::{ helpers::{read_counter, write_counter}, index::{iter_index, read_count_index, update_index}, index_key::{IndexKey, SortKey}, merge_iterator::MergeIterator, prefix_iterator::PrefixIterator, }, }; use anyhow::{Result, anyhow}; use jellyobject::ObjectBuffer; pub type SubtreeNum = u32; const T_ROW_COUNTER: SubtreeNum = 0; const T_ROWS: SubtreeNum = 1; const T_INDEX_COUNTER: SubtreeNum = 2; const T_INDICES: SubtreeNum = 3; const INDEX_TNUM_OFFSET: SubtreeNum = 64; impl Database for T { fn transaction(&self, f: &mut dyn FnMut(&mut dyn Transaction) -> Result<()>) -> Result<()> { jellykv::Store::transaction(self, &mut |mut txn| f(&mut txn)) } } impl Transaction for &mut dyn jellykv::Transaction { fn insert(&mut self, entry: ObjectBuffer) -> Result { let mut id_counter = read_counter(*self, &T_ROW_COUNTER.to_be_bytes(), 0)?; let row = id_counter; id_counter += 1; write_counter(*self, &T_ROW_COUNTER.to_be_bytes(), id_counter)?; jellykv::Transaction::set( *self, &row_key(row), bytemuck::cast_slice(entry.0.as_slice()), )?; let ob = entry.as_object(); for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, ob, false)?; } Ok(row) } fn remove(&mut self, row: RowNum) -> Result<()> { let entry = self.get(row)?.ok_or(anyhow!("row did not exist"))?; let ob = entry.as_object(); for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, ob, true)?; } jellykv::Transaction::del(*self, &row_key(row)) } fn update(&mut self, row: RowNum, entry: ObjectBuffer) -> Result<()> { let before = self.get(row)?.ok_or(anyhow!("row to update missing"))?; let before = before.as_object(); let after = entry.as_object(); jellykv::Transaction::set( *self, &row_key(row), bytemuck::cast_slice(entry.0.as_slice()), )?; for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, before, true)?; update_index(*self, is, &ik, row, after, false)?; } Ok(()) } fn get(&self, row: RowNum) -> Result> { Ok(jellykv::Transaction::get(*self, &row_key(row))?.map(ObjectBuffer::from)) } fn query<'a>( &'a mut self, query: Query, ) -> Result)>> + 'a>> { let mut prefixes = Vec::new(); for (binning, mut prefix) in query.filter.get_bins() { let ik = IndexKey(binning, query.sort.key()); let is = get_or_create_index(*self, &ik)?; prefix.splice(0..0, is.to_be_bytes()); prefixes.push(prefix); } let mut iters = Vec::new(); for prefix in prefixes { iters.push(iter_index(*self, prefix, &query.sort)?) } if iters.len() == 1 { return Ok(iters.pop().unwrap()); } Ok(Box::new(MergeIterator::new(iters))) } fn query_single(&mut self, query: Query) -> Result> { Ok(self.query(query)?.next().transpose()?.map(|(e, _)| e)) } fn count(&mut self, query: Query) -> Result { let mut total = 0; for (binning, mut prefix) in query.filter.get_bins() { let ik = IndexKey(binning, SortKey::Count); let is = get_or_create_index(*self, &ik)?; prefix.splice(0..0, is.to_be_bytes()); total += read_count_index(*self, prefix)?; } Ok(total) } } fn get_or_create_index(txn: &mut dyn jellykv::Transaction, ik: &IndexKey) -> Result { match jellykv::Transaction::get(txn, &ik.to_bytes())? { Some(is) => Ok(SubtreeNum::from_be_bytes(is.try_into().unwrap())), None => { let is = alloc_subtree(txn)?; create_index(txn, is, ik)?; jellykv::Transaction::set(txn, &ik.to_bytes(), &is.to_be_bytes())?; Ok(is) } } } fn alloc_subtree(txn: &mut dyn jellykv::Transaction) -> Result { let is = read_counter( txn, &T_INDEX_COUNTER.to_be_bytes(), INDEX_TNUM_OFFSET as u64, )?; write_counter(txn, &T_INDEX_COUNTER.to_be_bytes(), is + 1)?; Ok(is as SubtreeNum) } fn create_index(txn: &mut dyn jellykv::Transaction, is: SubtreeNum, ik: &IndexKey) -> Result<()> { let rowkeys = PrefixIterator { inner: txn.iter(&T_ROWS.to_be_bytes(), false)?, prefix: Cow::Borrowed(&T_ROWS.to_be_bytes()), } .collect::, _>>()?; // TODO dont collect this for rowkey in rowkeys { let row = inv_row_key(&rowkey); let buf = ObjectBuffer::from(jellykv::Transaction::get(txn, &rowkey)?.unwrap()); update_index(txn, is, ik, row, buf.as_object(), false)?; } Ok(()) } fn list_indices(txn: &dyn jellykv::Transaction) -> Result> { let indices = PrefixIterator { inner: jellykv::Transaction::iter(txn, &T_INDICES.to_be_bytes(), false)?, prefix: Cow::Borrowed(&T_INDICES.to_be_bytes()), }; let mut out = Vec::new(); for i in indices { let i = i?; let ik = IndexKey::from_bytes(&i); let is = read_counter(txn, &i, 0)? as SubtreeNum; out.push((is, ik)); } Ok(out) } fn row_key(row: RowNum) -> Vec { let mut key = Vec::new(); key.extend(T_ROWS.to_be_bytes()); key.extend(row.to_be_bytes()); key } fn inv_row_key(k: &[u8]) -> RowNum { RowNum::from_be_bytes(k[4..12].try_into().unwrap()) } #[cfg(test)] mod test { use crate::{ Database, Filter, Query, Sort, test_shared::{AGE, NAME, new_alice, new_bob, new_charlie}, }; use anyhow::Result; use jellyobject::Path; #[test] pub fn insert_get() -> Result<()> { let db = jellykv::memory::new(); let mut bob_row = 0; db.transaction(&mut |txn| { bob_row = txn.insert(new_bob())?; Ok(()) })?; let mut bob = None; db.transaction(&mut |txn| { bob = txn.get(bob_row)?; Ok(()) })?; assert_eq!(bob.unwrap().as_object().get(NAME).unwrap(), "Bob"); Ok(()) } #[test] pub fn update() -> Result<()> { let db = jellykv::memory::new(); let mut bob_row = 0; let mut bob = None; db.transaction(&mut |txn| { bob_row = txn.insert(new_bob())?; Ok(()) })?; db.transaction(&mut |txn| { let better_bob = new_bob().as_object().insert(NAME, "Better Bob"); txn.update(bob_row, better_bob)?; Ok(()) })?; db.transaction(&mut |txn| { bob = txn.get(bob_row)?; Ok(()) })?; assert_eq!(bob.unwrap().as_object().get(NAME).unwrap(), "Better Bob"); Ok(()) } #[test] pub fn filter() -> Result<()> { let db = jellykv::memory::new(); let mut rows = [0, 0, 0]; let mut result = None; db.transaction(&mut |txn| { rows = [ txn.insert(new_bob())?, txn.insert(new_alice())?, txn.insert(new_charlie())?, ]; Ok(()) })?; db.transaction(&mut |txn| { result = txn.query_single(Query { filter: Filter::Match(Path(vec![AGE.0]), 35_u32.to_be_bytes().to_vec()), sort: Sort::None, })?; Ok(()) })?; assert_eq!(result.unwrap(), 0); Ok(()) } }