From 4ba86694e393c61107e27c4127efc0455b329524 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 26 Feb 2026 12:47:40 +0100 Subject: db continuation --- database/src/kv/index.rs | 61 +++++++++++++++++++++++++++++++----------------- database/src/kv/mod.rs | 7 +++++- 2 files changed, 45 insertions(+), 23 deletions(-) (limited to 'database') diff --git a/database/src/kv/index.rs b/database/src/kv/index.rs index 60d291c..ee14d77 100644 --- a/database/src/kv/index.rs +++ b/database/src/kv/index.rs @@ -13,7 +13,7 @@ use crate::{ prefix_iterator::PrefixIterator, }, }; -use anyhow::Result; +use anyhow::{Result, bail}; use jellyobject::Object; use std::iter::empty; @@ -86,22 +86,31 @@ pub fn iter_index<'a>( txn: &'a dyn jellykv::Transaction, prefix: Vec, sort: &Sort, + resume: Option>, ) -> Result)>> + 'a>> { Ok(match sort { - Sort::None => Box::new( - PrefixIterator { - inner: txn.iter(&prefix, false)?, - prefix: prefix.into(), + Sort::None => { + let mut start = prefix.clone(); + if let Some(resume) = resume { + if resume.len() != 8 { + bail!("invalid resume length") + } + start.extend(resume); + start = inc_key(start); } - .map(|k| { - k.map(|k| { - ( - RowNum::from_be_bytes(k[k.len() - 8..].try_into().unwrap()), - k[4..].to_vec(), - ) - }) - }), - ), + Box::new( + PrefixIterator { + inner: txn.iter(&start, false)?, + prefix: prefix.into(), + } + .map(|k| { + k.map(|k| { + let rn = RowNum::from_be_bytes(k[k.len() - 8..].try_into().unwrap()); + (rn, rn.to_be_bytes().to_vec()) + }) + }), + ) + } Sort::Random(seed) => { let mut k = prefix.clone(); k.extend(randomize(*seed).to_be_bytes()); @@ -118,19 +127,27 @@ pub fn iter_index<'a>( } Sort::Value(value_sort) => { assert!(value_sort.offset.is_none(), "TODO"); + let mut start = prefix.clone(); + if let Some(resume) = resume { + if resume.len() != 8 { + bail!("invalid resume length") + } + start.extend(resume); + start = inc_key(start); + } Box::new( PrefixIterator { inner: match value_sort.order { - SortOrder::Ascending => txn.iter(&prefix, false)?, - SortOrder::Descending => txn.iter(&prefix_end(prefix.clone()), true)?, + SortOrder::Ascending => txn.iter(&start, false)?, + SortOrder::Descending => txn.iter(&inc_key(prefix.clone()), true)?, }, - prefix: prefix.into(), + prefix: prefix.clone().into(), } - .map(|k| { + .map(move |k| { k.map(|k| { ( RowNum::from_be_bytes(k[k.len() - 8..].try_into().unwrap()), - k[4..].to_vec(), + k[prefix.len()..].to_vec(), ) }) }), @@ -140,15 +157,15 @@ pub fn iter_index<'a>( }) } -fn prefix_end(mut prefix: Vec) -> Vec { - for v in prefix.iter_mut().rev() { +fn inc_key(mut k: Vec) -> Vec { + for v in k.iter_mut().rev() { let (nv, carry) = v.overflowing_add(1); *v = nv; if !carry { break; } } - prefix + k } fn randomize(mut x: u64) -> u64 { diff --git a/database/src/kv/mod.rs b/database/src/kv/mod.rs index 69e9c3b..837aa51 100644 --- a/database/src/kv/mod.rs +++ b/database/src/kv/mod.rs @@ -111,7 +111,12 @@ impl Transaction for &mut dyn jellykv::Transaction { } let mut iters = Vec::new(); for prefix in prefixes { - iters.push(iter_index(*self, prefix, &query.sort)?) + iters.push(iter_index( + *self, + prefix, + &query.sort, + query.continuation.clone(), + )?) } if iters.len() == 1 { return Ok(iters.pop().unwrap()); -- cgit v1.3