use std::collections::HashMap; use anyhow::anyhow; use jellybase::database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT}; use jellycommon::{ExtendedNode, Node}; use log::info; use std::sync::RwLock; pub(crate) trait ImportStorage: Sync { fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>; fn add_partial_node_ext( &self, id: &str, index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()>; fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>>; fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>; fn remove_prev_nodes(&self) -> anyhow::Result<()>; fn finish(&self) -> anyhow::Result<()>; } pub(crate) struct DatabaseStorage<'a> { pub db: &'a DataAcid, } impl<'a> DatabaseStorage<'a> { pub fn new(db: &'a DataAcid) -> Self { Self { db } } } impl ImportStorage for DatabaseStorage<'_> { fn remove_prev_nodes(&self) -> anyhow::Result<()> { info!("removing old nodes..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; table.drain::<&str>(..)?; drop(table); txn.commit()?; Ok(()) } fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { let txn = self.db.inner.begin_read()?; let table = txn.open_table(T_NODE_IMPORT)?; let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?; Ok(value.value().0) } fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { let txn_write = self.db.inner.begin_write()?; let mut t_node = txn_write.open_table(T_NODE)?; t_node.insert(id, Ser(node))?; drop(t_node); txn_write.commit()?; Ok(()) } fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); parts.push((index_path.to_vec(), node.clone())); table.insert(id, Ser(parts))?; drop(table); txn.commit()?; Ok(()) } fn add_partial_node_ext( &self, id: &str, _index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()> { // TODO merge this let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) } fn finish(&self) -> anyhow::Result<()> { info!("clearing temporary node tree..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; table.drain::<&str>(..)?; drop(table); txn.commit()?; Ok(()) } } pub(crate) struct MemoryStorage<'a> { pub db: &'a DataAcid, pub parts: RwLock, Node)>>>, } impl<'a> MemoryStorage<'a> { pub fn new(db: &'a DataAcid) -> Self { Self { db, parts: Default::default(), } } } impl ImportStorage for MemoryStorage<'_> { fn remove_prev_nodes(&self) -> anyhow::Result<()> { info!("removing old nodes..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; table.drain::<&str>(..)?; drop(table); txn.commit()?; Ok(()) } fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { Ok(self .parts .read() .unwrap() .get(id) .ok_or(anyhow!("node parts not found"))? .to_owned()) } fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { let txn_write = self.db.inner.begin_write()?; let mut t_node = txn_write.open_table(T_NODE)?; t_node.insert(id, Ser(node))?; drop(t_node); txn_write.commit()?; Ok(()) } fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { self.parts .write() .unwrap() .entry(id.to_owned()) .or_default() .push((index_path.to_owned(), node)); Ok(()) } fn add_partial_node_ext( &self, id: &str, _index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()> { // TODO merge this let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) } fn finish(&self) -> anyhow::Result<()> { Ok(()) } }