aboutsummaryrefslogtreecommitdiff
path: root/import/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/db.rs')
-rw-r--r--import/src/db.rs208
1 files changed, 0 insertions, 208 deletions
diff --git a/import/src/db.rs b/import/src/db.rs
deleted file mode 100644
index 7a3636c..0000000
--- a/import/src/db.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use anyhow::{anyhow, Context};
-use jellybase::database::{
- redb::{ReadableTable, ReadableTableMetadata},
- tantivy::{doc, DateTime},
- DataAcid, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT,
-};
-use jellycommon::{ExtendedNode, Node};
-use log::info;
-use std::collections::HashMap;
-use std::sync::RwLock;
-
-pub(crate) trait ImportStorage: Sync {
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>;
- fn add_partial_node_ext(
- &self,
- id: &str,
- index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()>;
-
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>;
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>;
-
- fn pre_clean(&self) -> anyhow::Result<()>;
- fn remove_prev_nodes(&self) -> anyhow::Result<()>;
- fn finish(&self) -> anyhow::Result<()>;
-}
-
-pub(crate) struct DatabaseStorage<'a> {
- pub db: &'a DataAcid,
-}
-impl<'a> DatabaseStorage<'a> {
- pub fn new(db: &'a DataAcid) -> Self {
- Self { db }
- }
-}
-impl ImportStorage for DatabaseStorage<'_> {
- fn pre_clean(&self) -> anyhow::Result<()> {
- let txn = self.db.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- if !table.is_empty()? {
- info!("clearing temporary node tree from an aborted last import...");
- table.retain(|_, _| false)?;
- }
- drop(table);
- txn.commit()?;
- Ok(())
- }
- fn remove_prev_nodes(&self) -> anyhow::Result<()> {
- info!("removing old nodes...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
- let txn = self.db.inner.begin_read()?;
- let table = txn.open_table(T_NODE_IMPORT)?;
- let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?;
- Ok(value.value().0)
- }
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
- insert_complete_node(self.db, id, node)
- }
-
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
-
- let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
- parts.push((index_path.to_vec(), node.clone()));
- table.insert(id, Ser(parts))?;
-
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn add_partial_node_ext(
- &self,
- id: &str,
- _index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()> {
- // TODO merge this
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(node))?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn finish(&self) -> anyhow::Result<()> {
- info!("clearing temporary node tree...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
-
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
-}
-
-pub type Parts = RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>;
-pub(crate) struct MemoryStorage<'a> {
- pub db: &'a DataAcid,
- pub parts: Parts,
-}
-impl<'a> MemoryStorage<'a> {
- pub fn new(db: &'a DataAcid) -> Self {
- Self {
- db,
- parts: Default::default(),
- }
- }
-}
-impl ImportStorage for MemoryStorage<'_> {
- fn pre_clean(&self) -> anyhow::Result<()> {
- Ok(())
- }
- fn remove_prev_nodes(&self) -> anyhow::Result<()> {
- info!("removing old nodes...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
- self.db
- .node_index
- .writer
- .read()
- .unwrap()
- .delete_all_documents()?;
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
- Ok(self
- .parts
- .read()
- .unwrap()
- .get(id)
- .ok_or(anyhow!("node parts not found"))?
- .to_owned())
- }
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
- insert_complete_node(self.db, id, node)
- }
-
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
- self.parts
- .write()
- .unwrap()
- .entry(id.to_owned())
- .or_default()
- .push((index_path.to_owned(), node));
- Ok(())
- }
-
- fn add_partial_node_ext(
- &self,
- id: &str,
- _index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()> {
- // TODO merge this
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(node))?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn finish(&self) -> anyhow::Result<()> {
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
-}
-
-fn insert_complete_node(db: &DataAcid, id: &str, node: Node) -> anyhow::Result<()> {
- let txn_write = db.inner.begin_write()?;
- let mut t_node = txn_write.open_table(T_NODE)?;
- t_node.insert(id, Ser(node.clone()))?;
- drop(t_node);
- txn_write.commit()?;
-
- db
- .node_index
- .writer
- .read()
- .unwrap()
- .add_document(doc!(
- db.node_index.id => node.public.id.unwrap_or_default(),
- db.node_index.title => node.public.title.unwrap_or_default(),
- db.node_index.description => node.public.description.unwrap_or_default(),
- db.node_index.releasedate => DateTime::from_timestamp_millis(node.public.release_date.unwrap_or_default()),
- db.node_index.f_index => node.public.index.unwrap_or_default() as u64,
- db.node_index.parent => node.public.path.last().cloned().unwrap_or_default(),
- ))
- .context("inserting document")?;
- Ok(())
-}