about summary refs log tree commit diff
path: root/import
diff options
context:
space:
mode:
Diffstat (limited to 'import')
-rw-r--r--  import/src/lib.rs              30
-rw-r--r--  import/src/plugins/infojson.rs  7
-rw-r--r--  import/src/plugins/misc.rs     13
-rw-r--r--  import/src/plugins/trakt.rs     4
4 files changed, 42 insertions, 12 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 3b7c8fc..8e4c702 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -24,7 +24,10 @@ use jellyremuxer::{
matroska::{self, AttachedFile, Segment},
};
use log::info;
-use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
+use rayon::{
+ ThreadPoolBuilder,
+ iter::{IntoParallelIterator, ParallelBridge, ParallelIterator},
+};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
@@ -41,6 +44,7 @@ use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking};
pub struct Config {
media_path: PathBuf,
api: ApiSecrets,
+ num_threads: usize
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -87,10 +91,18 @@ pub fn get_trakt() -> Result<Trakt> {
pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire().context("already importing")?;
+ let rt = Handle::current();
+ let tp = ThreadPoolBuilder::new()
+ .thread_name(|n| format!("import #{n}"))
+ .num_threads(CONF.num_threads)
+ .build()?;
+
let jh = spawn_blocking(move || {
- reporting::start_import();
- reporting::catch(import(&db, incremental));
- reporting::end_import();
+ tp.install(|| {
+ reporting::start_import();
+ reporting::catch(import(&db, &rt, incremental));
+ reporting::end_import();
+ })
});
let _ = jh.await;
@@ -98,9 +110,8 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: &Database, incremental: bool) -> Result<()> {
+fn import(db: &Database, rt: &Handle, incremental: bool) -> Result<()> {
let plugins = init_plugins(&CONF.api);
-
let files = Mutex::new(Vec::new());
import_traverse(
&CONF.media_path,
@@ -112,7 +123,6 @@ fn import(db: &Database, incremental: bool) -> Result<()> {
)?;
let files = files.into_inner().unwrap();
- let rt = Handle::current();
let mut nodes = Mutex::new(HashSet::new());
files.into_par_iter().for_each(|(path, parent, iflags)| {
@@ -364,6 +374,12 @@ fn process_node(
let inf = p.info();
if inf.handle_process {
reporting::set_task(format!("{}(proc): {slug}", inf.name));
+ let Some(data) = reporting::catch(
+ db.get_node(node)
+ .and_then(|e| e.ok_or(anyhow!("node missing"))),
+ ) else {
+ return;
+ };
reporting::catch(
p.process(
&ImportContext {
diff --git a/import/src/plugins/infojson.rs b/import/src/plugins/infojson.rs
index 5c3645c..03db70c 100644
--- a/import/src/plugins/infojson.rs
+++ b/import/src/plugins/infojson.rs
@@ -273,8 +273,9 @@ impl ImportPlugin for Infojson {
}
fn clean_uploader_name(mut s: &str) -> &str {
- s = s.strip_suffix(" - Videos").unwrap_or(s);
- s = s.strip_suffix(" - Topic").unwrap_or(s);
- s = s.strip_prefix("Uploads from ").unwrap_or(s);
+ s = s.strip_suffix(" - Videos").unwrap_or(s); // youtube
+ s = s.strip_suffix(" - Topic").unwrap_or(s); // youtube
+ s = s.strip_prefix("Uploads from ").unwrap_or(s); // youtube
+ s = s.strip_prefix("Discography of ").unwrap_or(s); // bandcamp
s
}
diff --git a/import/src/plugins/misc.rs b/import/src/plugins/misc.rs
index 0444d0c..4a4851e 100644
--- a/import/src/plugins/misc.rs
+++ b/import/src/plugins/misc.rs
@@ -122,6 +122,19 @@ impl ImportPlugin for General {
Ok(())
})?;
}
+ if let Some(title) = line.strip_prefix("title=") {
+ ct.db.update_node_init(node, |node| {
+ node.title = Some(title.to_owned());
+ Ok(())
+ })?;
+ }
+ if let Some(index) = line.strip_prefix("index=") {
+ let index = index.parse().context("parse index")?;
+ ct.db.update_node_init(node, |node| {
+ node.index = Some(index);
+ Ok(())
+ })?;
+ }
Ok(())
}
}
diff --git a/import/src/plugins/trakt.rs b/import/src/plugins/trakt.rs
index 48a97ae..1268e56 100644
--- a/import/src/plugins/trakt.rs
+++ b/import/src/plugins/trakt.rs
@@ -409,14 +409,14 @@ impl ImportPlugin for Trakt {
Ok(())
}
fn process(&self, ct: &ImportContext, node: NodeID, data: &Node) -> Result<()> {
- self.process_show(ct, node, data)?;
+ self.process_primary(ct, node, data)?;
self.process_episode(ct, node, data)?;
Ok(())
}
}
impl Trakt {
- fn process_show(&self, ct: &ImportContext, node: NodeID, data: &Node) -> Result<()> {
+ fn process_primary(&self, ct: &ImportContext, node: NodeID, data: &Node) -> Result<()> {
let (trakt_kind, trakt_id): (_, u64) =
if let Some(id) = data.identifiers.get(&IdentifierType::TraktShow) {
(TraktKind::Show, id.parse()?)