aboutsummaryrefslogtreecommitdiff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--import/src/lib.rs119
1 files changed, 79 insertions, 40 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 692cf7f..a4d1611 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -13,7 +13,7 @@ use async_recursion::async_recursion;
use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};
use jellybase::{
cache::{async_cache_file, cache_memory},
- database::Database,
+ database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_IMPORT},
federation::Federation,
AssetLocationExt, CONF,
};
@@ -42,32 +42,54 @@ use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking};
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
+pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
let permit = IMPORT_SEM.try_acquire()?;
- if !db.node_import.is_empty() {
- info!("clearing temporary node tree from an aborted last import...");
- db.node_import.clear()?;
+
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ if !table.is_empty()? {
+ info!("clearing temporary node tree from an aborted last import...");
+ table.drain::<&str>(..)?;
+ }
+ drop(table);
+ txn.commit()?;
}
info!("loading sources...");
import_path(CONF.library_path.clone(), vec![], db, fed)
.await
.context("indexing")?;
info!("removing old nodes...");
- db.node.clear()?;
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ }
info!("merging nodes...");
merge_nodes(db).context("merging nodes")?;
info!("generating paths...");
generate_node_paths(db).context("generating paths")?;
info!("clearing temporary node tree...");
- db.node_import.clear()?;
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ }
info!("import completed");
drop(permit);
Ok(())
}
-pub fn merge_nodes(db: &Database) -> anyhow::Result<()> {
- for r in db.node_import.iter() {
- let (id, mut nodes) = r?;
+pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> {
+ let txn_read = db.inner.begin_read()?;
+ let t_node_import = txn_read.open_table(T_NODE_IMPORT)?;
+ for r in t_node_import.iter()? {
+ let (id, nodes) = r?;
+ let mut nodes = nodes.value().0;
nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
@@ -77,27 +99,36 @@ pub fn merge_nodes(db: &Database) -> anyhow::Result<()> {
.reduce(|x, y| merge_node(x, y))
.unwrap();
- node.public.id = Some(id.clone());
+ node.public.id = Some(id.value().to_owned());
node.public.path = vec![]; // will be reconstructed in the next pass
- db.node.insert(&id, &node)?;
+ {
+ let txn_write = db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id.value(), Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ }
}
Ok(())
}
-pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> {
- fn traverse(db: &Database, c: String, mut path: Vec<String>) -> anyhow::Result<()> {
- let node = db
- .node
- .update_and_fetch(&c, |mut nc| {
- if let Some(nc) = &mut nc {
- if nc.public.path.is_empty() {
- nc.public.path = path.clone();
- }
- }
- nc
- })?
- .ok_or(anyhow!("node {c:?} missing"))?;
+pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> {
+ fn traverse(db: &DataAcid, c: String, mut path: Vec<String>) -> anyhow::Result<()> {
+ let node = {
+ let txn = db.inner.begin_write()?;
+ let table = txn.open_table(T_NODE)?;
+
+ let mut node = table.get(&*c)?.ok_or(anyhow!("your mum"))?.value().0;
+
+ if node.public.path.is_empty() {
+ node.public.path = path.clone();
+ }
+
+ drop(table);
+ txn.commit()?;
+ node
+ };
path.push(c);
for c in node.public.children {
@@ -126,7 +157,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
pub async fn import_path(
path: PathBuf,
index_path: Vec<usize>,
- db: &Database,
+ db: &DataAcid,
fed: &Federation,
) -> anyhow::Result<()> {
if path.is_dir() {
@@ -190,15 +221,19 @@ async fn process_source(
s: ImportSource,
path: &Path,
index_path: &[usize],
- db: &Database,
+ db: &DataAcid,
fed: &Federation,
) -> anyhow::Result<()> {
- let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
- db.node_import.fetch_and_update(id, |l| {
- let mut l = l.unwrap_or_default();
- l.push((index_path.to_vec(), n.clone()));
- Some(l)
- })?;
+ let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ node.push((index_path.to_vec(), n.clone()));
+ table.insert(id, Ser(node))?;
+
+ drop(table);
+ txn.commit()?;
Ok(())
};
match s {
@@ -497,16 +532,20 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
async fn import_remote(
id: String,
host: &str,
- db: &Database,
+ db: &DataAcid,
session: &Arc<Session>,
index_path: &[usize],
) -> anyhow::Result<()> {
- let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
- db.node_import.fetch_and_update(id, |l| {
- let mut l = l.unwrap_or_default();
- l.push((index_path.to_vec(), n.clone()));
- Some(l)
- })?;
+ let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ node.push((index_path.to_vec(), n.clone()));
+ table.insert(id, Ser(node))?;
+
+ drop(table);
+ txn.commit()?;
Ok(())
};
let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();