diff options
Diffstat (limited to 'import/src/db.rs')
-rw-r--r-- | import/src/db.rs | 208 |
1 files changed, 0 insertions, 208 deletions
diff --git a/import/src/db.rs b/import/src/db.rs deleted file mode 100644 index 7a3636c..0000000 --- a/import/src/db.rs +++ /dev/null @@ -1,208 +0,0 @@ -use anyhow::{anyhow, Context}; -use jellybase::database::{ - redb::{ReadableTable, ReadableTableMetadata}, - tantivy::{doc, DateTime}, - DataAcid, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT, -}; -use jellycommon::{ExtendedNode, Node}; -use log::info; -use std::collections::HashMap; -use std::sync::RwLock; - -pub(crate) trait ImportStorage: Sync { - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>; - fn add_partial_node_ext( - &self, - id: &str, - index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()>; - - fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>; - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>; - - fn pre_clean(&self) -> anyhow::Result<()>; - fn remove_prev_nodes(&self) -> anyhow::Result<()>; - fn finish(&self) -> anyhow::Result<()>; -} - -pub(crate) struct DatabaseStorage<'a> { - pub db: &'a DataAcid, -} -impl<'a> DatabaseStorage<'a> { - pub fn new(db: &'a DataAcid) -> Self { - Self { db } - } -} -impl ImportStorage for DatabaseStorage<'_> { - fn pre_clean(&self) -> anyhow::Result<()> { - let txn = self.db.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - if !table.is_empty()? { - info!("clearing temporary node tree from an aborted last import..."); - table.retain(|_, _| false)?; - } - drop(table); - txn.commit()?; - Ok(()) - } - fn remove_prev_nodes(&self) -> anyhow::Result<()> { - info!("removing old nodes..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - Ok(()) - } - fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> { - let txn = self.db.inner.begin_read()?; - let table = txn.open_table(T_NODE_IMPORT)?; - let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?; - Ok(value.value().0) - } - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { - insert_complete_node(self.db, id, node) - } - - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - - let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); - parts.push((index_path.to_vec(), node.clone())); - table.insert(id, Ser(parts))?; - - drop(table); - txn.commit()?; - Ok(()) - } - - fn add_partial_node_ext( - &self, - id: &str, - _index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()> { - // TODO merge this - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(node))?; - drop(table); - txn.commit()?; - Ok(()) - } - - fn finish(&self) -> anyhow::Result<()> { - info!("clearing temporary node tree..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } -} - -pub type Parts = RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>; -pub(crate) struct MemoryStorage<'a> { - pub db: &'a DataAcid, - pub parts: Parts, -} -impl<'a> MemoryStorage<'a> { - pub fn new(db: &'a DataAcid) -> Self { - Self { - db, - parts: Default::default(), - } - } -} -impl ImportStorage for MemoryStorage<'_> { - fn pre_clean(&self) -> anyhow::Result<()> { - Ok(()) - } - fn remove_prev_nodes(&self) -> anyhow::Result<()> { - info!("removing old nodes..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - self.db - .node_index - .writer - .read() - .unwrap() - .delete_all_documents()?; - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } - fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> { - Ok(self - .parts - .read() - .unwrap() - .get(id) - .ok_or(anyhow!("node parts not found"))? - .to_owned()) - } - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { - insert_complete_node(self.db, id, node) - } - - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { - self.parts - .write() - .unwrap() - .entry(id.to_owned()) - .or_default() - .push((index_path.to_owned(), node)); - Ok(()) - } - - fn add_partial_node_ext( - &self, - id: &str, - _index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()> { - // TODO merge this - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(node))?; - drop(table); - txn.commit()?; - Ok(()) - } - - fn finish(&self) -> anyhow::Result<()> { - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } -} - -fn insert_complete_node(db: &DataAcid, id: &str, node: Node) -> anyhow::Result<()> { - let txn_write = db.inner.begin_write()?; - let mut t_node = txn_write.open_table(T_NODE)?; - t_node.insert(id, Ser(node.clone()))?; - drop(t_node); - txn_write.commit()?; - - db - .node_index - .writer - .read() - .unwrap() - .add_document(doc!( - db.node_index.id => node.public.id.unwrap_or_default(), - db.node_index.title => node.public.title.unwrap_or_default(), - db.node_index.description => node.public.description.unwrap_or_default(), - db.node_index.releasedate => DateTime::from_timestamp_millis(node.public.release_date.unwrap_or_default()), - db.node_index.f_index => node.public.index.unwrap_or_default() as u64, - db.node_index.parent => node.public.path.last().cloned().unwrap_or_default(), - )) - .context("inserting document")?; - Ok(()) -} |