about | summary | refs | log | tree | commit | diff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-05-12 11:46:16 +0200
committermetamuffin <metamuffin@disroot.org>2024-05-12 11:46:16 +0200
commitcfaae9067c151d8db49b0fcbcaff04bc31176bd2 (patch)
tree4b2727167d58f58c06384b9302404b1d2f7d54e8 /import/src/lib.rs
parentbf0e9de8eb801ef58a3820b663d30bb55762970e (diff)
downloadjellything-cfaae9067c151d8db49b0fcbcaff04bc31176bd2.tar
jellything-cfaae9067c151d8db49b0fcbcaff04bc31176bd2.tar.bz2
jellything-cfaae9067c151d8db49b0fcbcaff04bc31176bd2.tar.zst
mostly ignore errors when importing
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--  import/src/lib.rs  108
1 files changed, 69 insertions, 39 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 3adfd85..49f709a 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -9,7 +9,7 @@ pub mod infojson;
pub mod tmdb;
pub mod trakt;
-use anyhow::{anyhow, bail, Context, Ok};
+use anyhow::{anyhow, bail, Context, Error, Ok};
use async_recursion::async_recursion;
use base64::Engine;
use db::{DatabaseStorage, ImportStorage, MemoryStorage};
@@ -17,7 +17,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use jellybase::{
assetfed::AssetInner,
cache::{async_cache_file, cache_memory},
- database::{redb::ReadableTableMetadata, DataAcid, T_NODE_IMPORT},
+ database::DataAcid,
federation::Federation,
CONF, SECRETS,
};
@@ -43,11 +43,17 @@ use std::{
sync::{Arc, LazyLock},
};
use tmdb::{parse_release_date, Tmdb};
-use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking};
+use tokio::{
+ io::AsyncWriteExt,
+ sync::{RwLock, Semaphore},
+ task::spawn_blocking,
+};
use trakt::Trakt;
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
+pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new());
+
struct Apis {
trakt: Option<Trakt>,
tmdb: Option<Tmdb>,
@@ -59,27 +65,22 @@ pub fn is_importing() -> bool {
pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
let permit = IMPORT_SEM.try_acquire()?;
- {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- if !table.is_empty()? {
- info!("clearing temporary node tree from an aborted last import...");
- table.retain(|_, _| false)?;
- }
- drop(table);
- txn.commit()?;
- }
let ap = Apis {
trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
};
- if CONF.use_in_memory_import_storage {
- import_inner(&MemoryStorage::new(db), fed, &ap).await?;
+ let e = if CONF.use_in_memory_import_storage {
+ import_inner(&MemoryStorage::new(db), fed, &ap).await
} else {
- import_inner(&DatabaseStorage::new(db), fed, &ap).await?;
- }
+ import_inner(&DatabaseStorage::new(db), fed, &ap).await
+ };
+ let e = match e {
+ Result::Ok(e) => e,
+ Result::Err(e) => vec![e],
+ };
+ *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect();
drop(permit);
Ok(())
@@ -89,27 +90,37 @@ pub(crate) async fn import_inner(
db: &impl ImportStorage,
fed: &Federation,
ap: &Apis,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Vec<anyhow::Error>> {
+ db.pre_clean()?;
info!("loading sources...");
- import_path(CONF.library_path.clone(), vec![], db, fed, ap)
+ let mut errors = Vec::new();
+ match import_path(CONF.library_path.clone(), vec![], db, fed, ap)
.await
- .context("indexing")?;
+ .context("indexing")
+ {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ };
db.remove_prev_nodes()?;
info!("merging nodes...");
- generate_node_paths(db).context("merging nodes")?;
+ match generate_node_paths(db).context("merging nodes") {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ }
db.finish()?;
info!("import completed");
- Ok(())
+ Ok(errors)
}
-fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> {
+fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> {
// TODO mark nodes done to allow recursion
fn traverse(
db: &impl ImportStorage,
id: String,
mut path: Vec<String>,
parent_title: &str,
- ) -> anyhow::Result<()> {
+ ) -> anyhow::Result<Vec<Error>> {
+ let mut errors = Vec::new();
let node = {
let mut parts = db
.get_partial_parts(&id)
@@ -166,12 +177,14 @@ fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> {
path.push(id);
let ps = node.public.title.unwrap_or_default();
for c in node.public.children {
- traverse(db, c, path.clone(), &ps)?;
+ match traverse(db, c, path.clone(), &ps) {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ }
}
- Ok(())
+ Ok(errors)
}
- traverse(db, "library".to_string(), vec![], "Root")?;
- Ok(())
+ traverse(db, "library".to_string(), vec![], "Root")
}
fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
@@ -194,7 +207,8 @@ async fn import_path(
db: &impl ImportStorage,
fed: &Federation,
ap: &Apis,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Vec<anyhow::Error>> {
+ let mut errors = Vec::new();
if path.is_dir() {
let mut children_paths = path
.read_dir()?
@@ -232,7 +246,10 @@ async fn import_path(
.collect();
while let Some(k) = children.next().await {
- k?
+ match k {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ }
}
} else {
info!("reading {path:?}");
@@ -245,13 +262,16 @@ async fn import_path(
for (i, s) in opts.sources.into_iter().enumerate() {
index_path.push(i);
- process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap)
+ if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap)
.await
- .context(anyhow!("processing source in {path:?}"))?;
+ .context(anyhow!("processing source in {path:?}"))
+ {
+ errors.push(e)
+ }
index_path.pop();
}
}
- Ok(())
+ Ok(errors)
}
static SEM_IMPORT: Semaphore = Semaphore::const_new(2);
@@ -265,7 +285,8 @@ async fn process_source(
db: &impl ImportStorage,
fed: &Federation,
ap: &Apis,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Vec<anyhow::Error>> {
+ let mut errors = vec![];
match s {
ImportSource::Override(mut n) => {
if let Some(backdrop) = n.private.backdrop.clone() {
@@ -351,7 +372,7 @@ async fn process_source(
} {
let mut index_path = index_path.to_vec();
index_path.push(1);
- process_source(
+ match process_source(
id,
ImportSource::Tmdb { id: tid, kind },
path,
@@ -360,7 +381,11 @@ async fn process_source(
fed,
ap,
)
- .await?;
+ .await
+ {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ }
}
}
}
@@ -420,7 +445,8 @@ async fn process_source(
{
let inf_id =
infer_id_from_path(&child_path).context("inferring child id")?;
- process_source(
+
+ match process_source(
inf_id.clone(),
ImportSource::Media {
path: mpath.join(f.file_name()),
@@ -435,7 +461,11 @@ async fn process_source(
ap,
)
.await
- .context(anyhow!("recursive media import: {:?}", f.path()))?;
+ .context(anyhow!("recursive media import: {:?}", f.path()))
+ {
+ Result::Ok(o) => errors.extend(o),
+ Result::Err(e) => errors.push(e),
+ };
node.public.children.push(inf_id);
}
}
@@ -581,7 +611,7 @@ async fn process_source(
)?;
}
}
- Ok(())
+ Ok(errors)
}
const RE_YOUTUBE_ID: LazyLock<Regex> =