diff options
Diffstat (limited to 'import/src/lib.rs')
| -rw-r--r-- | import/src/lib.rs | 218 |
1 files changed, 131 insertions, 87 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 2d8d987..56da625 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -14,7 +14,10 @@ use crate::{ }; use anyhow::{Context, Result, anyhow}; use jellycache::{HashKey, cache_memory, cache_store}; -use jellycommon::jellyobject::{self, ObjectBuffer, Tag, TypedTag}; +use jellycommon::{ + jellyobject::{self, ObjectBuffer, Path as TagPath, fields}, + *, +}; use jellydb::{ backends::Database, query::{Filter, Query, Sort}, @@ -33,14 +36,11 @@ use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fs::{File, read_to_string}, - hash::{DefaultHasher, Hash}, - marker::PhantomData, mem::swap, path::{Path, PathBuf}, sync::{Arc, LazyLock, Mutex}, time::UNIX_EPOCH, }; -use std::{fmt::Display, hash::Hasher}; use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking}; #[rustfmt::skip] @@ -83,28 +83,21 @@ pub fn is_importing() -> bool { IMPORT_SEM.available_permits() == 0 } -#[derive(Debug, Clone)] -pub struct NodeID(pub String); -impl Display for NodeID { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } +#[derive(Clone)] +pub struct DatabaseTables { + pub db: Arc<dyn Database>, + pub nodes: Arc<Table>, + pub import_meta: Arc<Table>, } -const NODE_ID: TypedTag<&str> = TypedTag(Tag(0x8123), PhantomData); - -#[derive(Clone)] -struct DatabaseTables { - db: Arc<dyn Database>, - nodes: Arc<Table>, +fields! { + IM_PATH: &str = 0x11001 "path"; + IM_MTIME: u64 = 0x11002 "mtime"; } -fn node_id_query(node: &NodeID) -> Query { +fn node_slug_query(slug: &str) -> Query { Query { - filter: Filter::Match( - jellyobject::Path(vec![NODE_ID.0]), - node.0.as_bytes().to_vec(), - ), + filter: Filter::Match(jellyobject::Path(vec![NO_SLUG.0]), slug.as_bytes().to_vec()), sort: Sort::None, } } @@ -123,33 +116,35 @@ impl DatabaseTables { })?; Ok(()) } - pub fn update_node_by_nodeid( + pub fn update_node_slug( &self, - node: NodeID, + slug: &str, mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer, - ) -> Result<()> { + ) -> Result<RowNum> { + let mut row = 0; self.db.write_transaction(&mut |txn| { - let node = match self.nodes.query_single(txn, node_id_query(&node)) { + row = match self.nodes.query_single(txn, node_slug_query(slug)) { Some(r) => r, None => self .nodes - .insert(txn, ObjectBuffer::new(&mut [(NODE_ID.0, &node.0.as_str())]))?, + .insert(txn, ObjectBuffer::new(&mut [(NO_SLUG.0, &slug)]))?, }; - let ob_before = self.nodes.get(txn, node)?.unwrap(); - let mut hash_before = DefaultHasher::new(); - ob_before.hash(&mut hash_before); - - let ob_after = update(ob_before); - - let mut hash_after = DefaultHasher::new(); - ob_after.hash(&mut hash_after); - if hash_before.finish() != hash_after.finish() { - self.nodes.update(txn, node, ob_after)?; - } + let node = self.nodes.get(txn, row)?.unwrap(); + let node = update(node); + let node = node.as_object().insert(NO_SLUG, slug); + self.nodes.update(txn, row, node)?; Ok(()) })?; - Ok(()) + Ok(row) + } + pub fn get_node(&self, node: RowNum) -> Result<Option<ObjectBuffer>> { + let mut buf = None; + self.db.read_transaction(&mut |txn| { + buf = self.nodes.get(txn, node)?; + Ok(()) + })?; + Ok(buf) } } @@ -237,15 +232,14 @@ fn import_traverse( path: &Path, dba: &DatabaseTables, incremental: bool, - parent: Option<NodeID>, + parent: Option<RowNum>, mut iflags: InheritedFlags, - out: &Mutex<Vec<(PathBuf, NodeID, InheritedFlags)>>, + out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>, ) -> Result<()> { if path.is_dir() { reporting::set_task(format!("indexing {path:?}")); let slug = get_node_slug(path).unwrap(); - let node = NodeID::from_slug(&slug); // Some flags need to applied immediatly because they are inherited if let Ok(content) = read_to_string(path.join("flags")) { @@ -259,18 +253,23 @@ fn import_traverse( } } - dba.update_node_by_nodeid(node, |n| { - if parent != NodeID::MIN { - n.parents.insert(parent); + let row = dba.update_node_slug(&slug, |mut node| { + if let Some(parent) = parent { + node = node.as_object().extend(NO_PARENT, [parent]); } - n.slug = slug; - n.visibility = iflags.visibility; + if iflags.hidden { + node = node.as_object().insert(NO_VISIBILITY, VISI_HIDDEN); + } + if iflags.reduced { + node = node.as_object().insert(NO_VISIBILITY, VISI_REDUCED); + } + node })?; path.read_dir()?.par_bridge().try_for_each(|e| { let path = e?.path(); reporting::catch( - import_traverse(&path, dba, incremental, node, iflags, out) + import_traverse(&path, dba, incremental, Some(row), iflags, out) .context(anyhow!("index {:?}", path.file_name().unwrap())), ); anyhow::Ok(()) @@ -278,15 +277,12 @@ fn import_traverse( return Ok(()); } - if path.is_file() { - let meta = path.metadata()?; - let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); - + if path.is_file() + && let Some(parent) = parent + { if incremental { - if let Some(last_mtime) = dba.get_import_file_mtime(path)? { - if last_mtime >= mtime { - return Ok(()); - } + if compare_mtime(dba, path)? { + return Ok(()); } } @@ -303,10 +299,10 @@ fn import_traverse( fn import_file( dba: &DatabaseTables, rt: &Handle, - pending_nodes: &Mutex<HashSet<NodeID>>, + pending_nodes: &Mutex<HashSet<RowNum>>, plugins: &[Box<dyn ImportPlugin>], path: &Path, - parent: NodeID, + parent: RowNum, iflags: InheritedFlags, ) { let mut all_ok = true; @@ -341,17 +337,20 @@ fn import_file( if filename.ends_with("mkv") || filename.ends_with("mka") || filename.ends_with("mks") { let slug = get_node_slug(path).unwrap(); - let node = NodeID::from_slug(&slug); - pending_nodes.lock().unwrap().insert(node); - all_ok &= reporting::catch(db.update_node_init(node, |node| { - node.slug = slug; - if parent != NodeID::MIN { - node.parents.insert(parent); + let Some(row) = reporting::catch(dba.update_node_slug(&slug, |mut node| { + node = node.as_object().extend(NO_PARENT, [parent]); + if iflags.hidden { + node = node.as_object().insert(NO_VISIBILITY, VISI_HIDDEN); } - node.visibility = iflags.visibility; - })) - .is_some(); + if iflags.reduced { + node = node.as_object().insert(NO_VISIBILITY, VISI_REDUCED); + } + node + })) else { + return; + }; + pending_nodes.lock().unwrap().insert(row); let flags = filename .split(".") @@ -368,7 +367,7 @@ fn import_file( if inf.handle_instruction { reporting::set_task(format!("{}(inst): {path:?}", inf.name)); all_ok &= reporting::catch( - p.instruction(&ct, node, line) + p.instruction(&ct, row, line) .context(anyhow!("{}(inst) {path:?}", inf.name)), ) .is_some(); @@ -387,7 +386,7 @@ fn import_file( if inf.handle_media { reporting::set_task(format!("{}(media): {path:?}", inf.name)); all_ok &= reporting::catch( - p.media(&ct, node, path, &seg) + p.media(&ct, row, path, &seg) .context(anyhow!("{}(media) {path:?}", inf.name)), ) .is_some(); @@ -408,7 +407,7 @@ fn import_file( } if all_ok { - reporting::catch(update_mtime(db, path).context("updating mtime")); + reporting::catch(update_mtime(dba, path).context("updating mtime")); } } @@ -416,26 +415,22 @@ fn process_node( dba: &DatabaseTables, rt: &Handle, plugins: &[Box<dyn ImportPlugin>], - pending_nodes: &Mutex<HashSet<NodeID>>, - node: NodeID, + pending_nodes: &Mutex<HashSet<RowNum>>, + node: RowNum, ) { - let Some(data) = reporting::catch( - dba.get_node(node) - .and_then(|e| e.ok_or(anyhow!("node missing"))), - ) else { - return; - }; - let slug = &data.slug; + let mut slug = String::new(); + reporting::catch(dba.db.read_transaction(&mut |txn| { + let no = dba.nodes.get(txn, node)?.unwrap(); + if let Some(s) = no.as_object().get(NO_SLUG) { + slug = s.to_owned(); + } + Ok(()) + })); for p in plugins { let inf = p.info(); if inf.handle_process { reporting::set_task(format!("{}(proc): {slug}", inf.name)); - let Some(data) = reporting::catch( - dba.get_node(node) - .and_then(|e| e.ok_or(anyhow!("node missing"))), - ) else { - return; - }; + reporting::catch( p.process( &ImportContext { @@ -445,7 +440,6 @@ fn process_node( pending_nodes, }, node, - &data, ) .context(anyhow!("{}(proc) {slug}", inf.name)), ); @@ -453,10 +447,60 @@ fn process_node( } } -fn update_mtime(db: DatabaseTables, path: &Path) -> Result<()> { +fn compare_mtime(dba: &DatabaseTables, path: &Path) -> Result<bool> { let meta = path.metadata()?; let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); - db.set_import_file_mtime(path, mtime)?; + let mut was_changed = false; + dba.db.read_transaction(&mut |txn| { + match dba.import_meta.query_single( + txn, + Query { + filter: Filter::Match( + TagPath(vec![IM_PATH.0]), + path.as_os_str().as_encoded_bytes().to_vec(), + ), + sort: Sort::None, + }, + ) { + None => was_changed = true, + Some(row) => { + let meta = dba.import_meta.get(txn, row)?.unwrap(); + let prev_mtime = meta.as_object().get(IM_MTIME).unwrap_or_default(); + was_changed = mtime > prev_mtime + } + } + Ok(()) + })?; + Ok(was_changed) +} + +fn update_mtime(dba: &DatabaseTables, path: &Path) -> Result<()> { + let meta = path.metadata()?; + let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); + dba.db.write_transaction(&mut |txn| { + let row = match dba.import_meta.query_single( + txn, + Query { + filter: Filter::Match( + TagPath(vec![IM_PATH.0]), + path.as_os_str().as_encoded_bytes().to_vec(), + ), + sort: Sort::None, + }, + ) { + Some(row) => row, + None => dba.import_meta.insert( + txn, + ObjectBuffer::new(&mut [(IM_PATH.0, &path.as_os_str().as_encoded_bytes())]), + )?, + }; + + let mut ob = dba.import_meta.get(txn, row)?.unwrap(); + ob = ob.as_object().insert(IM_MTIME, mtime); + dba.import_meta.update(txn, row, ob)?; + + Ok(()) + })?; Ok(()) } |