From cfaae9067c151d8db49b0fcbcaff04bc31176bd2 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 12 May 2024 11:46:16 +0200 Subject: mostly ignore errors when importing --- import/src/lib.rs | 108 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 39 deletions(-) (limited to 'import/src/lib.rs') diff --git a/import/src/lib.rs b/import/src/lib.rs index 3adfd85..49f709a 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -9,7 +9,7 @@ pub mod infojson; pub mod tmdb; pub mod trakt; -use anyhow::{anyhow, bail, Context, Ok}; +use anyhow::{anyhow, bail, Context, Error, Ok}; use async_recursion::async_recursion; use base64::Engine; use db::{DatabaseStorage, ImportStorage, MemoryStorage}; @@ -17,7 +17,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use jellybase::{ assetfed::AssetInner, cache::{async_cache_file, cache_memory}, - database::{redb::ReadableTableMetadata, DataAcid, T_NODE_IMPORT}, + database::DataAcid, federation::Federation, CONF, SECRETS, }; @@ -43,11 +43,17 @@ use std::{ sync::{Arc, LazyLock}, }; use tmdb::{parse_release_date, Tmdb}; -use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; +use tokio::{ + io::AsyncWriteExt, + sync::{RwLock, Semaphore}, + task::spawn_blocking, +}; use trakt::Trakt; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); +pub static IMPORT_ERRORS: RwLock> = RwLock::const_new(Vec::new()); + struct Apis { trakt: Option, tmdb: Option, @@ -59,27 +65,22 @@ pub fn is_importing() -> bool { pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; - { - let txn = db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - if !table.is_empty()? { - info!("clearing temporary node tree from an aborted last import..."); - table.retain(|_, _| false)?; - } - drop(table); - txn.commit()?; - } let ap = Apis { trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), }; - if CONF.use_in_memory_import_storage { - import_inner(&MemoryStorage::new(db), fed, &ap).await?; + let e = if CONF.use_in_memory_import_storage { + import_inner(&MemoryStorage::new(db), fed, &ap).await } else { - import_inner(&DatabaseStorage::new(db), fed, &ap).await?; - } + import_inner(&DatabaseStorage::new(db), fed, &ap).await + }; + let e = match e { + Result::Ok(e) => e, + Result::Err(e) => vec![e], + }; + *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); drop(permit); Ok(()) @@ -89,27 +90,37 @@ pub(crate) async fn import_inner( db: &impl ImportStorage, fed: &Federation, ap: &Apis, -) -> anyhow::Result<()> { +) -> anyhow::Result> { + db.pre_clean()?; info!("loading sources..."); - import_path(CONF.library_path.clone(), vec![], db, fed, ap) + let mut errors = Vec::new(); + match import_path(CONF.library_path.clone(), vec![], db, fed, ap) .await - .context("indexing")?; + .context("indexing") + { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + }; db.remove_prev_nodes()?; info!("merging nodes..."); - generate_node_paths(db).context("merging nodes")?; + match generate_node_paths(db).context("merging nodes") { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + } db.finish()?; info!("import completed"); - Ok(()) + Ok(errors) } -fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> { +fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result> { // TODO mark nodes done to allow recursion fn traverse( db: &impl ImportStorage, id: String, mut path: Vec, parent_title: &str, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { + let mut errors = Vec::new(); let node = { let mut parts = db .get_partial_parts(&id) @@ -166,12 +177,14 @@ fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> { path.push(id); let ps = node.public.title.unwrap_or_default(); for c in node.public.children { - traverse(db, c, path.clone(), &ps)?; + match traverse(db, c, path.clone(), &ps) { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + } } - Ok(()) + Ok(errors) } - traverse(db, "library".to_string(), vec![], "Root")?; - Ok(()) + traverse(db, "library".to_string(), vec![], "Root") } fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { @@ -194,7 +207,8 @@ async fn import_path( db: &impl ImportStorage, fed: &Federation, ap: &Apis, -) -> anyhow::Result<()> { +) -> anyhow::Result> { + let mut errors = Vec::new(); if path.is_dir() { let mut children_paths = path .read_dir()? @@ -232,7 +246,10 @@ async fn import_path( .collect(); while let Some(k) = children.next().await { - k? + match k { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + } } } else { info!("reading {path:?}"); @@ -245,13 +262,16 @@ async fn import_path( for (i, s) in opts.sources.into_iter().enumerate() { index_path.push(i); - process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) + if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) .await - .context(anyhow!("processing source in {path:?}"))?; + .context(anyhow!("processing source in {path:?}")) + { + errors.push(e) + } index_path.pop(); } } - Ok(()) + Ok(errors) } static SEM_IMPORT: Semaphore = Semaphore::const_new(2); @@ -265,7 +285,8 @@ async fn process_source( db: &impl ImportStorage, fed: &Federation, ap: &Apis, -) -> anyhow::Result<()> { +) -> anyhow::Result> { + let mut errors = vec![]; match s { ImportSource::Override(mut n) => { if let Some(backdrop) = n.private.backdrop.clone() { @@ -351,7 +372,7 @@ async fn process_source( } { let mut index_path = index_path.to_vec(); index_path.push(1); - process_source( + match process_source( id, ImportSource::Tmdb { id: tid, kind }, path, @@ -360,7 +381,11 @@ async fn process_source( fed, ap, ) - .await?; + .await + { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + } } } } @@ -420,7 +445,8 @@ async fn process_source( { let inf_id = infer_id_from_path(&child_path).context("inferring child id")?; - process_source( + + match process_source( inf_id.clone(), ImportSource::Media { path: mpath.join(f.file_name()), @@ -435,7 +461,11 @@ async fn process_source( ap, ) .await - .context(anyhow!("recursive media import: {:?}", f.path()))?; + .context(anyhow!("recursive media import: {:?}", f.path())) + { + Result::Ok(o) => errors.extend(o), + Result::Err(e) => errors.push(e), + }; node.public.children.push(inf_id); } } @@ -581,7 +611,7 @@ async fn process_source( )?; } } - Ok(()) + Ok(errors) } const RE_YOUTUBE_ID: LazyLock = -- cgit v1.2.3-70-g09d2