diff options
author | metamuffin <metamuffin@disroot.org> | 2024-04-15 10:48:31 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-04-15 10:48:31 +0200 |
commit | 0acf32113051addd1aba4a6f823b9c918d839e04 (patch) | |
tree | f63adbf7a89a6e97e26023bb18772f5c3dd411ff /import/src/db.rs | |
parent | fa987c5f4a2420c6daf0c8c00514a9564991a57e (diff) | |
download | jellything-0acf32113051addd1aba4a6f823b9c918d839e04.tar jellything-0acf32113051addd1aba4a6f823b9c918d839e04.tar.bz2 jellything-0acf32113051addd1aba4a6f823b9c918d839e04.tar.zst |
import: optionally use ram for storage. might be faster
Diffstat (limited to 'import/src/db.rs')
-rw-r--r-- | import/src/db.rs | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/import/src/db.rs b/import/src/db.rs new file mode 100644 index 0000000..88c5601 --- /dev/null +++ b/import/src/db.rs @@ -0,0 +1,165 @@ +use std::collections::HashMap; + +use anyhow::anyhow; +use jellybase::database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT}; +use jellycommon::{ExtendedNode, Node}; +use log::info; +use std::sync::RwLock; + +pub(crate) trait ImportStorage: Sync { + fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>; + fn add_partial_node_ext( + &self, + id: &str, + index_path: &[usize], + node: ExtendedNode, + ) -> anyhow::Result<()>; + + fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>; + fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>; + + fn remove_prev_nodes(&self) -> anyhow::Result<()>; + fn finish(&self) -> anyhow::Result<()>; +} + +pub(crate) struct DatabaseStorage<'a> { + pub db: &'a DataAcid, +} +impl<'a> DatabaseStorage<'a> { + pub fn new(db: &'a DataAcid) -> Self { + Self { db } + } +} +impl ImportStorage for DatabaseStorage<'_> { + fn remove_prev_nodes(&self) -> anyhow::Result<()> { + info!("removing old nodes..."); + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + Ok(()) + } + fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> { + let txn = self.db.inner.begin_read()?; + let table = txn.open_table(T_NODE_IMPORT)?; + let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?; + Ok(value.value().0) + } + fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { + let txn_write = self.db.inner.begin_write()?; + let mut t_node = txn_write.open_table(T_NODE)?; + t_node.insert(id, Ser(node))?; + drop(t_node); + txn_write.commit()?; + Ok(()) + } + + fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + + let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); + parts.push((index_path.to_vec(), node.clone())); + table.insert(id, Ser(parts))?; + + drop(table); + txn.commit()?; + Ok(()) + } + + fn add_partial_node_ext( + &self, + id: &str, + _index_path: &[usize], + node: ExtendedNode, + ) -> anyhow::Result<()> { + // TODO merge this + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_EXTENDED)?; + table.insert(id, Ser(node))?; + drop(table); + txn.commit()?; + Ok(()) + } + + fn finish(&self) -> anyhow::Result<()> { + info!("clearing temporary node tree..."); + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + Ok(()) + } +} + +pub(crate) struct MemoryStorage<'a> { + pub db: &'a DataAcid, + pub parts: RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>, +} +impl<'a> MemoryStorage<'a> { + pub fn new(db: &'a DataAcid) -> Self { + Self { + db, + parts: Default::default(), + } + } +} +impl ImportStorage for MemoryStorage<'_> { + fn remove_prev_nodes(&self) -> anyhow::Result<()> { + info!("removing old nodes..."); + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + Ok(()) + } + fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> { + Ok(self + .parts + .read() + .unwrap() + .get(id) + .ok_or(anyhow!("node parts not found"))? + .to_owned()) + } + fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { + let txn_write = self.db.inner.begin_write()?; + let mut t_node = txn_write.open_table(T_NODE)?; + t_node.insert(id, Ser(node))?; + drop(t_node); + txn_write.commit()?; + Ok(()) + } + + fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { + self.parts + .write() + .unwrap() + .entry(id.to_owned()) + .or_default() + .push((index_path.to_owned(), node)); + Ok(()) + } + + fn add_partial_node_ext( + &self, + id: &str, + _index_path: &[usize], + node: ExtendedNode, + ) -> anyhow::Result<()> { + // TODO merge this + let txn = self.db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_EXTENDED)?; + table.insert(id, Ser(node))?; + drop(table); + txn.commit()?; + Ok(()) + } + + fn finish(&self) -> anyhow::Result<()> { + Ok(()) + } +} |