diff options
Diffstat (limited to 'import/src/lib.rs')
| -rw-r--r-- | import/src/lib.rs | 105 |
1 file changed, 64 insertions, 41 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index ad929fa..2d8d987 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -14,7 +14,7 @@ use crate::{ }; use anyhow::{Context, Result, anyhow}; use jellycache::{HashKey, cache_memory, cache_store}; -use jellycommon::jellyobject::{self, Object, ObjectBuffer, Tag, TypedTag}; +use jellycommon::jellyobject::{self, ObjectBuffer, Tag, TypedTag}; use jellydb::{ backends::Database, query::{Filter, Query, Sort}, @@ -33,12 +33,14 @@ use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fs::{File, read_to_string}, + hash::{DefaultHasher, Hash}, marker::PhantomData, mem::swap, path::{Path, PathBuf}, sync::{Arc, LazyLock, Mutex}, time::UNIX_EPOCH, }; +use std::{fmt::Display, hash::Hasher}; use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking}; #[rustfmt::skip] @@ -81,13 +83,20 @@ pub fn is_importing() -> bool { IMPORT_SEM.available_permits() == 0 } +#[derive(Debug, Clone)] pub struct NodeID(pub String); +impl Display for NodeID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} const NODE_ID: TypedTag<&str> = TypedTag(Tag(0x8123), PhantomData); +#[derive(Clone)] struct DatabaseTables { db: Arc<dyn Database>, - nodes: Table, + nodes: Arc<Table>, } fn node_id_query(node: &NodeID) -> Query { @@ -101,10 +110,23 @@ fn node_id_query(node: &NodeID) -> Query { } impl DatabaseTables { - pub fn update_node_init( + pub fn update_node( + &self, + node: RowNum, + mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer, + ) -> Result<()> { + self.db.write_transaction(&mut |txn| { + let ob_before = self.nodes.get(txn, node)?.unwrap(); + let ob_after = update(ob_before); + self.nodes.update(txn, node, ob_after)?; + Ok(()) + })?; + Ok(()) + } + pub fn update_node_by_nodeid( &self, node: NodeID, - mut update: impl FnMut(Object) -> Option<ObjectBuffer>, + mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer, ) -> Result<()> { self.db.write_transaction(&mut |txn| 
{ let node = match self.nodes.query_single(txn, node_id_query(&node)) { @@ -113,9 +135,17 @@ impl DatabaseTables { .nodes .insert(txn, ObjectBuffer::new(&mut [(NODE_ID.0, &node.0.as_str())]))?, }; - let ob = self.nodes.get(txn, node)?.unwrap(); - if let Some(changed) = update(ob.as_object()) { - self.nodes.update(txn, node, changed)?; + + let ob_before = self.nodes.get(txn, node)?.unwrap(); + let mut hash_before = DefaultHasher::new(); + ob_before.hash(&mut hash_before); + + let ob_after = update(ob_before); + + let mut hash_after = DefaultHasher::new(); + ob_after.hash(&mut hash_after); + if hash_before.finish() != hash_after.finish() { + self.nodes.update(txn, node, ob_after)?; } Ok(()) })?; @@ -145,14 +175,14 @@ pub async fn import_wrap(db: DatabaseTables, incremental: bool) -> Result<()> { Ok(()) } -fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> { +fn import(dba: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> { let plugins = init_plugins(&CONF.api); let files = Mutex::new(Vec::new()); import_traverse( &CONF.media_path, - db, + &dba, incremental, - NodeID::MIN, + None, InheritedFlags::default(), &files, )?; @@ -162,7 +192,7 @@ fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> { files.into_par_iter().for_each(|(path, parent, iflags)| { reporting::set_task(format!("unknown: {path:?}")); - import_file(db, &rt, &nodes, &plugins, &path, parent, iflags); + import_file(&dba, &rt, &nodes, &plugins, &path, parent, iflags); IMPORT_PROGRESS .blocking_write() .as_mut() @@ -183,7 +213,7 @@ fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> { swap(nodes.get_mut().unwrap(), &mut cur_nodes); cur_nodes.into_par_iter().for_each(|node| { reporting::set_task(format!("unknown: {node}")); - process_node(db, &rt, &plugins, &nodes, node); + process_node(&dba, &rt, &plugins, &nodes, node); IMPORT_PROGRESS .blocking_write() .as_mut() @@ -196,27 +226,20 @@ fn import(db: DatabaseTables, rt: 
&Handle, incremental: bool) -> Result<()> { Ok(()) } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub struct InheritedFlags { - visibility: Visibility, + hidden: bool, + reduced: bool, use_acoustid: bool, } -impl Default for InheritedFlags { - fn default() -> Self { - Self { - visibility: Visibility::Visible, - use_acoustid: false, - } - } -} fn import_traverse( path: &Path, - db: DatabaseTables, + dba: &DatabaseTables, incremental: bool, - parent: Option<RowNum>, + parent: Option<NodeID>, mut iflags: InheritedFlags, - out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>, + out: &Mutex<Vec<(PathBuf, NodeID, InheritedFlags)>>, ) -> Result<()> { if path.is_dir() { reporting::set_task(format!("indexing {path:?}")); @@ -228,15 +251,15 @@ fn import_traverse( if let Ok(content) = read_to_string(path.join("flags")) { for flag in content.lines() { match flag.trim() { - "hidden" => iflags.visibility = iflags.visibility.min(Visibility::Hidden), - "reduced" => iflags.visibility = iflags.visibility.min(Visibility::Reduced), + "hidden" => iflags.hidden = true, + "reduced" => iflags.reduced = true, "use_acoustid" => iflags.use_acoustid = true, _ => (), } } } - db.update_node_init(node, |n| { + dba.update_node_by_nodeid(node, |n| { if parent != NodeID::MIN { n.parents.insert(parent); } @@ -247,7 +270,7 @@ fn import_traverse( path.read_dir()?.par_bridge().try_for_each(|e| { let path = e?.path(); reporting::catch( - import_traverse(&path, db, incremental, node, iflags, out) + import_traverse(&path, dba, incremental, node, iflags, out) .context(anyhow!("index {:?}", path.file_name().unwrap())), ); anyhow::Ok(()) @@ -260,7 +283,7 @@ fn import_traverse( let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); if incremental { - if let Some(last_mtime) = db.get_import_file_mtime(path)? { + if let Some(last_mtime) = dba.get_import_file_mtime(path)? 
{ if last_mtime >= mtime { return Ok(()); } @@ -278,9 +301,9 @@ fn import_traverse( } fn import_file( - db: &Database, + dba: &DatabaseTables, rt: &Handle, - nodes: &Mutex<HashSet<NodeID>>, + pending_nodes: &Mutex<HashSet<NodeID>>, plugins: &[Box<dyn ImportPlugin>], path: &Path, parent: NodeID, @@ -288,10 +311,10 @@ fn import_file( ) { let mut all_ok = true; let ct = ImportContext { - db, + dba, rt, iflags, - pending_nodes: nodes, + pending_nodes, }; let filename = path.file_name().unwrap().to_string_lossy(); if filename == "flags" { @@ -300,7 +323,7 @@ fn import_file( else { return; }; - nodes.lock().unwrap().insert(parent); + pending_nodes.lock().unwrap().insert(parent); for line in content.lines() { for p in plugins { let inf = p.info(); @@ -320,7 +343,7 @@ fn import_file( let slug = get_node_slug(path).unwrap(); let node = NodeID::from_slug(&slug); - nodes.lock().unwrap().insert(node); + pending_nodes.lock().unwrap().insert(node); all_ok &= reporting::catch(db.update_node_init(node, |node| { node.slug = slug; if parent != NodeID::MIN { @@ -390,14 +413,14 @@ fn import_file( } fn process_node( - dba: DatabaseTables, + dba: &DatabaseTables, rt: &Handle, plugins: &[Box<dyn ImportPlugin>], - nodes: &Mutex<HashSet<NodeID>>, + pending_nodes: &Mutex<HashSet<NodeID>>, node: NodeID, ) { let Some(data) = reporting::catch( - db.get_node(node) + dba.get_node(node) .and_then(|e| e.ok_or(anyhow!("node missing"))), ) else { return; @@ -408,7 +431,7 @@ fn process_node( if inf.handle_process { reporting::set_task(format!("{}(proc): {slug}", inf.name)); let Some(data) = reporting::catch( - db.get_node(node) + dba.get_node(node) .and_then(|e| e.ok_or(anyhow!("node missing"))), ) else { return; @@ -419,7 +442,7 @@ fn process_node( dba, rt, iflags: InheritedFlags::default(), - pending_nodes: nodes, + pending_nodes, }, node, &data, |