/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ pub mod binning; pub mod helpers; pub mod index; pub mod index_key; pub mod merge_iterator; pub mod prefix_iterator; #[cfg(test)] pub mod tests; use crate::{ Database, Query, RowNum, Transaction, kv::{ helpers::{read_counter, write_counter}, index::{iter_index, read_count_index, update_index}, index_key::{IndexKey, SortKey}, merge_iterator::MergeIterator, prefix_iterator::PrefixIterator, }, }; use anyhow::{Result, anyhow}; use jellyobject::{Object, vec_to_ob, vec_u8_to_u32}; use log::{debug, info}; use std::{borrow::Cow, fmt::Write}; pub type SubtreeNum = u32; const T_ROW_COUNTER: SubtreeNum = 0; const T_ROWS: SubtreeNum = 1; const T_INDEX_COUNTER: SubtreeNum = 2; const T_INDICES: SubtreeNum = 3; const INDEX_TNUM_OFFSET: SubtreeNum = 64; impl Database for T { fn transaction(&self, f: &mut dyn FnMut(&mut dyn Transaction) -> Result<()>) -> Result<()> { jellykv::Store::transaction(self, &mut |mut txn| f(&mut txn)) } } impl Transaction for &mut dyn jellykv::Transaction { fn insert(&mut self, entry: Box) -> Result { debug!("insert {entry:?}"); let mut id_counter = read_counter(*self, &T_ROW_COUNTER.to_be_bytes(), 0)?; let row = id_counter; id_counter += 1; write_counter(*self, &T_ROW_COUNTER.to_be_bytes(), id_counter)?; jellykv::Transaction::set(*self, &row_key(row), bytemuck::cast_slice(entry.export()))?; for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, &entry, false)?; } Ok(row) } fn remove(&mut self, row: RowNum) -> Result<()> { debug!("remove {row}"); let entry = self.get(row)?.ok_or(anyhow!("row did not exist"))?; for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, &entry, true)?; } jellykv::Transaction::del(*self, &row_key(row)) } fn update(&mut self, row: RowNum, entry: Box) -> Result<()> { debug!("update {row}"); let before = self.get(row)?.ok_or(anyhow!("row to update missing"))?; jellykv::Transaction::set(*self, &row_key(row), bytemuck::cast_slice(entry.export()))?; for (is, ik) in list_indices(*self)? { update_index(*self, is, &ik, row, &before, true)?; update_index(*self, is, &ik, row, &entry, false)?; } Ok(()) } fn get(&self, row: RowNum) -> Result>> { Ok(jellykv::Transaction::get(*self, &row_key(row))? .map(vec_u8_to_u32) .map(vec_to_ob)) } fn query<'a>( &'a mut self, query: Query, ) -> Result)>> + 'a>> { debug!("query: {query}"); let mut prefixes = Vec::new(); for (binning, mut prefix) in query.filter.get_bins() { let ik = IndexKey(binning, query.sort.key()); let is = get_or_create_index(*self, &ik)?; prefix.splice(0..0, is.to_be_bytes()); prefixes.push(prefix); } let mut iters = Vec::new(); for prefix in prefixes { iters.push(iter_index( *self, prefix, &query.sort, query.continuation.clone(), )?) } if iters.len() == 1 { return Ok(iters.pop().unwrap()); } Ok(Box::new(MergeIterator::new(iters))) } fn query_single(&mut self, query: Query) -> Result> { Ok(self.query(query)?.next().transpose()?.map(|(e, _)| e)) } fn count(&mut self, query: Query) -> Result { let mut total = 0; for (binning, mut prefix) in query.filter.get_bins() { let ik = IndexKey(binning, SortKey::Count); let is = get_or_create_index(*self, &ik)?; prefix.splice(0..0, is.to_be_bytes()); total += read_count_index(*self, prefix)?; } Ok(total) } fn debug_info(&self) -> Result { let mut o = String::new(); let rc = read_counter(*self, &T_ROW_COUNTER.to_be_bytes(), 0)?; writeln!(o, "Row Counter: {rc}")?; writeln!(o, "Indices:")?; for (is, ik) in list_indices(*self)? { writeln!(o, "\tIS={is} IK={ik}")?; } Ok(o) } } fn get_or_create_index(txn: &mut dyn jellykv::Transaction, ik: &IndexKey) -> Result { match jellykv::Transaction::get(txn, &ik.to_bytes())? { Some(is) => Ok(SubtreeNum::from_be_bytes(is.try_into().unwrap())), None => { let is = alloc_subtree(txn)?; create_index(txn, is, ik)?; jellykv::Transaction::set(txn, &ik.to_bytes(), &is.to_be_bytes())?; Ok(is) } } } fn alloc_subtree(txn: &mut dyn jellykv::Transaction) -> Result { let is = read_counter( txn, &T_INDEX_COUNTER.to_be_bytes(), INDEX_TNUM_OFFSET as u64, )?; write_counter(txn, &T_INDEX_COUNTER.to_be_bytes(), is + 1)?; Ok(is as SubtreeNum) } fn create_index(txn: &mut dyn jellykv::Transaction, is: SubtreeNum, ik: &IndexKey) -> Result<()> { info!("creating new index {is}: {ik:?}"); let rowkeys = PrefixIterator { inner: txn.iter(&T_ROWS.to_be_bytes(), false)?, prefix: Cow::Borrowed(&T_ROWS.to_be_bytes()), } .collect::, _>>()?; // TODO dont collect this for rowkey in rowkeys { let row = inv_row_key(&rowkey); let buf = vec_to_ob(vec_u8_to_u32( jellykv::Transaction::get(txn, &rowkey)?.unwrap(), )); update_index(txn, is, ik, row, &buf, false)?; } Ok(()) } fn list_indices(txn: &dyn jellykv::Transaction) -> Result> { let indices = PrefixIterator { inner: jellykv::Transaction::iter(txn, &T_INDICES.to_be_bytes(), false)?, prefix: Cow::Borrowed(&T_INDICES.to_be_bytes()), }; let mut out = Vec::new(); for i in indices { let i = i?; let ik = IndexKey::from_bytes(&i); let is_raw = jellykv::Transaction::get(txn, &i)?.unwrap(); let is = SubtreeNum::from_be_bytes(is_raw.try_into().unwrap()); out.push((is, ik)); } Ok(out) } fn row_key(row: RowNum) -> Vec { let mut key = Vec::new(); key.extend(T_ROWS.to_be_bytes()); key.extend(row.to_be_bytes()); key } fn inv_row_key(k: &[u8]) -> RowNum { RowNum::from_be_bytes(k[4..12].try_into().unwrap()) }