aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--database/src/kv/binning.rs67
-rw-r--r--database/src/kv/index.rs14
-rw-r--r--database/src/kv/mod.rs80
-rw-r--r--database/src/lib.rs6
4 files changed, 118 insertions, 49 deletions
diff --git a/database/src/kv/binning.rs b/database/src/kv/binning.rs
index 16c88d4..1057a28 100644
--- a/database/src/kv/binning.rs
+++ b/database/src/kv/binning.rs
@@ -52,30 +52,51 @@ impl BinningComponent {
}
impl Filter {
- pub fn get_bins(&self) -> Vec<Binning> {
- fn recurse(f: &Filter) -> Vec<Vec<BinningComponent>> {
- match f {
- Filter::True => vec![vec![]],
- Filter::All(filters) => {
- let mut o = vec![vec![]];
- for filter in filters {
- let mut new_o = Vec::new();
- for par in recurse(filter) {
- for mut prev in o.clone() {
- prev.extend(par.clone());
- new_o.push(prev);
- }
+ pub fn get_binnings(&self) -> Vec<Binning> {
+ self.get_bins_inner()
+ .into_iter()
+ .map(|e| Binning(e.into_iter().map(|(e, _)| e).collect()))
+ .collect()
+ }
+ pub fn get_bins(&self) -> Vec<(Binning, Vec<u8>)> {
+ self.get_bins_inner()
+ .into_iter()
+ .map(|e| {
+ let (a, b): (Vec<BinningComponent>, Vec<Vec<u8>>) = e.into_iter().unzip();
+ (Binning(a), b.into_iter().flatten().collect())
+ })
+ .collect()
+ }
+ fn get_bins_inner(&self) -> Vec<Vec<(BinningComponent, Vec<u8>)>> {
+ match self {
+ Filter::True => vec![vec![]],
+ Filter::All(filters) => {
+ let mut o = vec![vec![]];
+ for filter in filters {
+ let mut new_o = Vec::new();
+ for par in filter.get_bins_inner() {
+ for mut prev in o.clone() {
+ prev.extend(par.clone());
+ new_o.push(prev);
}
- o = new_o;
}
- o
+ o = new_o;
}
- Filter::Any(filters) => filters.iter().flat_map(|f| recurse(f)).collect(),
- Filter::Match(path, _) => vec![vec![BinningComponent::Match(path.to_owned())]],
- Filter::Has(path) => vec![vec![BinningComponent::Has(path.to_owned())]],
+ o
+ }
+ Filter::Any(filters) => filters.iter().flat_map(|f| f.get_bins_inner()).collect(),
+ Filter::Match(path, value) => {
+ vec![vec![(BinningComponent::Match(path.to_owned()), {
+ let mut co = Vec::new();
+ co.extend((co.len() as u32).to_be_bytes());
+ co.extend(value);
+ co
+ })]]
+ }
+ Filter::Has(path) => {
+ vec![vec![(BinningComponent::Has(path.to_owned()), vec![])]]
}
}
- recurse(self).into_iter().map(Binning::new).collect()
}
}
@@ -94,11 +115,11 @@ mod test {
Filter::Has(Path(vec![Tag(0)])),
Filter::Has(Path(vec![Tag(1)])),
]);
- let bins = vec![Binning::new(vec![
+ let bins = vec![Binning(vec![
BinningComponent::Has(Path(vec![Tag(0)])),
BinningComponent::Has(Path(vec![Tag(1)])),
])];
- assert_eq!(f.get_bins(), bins)
+ assert_eq!(f.get_binnings(), bins)
}
#[test]
@@ -111,7 +132,7 @@ mod test {
Binning::new(vec![BinningComponent::Has(Path(vec![Tag(0)]))]),
Binning::new(vec![BinningComponent::Has(Path(vec![Tag(1)]))]),
];
- assert_eq!(f.get_bins(), bins)
+ assert_eq!(f.get_binnings(), bins)
}
#[test]
@@ -144,6 +165,6 @@ mod test {
BinningComponent::Has(Path(vec![Tag(3)])),
]),
];
- assert_eq!(f.get_bins(), bins)
+ assert_eq!(f.get_binnings(), bins)
}
}
diff --git a/database/src/kv/index.rs b/database/src/kv/index.rs
index 27aabd6..522ae18 100644
--- a/database/src/kv/index.rs
+++ b/database/src/kv/index.rs
@@ -19,14 +19,14 @@ use crate::{
pub fn update_index(
txn: &mut dyn jellykv::Transaction,
is: SubtreeNum,
- ik: IndexKey,
+ ik: &IndexKey,
row: RowNum,
ob: Object<'_>,
remove: bool,
) -> Result<()> {
let mut ks = vec![is.to_be_bytes().to_vec()];
ik.0.apply(ob, &mut ks);
- match ik.1 {
+ match &ik.1 {
SortKey::None => {
for mut k in ks {
k.extend(row.to_be_bytes());
@@ -53,3 +53,13 @@ pub fn update_index(
}
Ok(())
}
+
+pub fn read_count_index(
+ txn: &dyn jellykv::Transaction,
+ is: SubtreeNum,
+ prefix: Vec<u8>,
+) -> Result<u64> {
+ let mut k = is.to_be_bytes().to_vec();
+ k.extend(&prefix);
+ read_counter(txn, &k, 0)
+}
diff --git a/database/src/kv/mod.rs b/database/src/kv/mod.rs
index 1a058a7..2d2f3cc 100644
--- a/database/src/kv/mod.rs
+++ b/database/src/kv/mod.rs
@@ -17,8 +17,8 @@ use crate::{
Database, Query, RowIter, RowNum, Transaction,
kv::{
helpers::{read_counter, write_counter},
- index::update_index,
- index_key::IndexKey,
+ index::{read_count_index, update_index},
+ index_key::{IndexKey, SortKey},
prefix_iterator::PrefixIterator,
},
};
@@ -54,9 +54,8 @@ impl Transaction for &mut dyn jellykv::Transaction {
)?;
let ob = entry.as_object();
-
for (is, ik) in list_indices(*self)? {
- update_index(*self, is, ik, row, ob, false)?;
+ update_index(*self, is, &ik, row, ob, false)?;
}
Ok(row)
@@ -64,9 +63,9 @@ impl Transaction for &mut dyn jellykv::Transaction {
fn remove(&mut self, row: RowNum) -> Result<()> {
let entry = self.get(row)?.ok_or(anyhow!("row did not exist"))?;
let ob = entry.as_object();
- // for index in self.indices.values() {
- // index.remove(db, row, ob)?;
- // }
+ for (is, ik) in list_indices(*self)? {
+ update_index(*self, is, &ik, row, ob, true)?;
+ }
jellykv::Transaction::del(*self, &row_key(row))
}
fn update(&mut self, row: RowNum, entry: ObjectBuffer) -> Result<()> {
@@ -80,12 +79,10 @@ impl Transaction for &mut dyn jellykv::Transaction {
bytemuck::cast_slice(entry.0.as_slice()),
)?;
- // for index in self.indices.values() {
- // if !index.compare(before, after) {
- // index.remove(txn, row, before)?;
- // index.add(txn, row, after)?;
- // }
- // }
+ for (is, ik) in list_indices(*self)? {
+ update_index(*self, is, &ik, row, before, true)?;
+ update_index(*self, is, &ik, row, after, false)?;
+ }
Ok(())
}
@@ -94,7 +91,7 @@ impl Transaction for &mut dyn jellykv::Transaction {
Ok(jellykv::Transaction::get(*self, &row_key(row))?.map(ObjectBuffer::from))
}
- fn query(&self, query: Query) -> Result<RowIter> {
+ fn query(&mut self, query: Query) -> Result<RowIter> {
// query
// .filter
// .get_bins()
@@ -106,20 +103,58 @@ impl Transaction for &mut dyn jellykv::Transaction {
// .map(|i| i.query(txn, &query.sort))
todo!()
}
- fn query_single(&self, query: Query) -> Result<Option<RowNum>> {
+ fn query_single(&mut self, query: Query) -> Result<Option<RowNum>> {
self.query(query)?.next().transpose()
}
- fn count(&self, query: Query) -> Result<u64> {
+ fn count(&mut self, query: Query) -> Result<u64> {
let mut total = 0;
- // for b in query.filter.get_bins() {
- // if let Some(&c) = self.counters.get(&b) {
- // total += read_counter(txn, c)?;
- // }
- // }
+ for (binning, prefix) in query.filter.get_bins() {
+ let ik = IndexKey(binning, SortKey::Count);
+ let is = get_or_create_index(*self, &ik)?;
+ total += read_count_index(*self, is, prefix)?;
+ }
Ok(total)
}
}
+fn get_or_create_index(txn: &mut dyn jellykv::Transaction, ik: &IndexKey) -> Result<SubtreeNum> {
+ match jellykv::Transaction::get(txn, &ik.to_bytes())? {
+ Some(is) => Ok(SubtreeNum::from_be_bytes(is.try_into().unwrap())),
+ None => {
+ let is = alloc_subtree(txn)?;
+ create_index(txn, is, ik)?;
+ jellykv::Transaction::set(txn, &ik.to_bytes(), &is.to_be_bytes())?;
+ Ok(is)
+ }
+ }
+}
+
+fn alloc_subtree(txn: &mut dyn jellykv::Transaction) -> Result<SubtreeNum> {
+ let is = read_counter(
+ txn,
+ &T_INDEX_COUNTER.to_be_bytes(),
+ INDEX_TNUM_OFFSET as u64,
+ )?;
+ write_counter(txn, &T_INDEX_COUNTER.to_be_bytes(), is + 1)?;
+ Ok(is as SubtreeNum)
+}
+
+fn create_index(txn: &mut dyn jellykv::Transaction, is: SubtreeNum, ik: &IndexKey) -> Result<()> {
+ let rowkeys = PrefixIterator {
+ inner: txn.iter(&T_ROWS.to_be_bytes(), false)?,
+ prefix: Cow::Borrowed(&T_ROWS.to_be_bytes()),
+ }
+ .collect::<Result<Vec<_>, _>>()?; // TODO dont collect this
+
+ for rowkey in rowkeys {
+ let row = inv_row_key(&rowkey);
+ let buf = ObjectBuffer::from(jellykv::Transaction::get(txn, &rowkey)?.unwrap());
+ update_index(txn, is, ik, row, buf.as_object(), false)?;
+ }
+
+ Ok(())
+}
+
fn list_indices(txn: &dyn jellykv::Transaction) -> Result<Vec<(SubtreeNum, IndexKey)>> {
let indices = PrefixIterator {
inner: jellykv::Transaction::iter(txn, &T_INDICES.to_be_bytes(), false)?,
@@ -141,6 +176,9 @@ fn row_key(row: RowNum) -> Vec<u8> {
key.extend(row.to_be_bytes());
key
}
+fn inv_row_key(k: &[u8]) -> RowNum {
+ RowNum::from_be_bytes(k[4..12].try_into().unwrap())
+}
#[cfg(test)]
mod test {
diff --git a/database/src/lib.rs b/database/src/lib.rs
index da15d86..d4b9fba 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -22,9 +22,9 @@ pub trait Transaction {
fn remove(&mut self, row: RowNum) -> Result<()>;
fn update(&mut self, row: RowNum, entry: ObjectBuffer) -> Result<()>;
fn get(&self, row: RowNum) -> Result<Option<ObjectBuffer>>;
- fn query(&self, query: Query) -> Result<RowIter>;
- fn query_single(&self, query: Query) -> Result<Option<RowNum>>;
- fn count(&self, query: Query) -> Result<u64>;
+ fn query(&mut self, query: Query) -> Result<RowIter>;
+ fn query_single(&mut self, query: Query) -> Result<Option<RowNum>>;
+ fn count(&mut self, query: Query) -> Result<u64>;
}
#[derive(Default)]