aboutsummaryrefslogtreecommitdiff
path: root/import/src
diff options
context:
space:
mode:
Diffstat (limited to 'import/src')
-rw-r--r--import/src/lib.rs42
-rw-r--r--import/src/reporting.rs20
2 files changed, 36 insertions, 26 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 9129a70..b31d356 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -9,9 +9,8 @@ pub mod helpers;
pub mod plugins;
pub mod reporting;
-use crate::{
- plugins::{ImportPlugin, PluginContext, infojson::is_info_json, init_plugins, misc::is_cover},
- reporting::IMPORT_PROGRESS,
+use crate::plugins::{
+ ImportPlugin, PluginContext, infojson::is_info_json, init_plugins, misc::is_cover,
};
use anyhow::{Context, Result, anyhow};
use jellycache::{Cache, HashKey};
@@ -78,7 +77,7 @@ pub struct ImportConfig {
pub db: Arc<dyn Database>,
}
-fn node_slug_query(slug: &str) -> Query {
+fn node_slug_query<'a>(slug: &'a str) -> Query<'a> {
Query {
filter: Filter::Match(jellyobject::Path(vec![NO_SLUG.0]), slug.into()),
sort: Sort::None,
@@ -152,7 +151,10 @@ pub async fn import_wrap(ic: ImportConfig, incremental: bool) -> Result<()> {
}
fn import(ic: ImportConfig, rt: &Handle, incremental: bool) -> Result<()> {
+ reporting::set_stage(format!("Initializing Plugins"), 0);
let plugins = init_plugins(&ic.config.api);
+
+ reporting::set_stage(format!("Indexing files"), 0);
let files = Mutex::new(Vec::new());
import_traverse(
&ic.config.media_path,
@@ -166,35 +168,27 @@ fn import(ic: ImportConfig, rt: &Handle, incremental: bool) -> Result<()> {
let mut nodes = Mutex::new(HashSet::new());
+ reporting::set_stage(format!("Importing files"), files.len());
files.into_par_iter().for_each(|(path, parent, iflags)| {
reporting::set_task(format!("unknown: {path:?}"));
import_file(&ic, &rt, &nodes, &plugins, &path, parent, iflags);
- IMPORT_PROGRESS
- .blocking_write()
- .as_mut()
- .unwrap()
- .finished_items += 1;
+ reporting::inc_finished();
reporting::set_task("idle".to_owned());
});
- IMPORT_PROGRESS
- .blocking_write()
- .as_mut()
- .unwrap()
- .finished_items = 0;
-
- while !nodes.get_mut().unwrap().is_empty() {
+ for n in 1.. {
+ let size = nodes.get_mut().unwrap().len();
+ if size == 0 {
+ break;
+ }
+ reporting::set_stage(format!("Processing nodes (pass #{n})"), size);
info!("process iter (size={})", nodes.get_mut().unwrap().len());
let mut cur_nodes = HashSet::new();
swap(nodes.get_mut().unwrap(), &mut cur_nodes);
cur_nodes.into_par_iter().for_each(|node| {
reporting::set_task(format!("unknown: {node}"));
process_node(&ic, &rt, &plugins, &nodes, node);
- IMPORT_PROGRESS
- .blocking_write()
- .as_mut()
- .unwrap()
- .finished_items += 1;
+ reporting::inc_finished();
reporting::set_task("idle".to_owned());
});
}
@@ -271,11 +265,7 @@ fn import_traverse(
}
}
- IMPORT_PROGRESS
- .blocking_write()
- .as_mut()
- .unwrap()
- .total_items += 1;
+ reporting::inc_total();
out.lock().unwrap().push((path.to_owned(), parent, iflags));
}
Ok(())
diff --git a/import/src/reporting.rs b/import/src/reporting.rs
index 61bd623..631be65 100644
--- a/import/src/reporting.rs
+++ b/import/src/reporting.rs
@@ -15,6 +15,7 @@ pub static IMPORT_PROGRESS: RwLock<Option<ImportProgress>> = RwLock::const_new(N
#[derive(Debug, Serialize, Clone)]
pub struct ImportProgress {
+ pub stage: String,
pub total_items: usize,
pub finished_items: usize,
pub tasks: Vec<String>,
@@ -23,6 +24,7 @@ pub struct ImportProgress {
pub(crate) fn start_import() {
*IMPORT_ERRORS.blocking_write() = Vec::new();
*IMPORT_PROGRESS.blocking_write() = Some(ImportProgress {
+ stage: "Initializing".to_string(),
total_items: 0,
finished_items: 0,
tasks: vec!["idle".to_string(); current_num_threads()],
@@ -32,6 +34,24 @@ pub(crate) fn end_import() {
*IMPORT_PROGRESS.blocking_write() = None;
}
+pub(crate) fn set_stage(name: String, total: usize) {
+ if let Some(p) = IMPORT_PROGRESS.blocking_write().as_mut() {
+ p.stage = name;
+ p.total_items = total;
+ p.finished_items = 0;
+ }
+}
+pub(crate) fn inc_finished() {
+ if let Some(p) = IMPORT_PROGRESS.blocking_write().as_mut() {
+ p.finished_items += 1;
+ }
+}
+pub(crate) fn inc_total() {
+ if let Some(p) = IMPORT_PROGRESS.blocking_write().as_mut() {
+ p.total_items += 1;
+ }
+}
+
pub(crate) fn set_task(task: String) {
let thread = current_thread_index().unwrap_or(0);
debug!("T#{thread:<3}| {task}");