diff options
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r-- | import/src/lib.rs | 198 |
1 files changed, 65 insertions, 133 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 274b63d..40648d0 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -4,6 +4,7 @@ Copyright (C) 2024 metamuffin <metamuffin.org> */ #![feature(lazy_cell)] +pub mod db; pub mod infojson; pub mod tmdb; pub mod trakt; @@ -11,11 +12,12 @@ pub mod trakt; use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; use base64::Engine; +use db::{DatabaseStorage, ImportStorage, MemoryStorage}; use futures::{stream::FuturesUnordered, StreamExt}; use jellybase::{ assetfed::AssetInner, cache::{async_cache_file, cache_memory}, - database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT}, + database::{DataAcid, ReadableTable, T_NODE_IMPORT}, federation::Federation, CONF, SECRETS, }; @@ -73,94 +75,68 @@ pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), }; + if CONF.use_in_memory_import_storage { + import_inner(&MemoryStorage::new(db), fed, &ap).await?; + } else { + import_inner(&DatabaseStorage::new(db), fed, &ap).await?; + } + + drop(permit); + Ok(()) +} + +pub(crate) async fn import_inner( + db: &impl ImportStorage, + fed: &Federation, + ap: &Apis, +) -> anyhow::Result<()> { info!("loading sources..."); - import_path(CONF.library_path.clone(), vec![], db, fed, &ap) + import_path(CONF.library_path.clone(), vec![], db, fed, ap) .await .context("indexing")?; - info!("removing old nodes..."); - { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - table.drain::<&str>(..)?; - drop(table); - txn.commit()?; - } + db.remove_prev_nodes()?; info!("merging nodes..."); - merge_nodes(db).context("merging nodes")?; - info!("generating paths..."); - generate_node_paths(db).context("generating paths")?; - info!("clearing temporary node tree..."); - { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - table.drain::<&str>(..)?; - drop(table); - txn.commit()?; - } + generate_node_paths(db).context("merging nodes")?; + db.finish()?; info!("import completed"); - drop(permit); Ok(()) } -pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> { - let txn_read = db.inner.begin_read()?; - let t_node_import = txn_read.open_table(T_NODE_IMPORT)?; - for r in t_node_import.iter()? { - let (id, nodes) = r?; - let mut nodes = nodes.value().0; +fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> { + // TODO mark nodes done to allow recursion + fn traverse( + db: &impl ImportStorage, + id: String, + mut path: Vec<String>, + parent_title: &str, + ) -> anyhow::Result<()> { + let node = { + let mut parts = db + .get_partial_parts(&id) + .context(anyhow!("path = {path:?}"))?; - nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); + parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); - let mut node = nodes - .into_iter() - .map(|(_, x)| x) - .reduce(|x, y| merge_node(x, y).unwrap()) - .unwrap(); + let mut node = parts + .into_iter() + .map(|(_, x)| x) + .reduce(|x, y| merge_node(x, y).unwrap()) + .unwrap(); - node.public.id = Some(id.value().to_owned()); - node.public.path = vec![]; // will be reconstructed in the next pass - node.public.federated = None; + node.public.id = Some(id.to_owned()); + node.public.path = vec![]; // will be reconstructed in the next pass + node.public.federated = None; - // TODO this discardes a lot of information. maybe change this. - if let Some(media) = &node.public.media { - for t in &media.tracks { - if let Some(host) = t.federated.first() { - if host != &CONF.hostname { - node.public.federated = Some(host.to_string()) + // TODO this discardes a lot of information. maybe change this. + if let Some(media) = &node.public.media { + for t in &media.tracks { + if let Some(host) = t.federated.first() { + if host != &CONF.hostname { + node.public.federated = Some(host.to_string()) + } } } } - } - - { - let txn_write = db.inner.begin_write()?; - let mut t_node = txn_write.open_table(T_NODE)?; - t_node.insert(id.value(), Ser(node))?; - drop(t_node); - txn_write.commit()?; - } - } - Ok(()) -} - -pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> { - fn traverse( - db: &DataAcid, - c: String, - mut path: Vec<String>, - parent_title: &str, - ) -> anyhow::Result<()> { - let node = { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - - let mut node = table - .get(&*c)? - .ok_or(anyhow!( - "missing child when generating paths: {c:?} at {path:?}" - ))? - .value() - .0; if node.public.path.is_empty() { node.public.path = path.clone(); @@ -182,14 +158,12 @@ pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> { _ => None, }; - table.insert(c.as_str(), Ser(node.clone()))?; + db.insert_complete_node(&id, node.clone())?; - drop(table); - txn.commit()?; node }; - path.push(c); + path.push(id); let ps = node.public.title.unwrap_or_default(); for c in node.public.children { traverse(db, c, path.clone(), &ps)?; @@ -217,7 +191,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { async fn import_path( path: PathBuf, mut index_path: Vec<usize>, - db: &DataAcid, + db: &impl ImportStorage, fed: &Federation, ap: &Apis, ) -> anyhow::Result<()> { @@ -288,31 +262,10 @@ async fn process_source( s: ImportSource, path: &Path, index_path: &[usize], - db: &DataAcid, + db: &impl ImportStorage, fed: &Federation, ap: &Apis, ) -> anyhow::Result<()> { - let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - - let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); - node.push((index_path.to_vec(), n.clone())); - table.insert(id, Ser(node))?; - - drop(table); - txn.commit()?; - Ok(()) - }; - let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> { - // TODO merge this - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(n))?; - drop(table); - txn.commit()?; - Ok(()) - }; match s { ImportSource::Override(mut n) => { if let Some(backdrop) = n.private.backdrop.clone() { @@ -321,7 +274,7 @@ async fn process_source( if let Some(poster) = n.private.poster.clone() { n.public.poster = Some(AssetInner::Library(poster).ser()); } - insert_node(&id, n)? + db.add_partial_node(&id, index_path, n)? } ImportSource::Trakt { id: tid, kind } => { info!("trakt {id}"); @@ -386,8 +339,8 @@ async fn process_source( } } } - insert_node(&id, node)?; - insert_node_ext(&id, node_ext)?; + db.add_partial_node(&id, index_path, node)?; + db.add_partial_node_ext(&id, index_path, node_ext)?; if let Some(tid) = trakt_object.ids.tmdb { if let Some(kind) = match kind { @@ -441,7 +394,7 @@ async fn process_source( parse_release_date(&date).context("parsing release date")?; } - insert_node(&id, node)?; + db.add_partial_node(&id, index_path, node)?; } ImportSource::Media { path: mpath, @@ -486,7 +439,7 @@ async fn process_source( node.public.children.push(inf_id); } } - insert_node(&id, node)?; + db.add_partial_node(&id, index_path, node)?; } else if abspath.is_file() { let _permit = SEM_IMPORT.acquire().await.unwrap(); let metadata = { @@ -579,7 +532,7 @@ async fn process_source( } } drop(_permit); - insert_node(&id, node)?; + db.add_partial_node(&id, index_path, node)?; } else { warn!("non file/dir import ignored: {abspath:?}") } @@ -615,8 +568,9 @@ async fn process_source( children.push(opts.id); } } - insert_node( + db.add_partial_node( &id, + index_path, Node { private: NodePrivate::default(), public: NodePublic { @@ -771,32 +725,10 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); async fn import_remote( id: String, host: &str, - db: &DataAcid, + db: &impl ImportStorage, session: &Arc<Session>, index_path: &[usize], ) -> anyhow::Result<()> { - let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - - let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); - node.push((index_path.to_vec(), n.clone())); - table.insert(id, Ser(node))?; - - drop(table); - txn.commit()?; - Ok(()) - }; - let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> { - // TODO merge this - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(n))?; - drop(table); - txn.commit()?; - Ok(()) - }; - let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {id:?}"); @@ -837,8 +769,8 @@ async fn import_remote( } debug!("adding {id}"); - insert_node(&id, node.clone())?; - insert_node_ext(&id, node_ext)?; + db.add_partial_node(&id, index_path, node.clone())?; + db.add_partial_node_ext(&id, index_path, node_ext)?; let mut children: FuturesUnordered<_> = node .public |