aboutsummaryrefslogtreecommitdiff
path: root/import/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/db.rs')
-rw-r--r--import/src/db.rs165
1 files changed, 165 insertions, 0 deletions
diff --git a/import/src/db.rs b/import/src/db.rs
new file mode 100644
index 0000000..88c5601
--- /dev/null
+++ b/import/src/db.rs
@@ -0,0 +1,165 @@
+use std::collections::HashMap;
+
+use anyhow::anyhow;
+use jellybase::database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT};
+use jellycommon::{ExtendedNode, Node};
+use log::info;
+use std::sync::RwLock;
+
+pub(crate) trait ImportStorage: Sync {
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>;
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()>;
+
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>;
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>;
+
+ fn remove_prev_nodes(&self) -> anyhow::Result<()>;
+ fn finish(&self) -> anyhow::Result<()>;
+}
+
+pub(crate) struct DatabaseStorage<'a> {
+ pub db: &'a DataAcid,
+}
+impl<'a> DatabaseStorage<'a> {
+ pub fn new(db: &'a DataAcid) -> Self {
+ Self { db }
+ }
+}
+impl ImportStorage for DatabaseStorage<'_> {
+ fn remove_prev_nodes(&self) -> anyhow::Result<()> {
+ info!("removing old nodes...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
+ let txn = self.db.inner.begin_read()?;
+ let table = txn.open_table(T_NODE_IMPORT)?;
+ let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?;
+ Ok(value.value().0)
+ }
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
+ let txn_write = self.db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id, Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ parts.push((index_path.to_vec(), node.clone()));
+ table.insert(id, Ser(parts))?;
+
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ _index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()> {
+ // TODO merge this
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_EXTENDED)?;
+ table.insert(id, Ser(node))?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn finish(&self) -> anyhow::Result<()> {
+ info!("clearing temporary node tree...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+}
+
+pub(crate) struct MemoryStorage<'a> {
+ pub db: &'a DataAcid,
+ pub parts: RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>,
+}
+impl<'a> MemoryStorage<'a> {
+ pub fn new(db: &'a DataAcid) -> Self {
+ Self {
+ db,
+ parts: Default::default(),
+ }
+ }
+}
+impl ImportStorage for MemoryStorage<'_> {
+ fn remove_prev_nodes(&self) -> anyhow::Result<()> {
+ info!("removing old nodes...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
+ Ok(self
+ .parts
+ .read()
+ .unwrap()
+ .get(id)
+ .ok_or(anyhow!("node parts not found"))?
+ .to_owned())
+ }
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
+ let txn_write = self.db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id, Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
+ self.parts
+ .write()
+ .unwrap()
+ .entry(id.to_owned())
+ .or_default()
+ .push((index_path.to_owned(), node));
+ Ok(())
+ }
+
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ _index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()> {
+ // TODO merge this
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_EXTENDED)?;
+ table.insert(id, Ser(node))?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn finish(&self) -> anyhow::Result<()> {
+ Ok(())
+ }
+}