diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-12-12 01:59:38 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-12-12 01:59:38 +0100 |
| commit | 3661af07620caad1bfa46fa6902054797433fc0f (patch) | |
| tree | 2ac2d68b06d16bf0f0a2242ad1e00905ef50006d /import/src/lib.rs | |
| parent | 9289a82e262e4acc58b37ba7e0fc29f85cb1ba7c (diff) | |
| download | jellything-3661af07620caad1bfa46fa6902054797433fc0f.tar jellything-3661af07620caad1bfa46fa6902054797433fc0f.tar.bz2 jellything-3661af07620caad1bfa46fa6902054797433fc0f.tar.zst | |
Custom import thread count and fixes
Diffstat (limited to 'import/src/lib.rs')
| -rw-r--r-- | import/src/lib.rs | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 3b7c8fc..8e4c702 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -24,7 +24,10 @@ use jellyremuxer::{ matroska::{self, AttachedFile, Segment}, }; use log::info; -use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use rayon::{ + ThreadPoolBuilder, + iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}, +}; use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, @@ -41,6 +44,7 @@ use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking}; pub struct Config { media_path: PathBuf, api: ApiSecrets, + num_threads: usize } #[derive(Serialize, Deserialize, Debug, Default)] @@ -87,10 +91,18 @@ pub fn get_trakt() -> Result<Trakt> { pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> { let _sem = IMPORT_SEM.try_acquire().context("already importing")?; + let rt = Handle::current(); + let tp = ThreadPoolBuilder::new() + .thread_name(|n| format!("import #{n}")) + .num_threads(CONF.num_threads) + .build()?; + let jh = spawn_blocking(move || { - reporting::start_import(); - reporting::catch(import(&db, incremental)); - reporting::end_import(); + tp.install(|| { + reporting::start_import(); + reporting::catch(import(&db, &rt, incremental)); + reporting::end_import(); + }) }); let _ = jh.await; @@ -98,9 +110,8 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> { Ok(()) } -fn import(db: &Database, incremental: bool) -> Result<()> { +fn import(db: &Database, rt: &Handle, incremental: bool) -> Result<()> { let plugins = init_plugins(&CONF.api); - let files = Mutex::new(Vec::new()); import_traverse( &CONF.media_path, @@ -112,7 +123,6 @@ fn import(db: &Database, incremental: bool) -> Result<()> { )?; let files = files.into_inner().unwrap(); - let rt = Handle::current(); let mut nodes = Mutex::new(HashSet::new()); files.into_par_iter().for_each(|(path, parent, iflags)| { @@ -364,6 +374,12 @@ fn process_node( let inf = p.info(); if inf.handle_process { reporting::set_task(format!("{}(proc): {slug}", inf.name)); + let Some(data) = reporting::catch( + db.get_node(node) + .and_then(|e| e.ok_or(anyhow!("node missing"))), + ) else { + return; + }; reporting::catch( p.process( &ImportContext { |