diff options
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r-- | import/src/lib.rs | 119 |
1 files changed, 79 insertions, 40 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 692cf7f..a4d1611 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -13,7 +13,7 @@ use async_recursion::async_recursion; use futures::{executor::block_on, stream::FuturesUnordered, StreamExt}; use jellybase::{ cache::{async_cache_file, cache_memory}, - database::Database, + database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_IMPORT}, federation::Federation, AssetLocationExt, CONF, }; @@ -42,32 +42,54 @@ use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { +pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; - if !db.node_import.is_empty() { - info!("clearing temporary node tree from an aborted last import..."); - db.node_import.clear()?; + + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + if !table.is_empty()? { + info!("clearing temporary node tree from an aborted last import..."); + table.drain::<&str>(..)?; + } + drop(table); + txn.commit()?; } info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed) .await .context("indexing")?; info!("removing old nodes..."); - db.node.clear()?; + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + } info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); - db.node_import.clear()?; + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + } info!("import completed"); drop(permit); Ok(()) } -pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { - for r in db.node_import.iter() { - let (id, mut nodes) = r?; +pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> { + let txn_read = db.inner.begin_read()?; + let t_node_import = txn_read.open_table(T_NODE_IMPORT)?; + for r in t_node_import.iter()? { + let (id, nodes) = r?; + let mut nodes = nodes.value().0; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); @@ -77,27 +99,36 @@ pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { .reduce(|x, y| merge_node(x, y)) .unwrap(); - node.public.id = Some(id.clone()); + node.public.id = Some(id.value().to_owned()); node.public.path = vec![]; // will be reconstructed in the next pass - db.node.insert(&id, &node)?; + { + let txn_write = db.inner.begin_write()?; + let mut t_node = txn_write.open_table(T_NODE)?; + t_node.insert(id.value(), Ser(node))?; + drop(t_node); + txn_write.commit()?; + } } Ok(()) } -pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { - fn traverse(db: &Database, c: String, mut path: Vec<String>) -> anyhow::Result<()> { - let node = db - .node - .update_and_fetch(&c, |mut nc| { - if let Some(nc) = &mut nc { - if nc.public.path.is_empty() { - nc.public.path = path.clone(); - } - } - nc - })? - .ok_or(anyhow!("node {c:?} missing"))?; +pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> { + fn traverse(db: &DataAcid, c: String, mut path: Vec<String>) -> anyhow::Result<()> { + let node = { + let txn = db.inner.begin_write()?; + let table = txn.open_table(T_NODE)?; + + let mut node = table.get(&*c)?.ok_or(anyhow!("your mum"))?.value().0; + + if node.public.path.is_empty() { + node.public.path = path.clone(); + } + + drop(table); + txn.commit()?; + node + }; path.push(c); for c in node.public.children { @@ -126,7 +157,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { pub async fn import_path( path: PathBuf, index_path: Vec<usize>, - db: &Database, + db: &DataAcid, fed: &Federation, ) -> anyhow::Result<()> { if path.is_dir() { @@ -190,15 +221,19 @@ async fn process_source( s: ImportSource, path: &Path, index_path: &[usize], - db: &Database, + db: &DataAcid, fed: &Federation, ) -> anyhow::Result<()> { - let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { - db.node_import.fetch_and_update(id, |l| { - let mut l = l.unwrap_or_default(); - l.push((index_path.to_vec(), n.clone())); - Some(l) - })?; + let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + + let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); + node.push((index_path.to_vec(), n.clone())); + table.insert(id, Ser(node))?; + + drop(table); + txn.commit()?; Ok(()) }; match s { @@ -497,16 +532,20 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); async fn import_remote( id: String, host: &str, - db: &Database, + db: &DataAcid, session: &Arc<Session>, index_path: &[usize], ) -> anyhow::Result<()> { - let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { - db.node_import.fetch_and_update(id, |l| { - let mut l = l.unwrap_or_default(); - l.push((index_path.to_vec(), n.clone())); - Some(l) - })?; + let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + + let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); + node.push((index_path.to_vec(), n.clone())); + table.insert(id, Ser(node))?; + + drop(table); + txn.commit()?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); |