aboutsummaryrefslogtreecommitdiff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--import/src/lib.rs218
1 files changed, 131 insertions, 87 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 2d8d987..56da625 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -14,7 +14,10 @@ use crate::{
};
use anyhow::{Context, Result, anyhow};
use jellycache::{HashKey, cache_memory, cache_store};
-use jellycommon::jellyobject::{self, ObjectBuffer, Tag, TypedTag};
+use jellycommon::{
+ jellyobject::{self, ObjectBuffer, Path as TagPath, fields},
+ *,
+};
use jellydb::{
backends::Database,
query::{Filter, Query, Sort},
@@ -33,14 +36,11 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fs::{File, read_to_string},
- hash::{DefaultHasher, Hash},
- marker::PhantomData,
mem::swap,
path::{Path, PathBuf},
sync::{Arc, LazyLock, Mutex},
time::UNIX_EPOCH,
};
-use std::{fmt::Display, hash::Hasher};
use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking};
#[rustfmt::skip]
@@ -83,28 +83,21 @@ pub fn is_importing() -> bool {
IMPORT_SEM.available_permits() == 0
}
-#[derive(Debug, Clone)]
-pub struct NodeID(pub String);
-impl Display for NodeID {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str(&self.0)
- }
+#[derive(Clone)]
+pub struct DatabaseTables {
+ pub db: Arc<dyn Database>,
+ pub nodes: Arc<Table>,
+ pub import_meta: Arc<Table>,
}
-const NODE_ID: TypedTag<&str> = TypedTag(Tag(0x8123), PhantomData);
-
-#[derive(Clone)]
-struct DatabaseTables {
- db: Arc<dyn Database>,
- nodes: Arc<Table>,
+fields! {
+ IM_PATH: &str = 0x11001 "path";
+ IM_MTIME: u64 = 0x11002 "mtime";
}
-fn node_id_query(node: &NodeID) -> Query {
+fn node_slug_query(slug: &str) -> Query {
Query {
- filter: Filter::Match(
- jellyobject::Path(vec![NODE_ID.0]),
- node.0.as_bytes().to_vec(),
- ),
+ filter: Filter::Match(jellyobject::Path(vec![NO_SLUG.0]), slug.as_bytes().to_vec()),
sort: Sort::None,
}
}
@@ -123,33 +116,35 @@ impl DatabaseTables {
})?;
Ok(())
}
- pub fn update_node_by_nodeid(
+ pub fn update_node_slug(
&self,
- node: NodeID,
+ slug: &str,
mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer,
- ) -> Result<()> {
+ ) -> Result<RowNum> {
+ let mut row = 0;
self.db.write_transaction(&mut |txn| {
- let node = match self.nodes.query_single(txn, node_id_query(&node)) {
+ row = match self.nodes.query_single(txn, node_slug_query(slug)) {
Some(r) => r,
None => self
.nodes
- .insert(txn, ObjectBuffer::new(&mut [(NODE_ID.0, &node.0.as_str())]))?,
+ .insert(txn, ObjectBuffer::new(&mut [(NO_SLUG.0, &slug)]))?,
};
- let ob_before = self.nodes.get(txn, node)?.unwrap();
- let mut hash_before = DefaultHasher::new();
- ob_before.hash(&mut hash_before);
-
- let ob_after = update(ob_before);
-
- let mut hash_after = DefaultHasher::new();
- ob_after.hash(&mut hash_after);
- if hash_before.finish() != hash_after.finish() {
- self.nodes.update(txn, node, ob_after)?;
- }
+ let node = self.nodes.get(txn, row)?.unwrap();
+ let node = update(node);
+ let node = node.as_object().insert(NO_SLUG, slug);
+ self.nodes.update(txn, row, node)?;
Ok(())
})?;
- Ok(())
+ Ok(row)
+ }
+ pub fn get_node(&self, node: RowNum) -> Result<Option<ObjectBuffer>> {
+ let mut buf = None;
+ self.db.read_transaction(&mut |txn| {
+ buf = self.nodes.get(txn, node)?;
+ Ok(())
+ })?;
+ Ok(buf)
}
}
@@ -237,15 +232,14 @@ fn import_traverse(
path: &Path,
dba: &DatabaseTables,
incremental: bool,
- parent: Option<NodeID>,
+ parent: Option<RowNum>,
mut iflags: InheritedFlags,
- out: &Mutex<Vec<(PathBuf, NodeID, InheritedFlags)>>,
+ out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>,
) -> Result<()> {
if path.is_dir() {
reporting::set_task(format!("indexing {path:?}"));
let slug = get_node_slug(path).unwrap();
- let node = NodeID::from_slug(&slug);
// Some flags need to be applied immediately because they are inherited
if let Ok(content) = read_to_string(path.join("flags")) {
@@ -259,18 +253,23 @@ fn import_traverse(
}
}
- dba.update_node_by_nodeid(node, |n| {
- if parent != NodeID::MIN {
- n.parents.insert(parent);
+ let row = dba.update_node_slug(&slug, |mut node| {
+ if let Some(parent) = parent {
+ node = node.as_object().extend(NO_PARENT, [parent]);
}
- n.slug = slug;
- n.visibility = iflags.visibility;
+ if iflags.hidden {
+ node = node.as_object().insert(NO_VISIBILITY, VISI_HIDDEN);
+ }
+ if iflags.reduced {
+ node = node.as_object().insert(NO_VISIBILITY, VISI_REDUCED);
+ }
+ node
})?;
path.read_dir()?.par_bridge().try_for_each(|e| {
let path = e?.path();
reporting::catch(
- import_traverse(&path, dba, incremental, node, iflags, out)
+ import_traverse(&path, dba, incremental, Some(row), iflags, out)
.context(anyhow!("index {:?}", path.file_name().unwrap())),
);
anyhow::Ok(())
@@ -278,15 +277,12 @@ fn import_traverse(
return Ok(());
}
- if path.is_file() {
- let meta = path.metadata()?;
- let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
-
+ if path.is_file()
+ && let Some(parent) = parent
+ {
if incremental {
- if let Some(last_mtime) = dba.get_import_file_mtime(path)? {
- if last_mtime >= mtime {
- return Ok(());
- }
+ if compare_mtime(dba, path)? {
+ return Ok(());
}
}
@@ -303,10 +299,10 @@ fn import_traverse(
fn import_file(
dba: &DatabaseTables,
rt: &Handle,
- pending_nodes: &Mutex<HashSet<NodeID>>,
+ pending_nodes: &Mutex<HashSet<RowNum>>,
plugins: &[Box<dyn ImportPlugin>],
path: &Path,
- parent: NodeID,
+ parent: RowNum,
iflags: InheritedFlags,
) {
let mut all_ok = true;
@@ -341,17 +337,20 @@ fn import_file(
if filename.ends_with("mkv") || filename.ends_with("mka") || filename.ends_with("mks") {
let slug = get_node_slug(path).unwrap();
- let node = NodeID::from_slug(&slug);
- pending_nodes.lock().unwrap().insert(node);
- all_ok &= reporting::catch(db.update_node_init(node, |node| {
- node.slug = slug;
- if parent != NodeID::MIN {
- node.parents.insert(parent);
+ let Some(row) = reporting::catch(dba.update_node_slug(&slug, |mut node| {
+ node = node.as_object().extend(NO_PARENT, [parent]);
+ if iflags.hidden {
+ node = node.as_object().insert(NO_VISIBILITY, VISI_HIDDEN);
}
- node.visibility = iflags.visibility;
- }))
- .is_some();
+ if iflags.reduced {
+ node = node.as_object().insert(NO_VISIBILITY, VISI_REDUCED);
+ }
+ node
+ })) else {
+ return;
+ };
+ pending_nodes.lock().unwrap().insert(row);
let flags = filename
.split(".")
@@ -368,7 +367,7 @@ fn import_file(
if inf.handle_instruction {
reporting::set_task(format!("{}(inst): {path:?}", inf.name));
all_ok &= reporting::catch(
- p.instruction(&ct, node, line)
+ p.instruction(&ct, row, line)
.context(anyhow!("{}(inst) {path:?}", inf.name)),
)
.is_some();
@@ -387,7 +386,7 @@ fn import_file(
if inf.handle_media {
reporting::set_task(format!("{}(media): {path:?}", inf.name));
all_ok &= reporting::catch(
- p.media(&ct, node, path, &seg)
+ p.media(&ct, row, path, &seg)
.context(anyhow!("{}(media) {path:?}", inf.name)),
)
.is_some();
@@ -408,7 +407,7 @@ fn import_file(
}
if all_ok {
- reporting::catch(update_mtime(db, path).context("updating mtime"));
+ reporting::catch(update_mtime(dba, path).context("updating mtime"));
}
}
@@ -416,26 +415,22 @@ fn process_node(
dba: &DatabaseTables,
rt: &Handle,
plugins: &[Box<dyn ImportPlugin>],
- pending_nodes: &Mutex<HashSet<NodeID>>,
- node: NodeID,
+ pending_nodes: &Mutex<HashSet<RowNum>>,
+ node: RowNum,
) {
- let Some(data) = reporting::catch(
- dba.get_node(node)
- .and_then(|e| e.ok_or(anyhow!("node missing"))),
- ) else {
- return;
- };
- let slug = &data.slug;
+ let mut slug = String::new();
+ reporting::catch(dba.db.read_transaction(&mut |txn| {
+ let no = dba.nodes.get(txn, node)?.unwrap();
+ if let Some(s) = no.as_object().get(NO_SLUG) {
+ slug = s.to_owned();
+ }
+ Ok(())
+ }));
for p in plugins {
let inf = p.info();
if inf.handle_process {
reporting::set_task(format!("{}(proc): {slug}", inf.name));
- let Some(data) = reporting::catch(
- dba.get_node(node)
- .and_then(|e| e.ok_or(anyhow!("node missing"))),
- ) else {
- return;
- };
+
reporting::catch(
p.process(
&ImportContext {
@@ -445,7 +440,6 @@ fn process_node(
pending_nodes,
},
node,
- &data,
)
.context(anyhow!("{}(proc) {slug}", inf.name)),
);
@@ -453,10 +447,60 @@ fn process_node(
}
}
-fn update_mtime(db: DatabaseTables, path: &Path) -> Result<()> {
+fn compare_mtime(dba: &DatabaseTables, path: &Path) -> Result<bool> {
let meta = path.metadata()?;
let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
- db.set_import_file_mtime(path, mtime)?;
+ let mut was_changed = false;
+ dba.db.read_transaction(&mut |txn| {
+ match dba.import_meta.query_single(
+ txn,
+ Query {
+ filter: Filter::Match(
+ TagPath(vec![IM_PATH.0]),
+ path.as_os_str().as_encoded_bytes().to_vec(),
+ ),
+ sort: Sort::None,
+ },
+ ) {
+ None => was_changed = true,
+ Some(row) => {
+ let meta = dba.import_meta.get(txn, row)?.unwrap();
+ let prev_mtime = meta.as_object().get(IM_MTIME).unwrap_or_default();
+ was_changed = mtime > prev_mtime
+ }
+ }
+ Ok(())
+ })?;
+ Ok(was_changed)
+}
+
+fn update_mtime(dba: &DatabaseTables, path: &Path) -> Result<()> {
+ let meta = path.metadata()?;
+ let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
+ dba.db.write_transaction(&mut |txn| {
+ let row = match dba.import_meta.query_single(
+ txn,
+ Query {
+ filter: Filter::Match(
+ TagPath(vec![IM_PATH.0]),
+ path.as_os_str().as_encoded_bytes().to_vec(),
+ ),
+ sort: Sort::None,
+ },
+ ) {
+ Some(row) => row,
+ None => dba.import_meta.insert(
+ txn,
+ ObjectBuffer::new(&mut [(IM_PATH.0, &path.as_os_str().as_encoded_bytes())]),
+ )?,
+ };
+
+ let mut ob = dba.import_meta.get(txn, row)?.unwrap();
+ ob = ob.as_object().insert(IM_MTIME, mtime);
+ dba.import_meta.update(txn, row, ob)?;
+
+ Ok(())
+ })?;
Ok(())
}