aboutsummaryrefslogtreecommitdiff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-12-12 01:59:38 +0100
committermetamuffin <metamuffin@disroot.org>2025-12-12 01:59:38 +0100
commit3661af07620caad1bfa46fa6902054797433fc0f (patch)
tree2ac2d68b06d16bf0f0a2242ad1e00905ef50006d /import/src/lib.rs
parent9289a82e262e4acc58b37ba7e0fc29f85cb1ba7c (diff)
downloadjellything-3661af07620caad1bfa46fa6902054797433fc0f.tar
jellything-3661af07620caad1bfa46fa6902054797433fc0f.tar.bz2
jellything-3661af07620caad1bfa46fa6902054797433fc0f.tar.zst
Custom import thread count and fixes
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--import/src/lib.rs30
1 file changed, 23 insertions, 7 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 3b7c8fc..8e4c702 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -24,7 +24,10 @@ use jellyremuxer::{
matroska::{self, AttachedFile, Segment},
};
use log::info;
-use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
+use rayon::{
+ ThreadPoolBuilder,
+ iter::{IntoParallelIterator, ParallelBridge, ParallelIterator},
+};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
@@ -41,6 +44,7 @@ use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking};
pub struct Config {
media_path: PathBuf,
api: ApiSecrets,
+ num_threads: usize
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -87,10 +91,18 @@ pub fn get_trakt() -> Result<Trakt> {
pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire().context("already importing")?;
+ let rt = Handle::current();
+ let tp = ThreadPoolBuilder::new()
+ .thread_name(|n| format!("import #{n}"))
+ .num_threads(CONF.num_threads)
+ .build()?;
+
let jh = spawn_blocking(move || {
- reporting::start_import();
- reporting::catch(import(&db, incremental));
- reporting::end_import();
+ tp.install(|| {
+ reporting::start_import();
+ reporting::catch(import(&db, &rt, incremental));
+ reporting::end_import();
+ })
});
let _ = jh.await;
@@ -98,9 +110,8 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: &Database, incremental: bool) -> Result<()> {
+fn import(db: &Database, rt: &Handle, incremental: bool) -> Result<()> {
let plugins = init_plugins(&CONF.api);
-
let files = Mutex::new(Vec::new());
import_traverse(
&CONF.media_path,
@@ -112,7 +123,6 @@ fn import(db: &Database, incremental: bool) -> Result<()> {
)?;
let files = files.into_inner().unwrap();
- let rt = Handle::current();
let mut nodes = Mutex::new(HashSet::new());
files.into_par_iter().for_each(|(path, parent, iflags)| {
@@ -364,6 +374,12 @@ fn process_node(
let inf = p.info();
if inf.handle_process {
reporting::set_task(format!("{}(proc): {slug}", inf.name));
+ let Some(data) = reporting::catch(
+ db.get_node(node)
+ .and_then(|e| e.ok_or(anyhow!("node missing"))),
+ ) else {
+ return;
+ };
reporting::catch(
p.process(
&ImportContext {