aboutsummaryrefslogtreecommitdiff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--import/src/lib.rs105
1 files changed, 64 insertions, 41 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index ad929fa..2d8d987 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -14,7 +14,7 @@ use crate::{
};
use anyhow::{Context, Result, anyhow};
use jellycache::{HashKey, cache_memory, cache_store};
-use jellycommon::jellyobject::{self, Object, ObjectBuffer, Tag, TypedTag};
+use jellycommon::jellyobject::{self, ObjectBuffer, Tag, TypedTag};
use jellydb::{
backends::Database,
query::{Filter, Query, Sort},
@@ -33,12 +33,14 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fs::{File, read_to_string},
+ hash::{DefaultHasher, Hash},
marker::PhantomData,
mem::swap,
path::{Path, PathBuf},
sync::{Arc, LazyLock, Mutex},
time::UNIX_EPOCH,
};
+use std::{fmt::Display, hash::Hasher};
use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking};
#[rustfmt::skip]
@@ -81,13 +83,20 @@ pub fn is_importing() -> bool {
IMPORT_SEM.available_permits() == 0
}
+#[derive(Debug, Clone)]
pub struct NodeID(pub String);
+impl Display for NodeID {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(&self.0)
+ }
+}
const NODE_ID: TypedTag<&str> = TypedTag(Tag(0x8123), PhantomData);
+#[derive(Clone)]
struct DatabaseTables {
db: Arc<dyn Database>,
- nodes: Table,
+ nodes: Arc<Table>,
}
fn node_id_query(node: &NodeID) -> Query {
@@ -101,10 +110,23 @@ fn node_id_query(node: &NodeID) -> Query {
}
impl DatabaseTables {
- pub fn update_node_init(
+ pub fn update_node(
+ &self,
+ node: RowNum,
+ mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer,
+ ) -> Result<()> {
+ self.db.write_transaction(&mut |txn| {
+ let ob_before = self.nodes.get(txn, node)?.unwrap();
+ let ob_after = update(ob_before);
+ self.nodes.update(txn, node, ob_after)?;
+ Ok(())
+ })?;
+ Ok(())
+ }
+ pub fn update_node_by_nodeid(
&self,
node: NodeID,
- mut update: impl FnMut(Object) -> Option<ObjectBuffer>,
+ mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer,
) -> Result<()> {
self.db.write_transaction(&mut |txn| {
let node = match self.nodes.query_single(txn, node_id_query(&node)) {
@@ -113,9 +135,17 @@ impl DatabaseTables {
.nodes
.insert(txn, ObjectBuffer::new(&mut [(NODE_ID.0, &node.0.as_str())]))?,
};
- let ob = self.nodes.get(txn, node)?.unwrap();
- if let Some(changed) = update(ob.as_object()) {
- self.nodes.update(txn, node, changed)?;
+
+ let ob_before = self.nodes.get(txn, node)?.unwrap();
+ let mut hash_before = DefaultHasher::new();
+ ob_before.hash(&mut hash_before);
+
+ let ob_after = update(ob_before);
+
+ let mut hash_after = DefaultHasher::new();
+ ob_after.hash(&mut hash_after);
+ if hash_before.finish() != hash_after.finish() {
+ self.nodes.update(txn, node, ob_after)?;
}
Ok(())
})?;
@@ -145,14 +175,14 @@ pub async fn import_wrap(db: DatabaseTables, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
+fn import(dba: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
let plugins = init_plugins(&CONF.api);
let files = Mutex::new(Vec::new());
import_traverse(
&CONF.media_path,
- db,
+ &dba,
incremental,
- NodeID::MIN,
+ None,
InheritedFlags::default(),
&files,
)?;
@@ -162,7 +192,7 @@ fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
files.into_par_iter().for_each(|(path, parent, iflags)| {
reporting::set_task(format!("unknown: {path:?}"));
- import_file(db, &rt, &nodes, &plugins, &path, parent, iflags);
+ import_file(&dba, &rt, &nodes, &plugins, &path, parent, iflags);
IMPORT_PROGRESS
.blocking_write()
.as_mut()
@@ -183,7 +213,7 @@ fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
swap(nodes.get_mut().unwrap(), &mut cur_nodes);
cur_nodes.into_par_iter().for_each(|node| {
reporting::set_task(format!("unknown: {node}"));
- process_node(db, &rt, &plugins, &nodes, node);
+ process_node(&dba, &rt, &plugins, &nodes, node);
IMPORT_PROGRESS
.blocking_write()
.as_mut()
@@ -196,27 +226,20 @@ fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
Ok(())
}
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
pub struct InheritedFlags {
- visibility: Visibility,
+ hidden: bool,
+ reduced: bool,
use_acoustid: bool,
}
-impl Default for InheritedFlags {
- fn default() -> Self {
- Self {
- visibility: Visibility::Visible,
- use_acoustid: false,
- }
- }
-}
fn import_traverse(
path: &Path,
- db: DatabaseTables,
+ dba: &DatabaseTables,
incremental: bool,
- parent: Option<RowNum>,
+ parent: Option<NodeID>,
mut iflags: InheritedFlags,
- out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>,
+ out: &Mutex<Vec<(PathBuf, NodeID, InheritedFlags)>>,
) -> Result<()> {
if path.is_dir() {
reporting::set_task(format!("indexing {path:?}"));
@@ -228,15 +251,15 @@ fn import_traverse(
if let Ok(content) = read_to_string(path.join("flags")) {
for flag in content.lines() {
match flag.trim() {
- "hidden" => iflags.visibility = iflags.visibility.min(Visibility::Hidden),
- "reduced" => iflags.visibility = iflags.visibility.min(Visibility::Reduced),
+ "hidden" => iflags.hidden = true,
+ "reduced" => iflags.reduced = true,
"use_acoustid" => iflags.use_acoustid = true,
_ => (),
}
}
}
- db.update_node_init(node, |n| {
+ dba.update_node_by_nodeid(node, |n| {
if parent != NodeID::MIN {
n.parents.insert(parent);
}
@@ -247,7 +270,7 @@ fn import_traverse(
path.read_dir()?.par_bridge().try_for_each(|e| {
let path = e?.path();
reporting::catch(
- import_traverse(&path, db, incremental, node, iflags, out)
+ import_traverse(&path, dba, incremental, node, iflags, out)
.context(anyhow!("index {:?}", path.file_name().unwrap())),
);
anyhow::Ok(())
@@ -260,7 +283,7 @@ fn import_traverse(
let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
if incremental {
- if let Some(last_mtime) = db.get_import_file_mtime(path)? {
+ if let Some(last_mtime) = dba.get_import_file_mtime(path)? {
if last_mtime >= mtime {
return Ok(());
}
@@ -278,9 +301,9 @@ fn import_traverse(
}
fn import_file(
- db: &Database,
+ dba: &DatabaseTables,
rt: &Handle,
- nodes: &Mutex<HashSet<NodeID>>,
+ pending_nodes: &Mutex<HashSet<NodeID>>,
plugins: &[Box<dyn ImportPlugin>],
path: &Path,
parent: NodeID,
@@ -288,10 +311,10 @@ fn import_file(
) {
let mut all_ok = true;
let ct = ImportContext {
- db,
+ dba,
rt,
iflags,
- pending_nodes: nodes,
+ pending_nodes,
};
let filename = path.file_name().unwrap().to_string_lossy();
if filename == "flags" {
@@ -300,7 +323,7 @@ fn import_file(
else {
return;
};
- nodes.lock().unwrap().insert(parent);
+ pending_nodes.lock().unwrap().insert(parent);
for line in content.lines() {
for p in plugins {
let inf = p.info();
@@ -320,7 +343,7 @@ fn import_file(
let slug = get_node_slug(path).unwrap();
let node = NodeID::from_slug(&slug);
- nodes.lock().unwrap().insert(node);
+ pending_nodes.lock().unwrap().insert(node);
all_ok &= reporting::catch(db.update_node_init(node, |node| {
node.slug = slug;
if parent != NodeID::MIN {
@@ -390,14 +413,14 @@ fn import_file(
}
fn process_node(
- dba: DatabaseTables,
+ dba: &DatabaseTables,
rt: &Handle,
plugins: &[Box<dyn ImportPlugin>],
- nodes: &Mutex<HashSet<NodeID>>,
+ pending_nodes: &Mutex<HashSet<NodeID>>,
node: NodeID,
) {
let Some(data) = reporting::catch(
- db.get_node(node)
+ dba.get_node(node)
.and_then(|e| e.ok_or(anyhow!("node missing"))),
) else {
return;
@@ -408,7 +431,7 @@ fn process_node(
if inf.handle_process {
reporting::set_task(format!("{}(proc): {slug}", inf.name));
let Some(data) = reporting::catch(
- db.get_node(node)
+ dba.get_node(node)
.and_then(|e| e.ok_or(anyhow!("node missing"))),
) else {
return;
@@ -419,7 +442,7 @@ fn process_node(
dba,
rt,
iflags: InheritedFlags::default(),
- pending_nodes: nodes,
+ pending_nodes,
},
node,
&data,