use anyhow::{anyhow, Context}; use jellybase::database::{ redb::ReadableTable, tantivy::{doc, DateTime}, DataAcid, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT, }; use jellycommon::{ExtendedNode, Node}; use log::info; use std::collections::HashMap; use std::sync::RwLock; pub(crate) trait ImportStorage: Sync { fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>; fn add_partial_node_ext( &self, id: &str, index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()>; fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>>; fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>; fn remove_prev_nodes(&self) -> anyhow::Result<()>; fn finish(&self) -> anyhow::Result<()>; } pub(crate) struct DatabaseStorage<'a> { pub db: &'a DataAcid, } impl<'a> DatabaseStorage<'a> { pub fn new(db: &'a DataAcid) -> Self { Self { db } } } impl ImportStorage for DatabaseStorage<'_> { fn remove_prev_nodes(&self) -> anyhow::Result<()> { info!("removing old nodes..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; table.retain(|_, _| false)?; drop(table); txn.commit()?; Ok(()) } fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { let txn = self.db.inner.begin_read()?; let table = txn.open_table(T_NODE_IMPORT)?; let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?; Ok(value.value().0) } fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { insert_complete_node(&self.db, id, node) } fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); parts.push((index_path.to_vec(), node.clone())); table.insert(id, Ser(parts))?; drop(table); txn.commit()?; Ok(()) } fn add_partial_node_ext( &self, id: &str, _index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()> { // TODO merge this let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) } fn finish(&self) -> anyhow::Result<()> { info!("clearing temporary node tree..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; table.retain(|_, _| false)?; drop(table); txn.commit()?; self.db.node_index.writer.write().unwrap().commit()?; Ok(()) } } pub(crate) struct MemoryStorage<'a> { pub db: &'a DataAcid, pub parts: RwLock, Node)>>>, } impl<'a> MemoryStorage<'a> { pub fn new(db: &'a DataAcid) -> Self { Self { db, parts: Default::default(), } } } impl ImportStorage for MemoryStorage<'_> { fn remove_prev_nodes(&self) -> anyhow::Result<()> { info!("removing old nodes..."); let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; table.retain(|_, _| false)?; drop(table); txn.commit()?; self.db .node_index .writer .read() .unwrap() .delete_all_documents()?; self.db.node_index.writer.write().unwrap().commit()?; Ok(()) } fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { Ok(self .parts .read() .unwrap() .get(id) .ok_or(anyhow!("node parts not found"))? .to_owned()) } fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { insert_complete_node(&self.db, id, node) } fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { self.parts .write() .unwrap() .entry(id.to_owned()) .or_default() .push((index_path.to_owned(), node)); Ok(()) } fn add_partial_node_ext( &self, id: &str, _index_path: &[usize], node: ExtendedNode, ) -> anyhow::Result<()> { // TODO merge this let txn = self.db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) } fn finish(&self) -> anyhow::Result<()> { self.db.node_index.writer.write().unwrap().commit()?; Ok(()) } } fn insert_complete_node(db: &DataAcid, id: &str, node: Node) -> anyhow::Result<()> { let txn_write = db.inner.begin_write()?; let mut t_node = txn_write.open_table(T_NODE)?; t_node.insert(id, Ser(node.clone()))?; drop(t_node); txn_write.commit()?; db .node_index .writer .read() .unwrap() .add_document(doc!( db.node_index.id => node.public.id.unwrap_or_default(), db.node_index.title => node.public.title.unwrap_or_default(), db.node_index.description => node.public.description.unwrap_or_default(), db.node_index.releasedate => DateTime::from_timestamp_millis(node.public.release_date.unwrap_or_default()), db.node_index.f_index => node.public.index.unwrap_or_default() as u64, db.node_index.parent => node.public.path.last().cloned().unwrap_or_default(), )) .context("inserting document")?; Ok(()) }