about · summary · refs · log · tree · commit · diff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--  import/src/lib.rs  75
1 file changed, 61 insertions(+), 14 deletions(-)
diff --git a/import/src/lib.rs b/import/src/lib.rs
index aab0c70..ad929fa 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -14,8 +14,12 @@ use crate::{
};
use anyhow::{Context, Result, anyhow};
use jellycache::{HashKey, cache_memory, cache_store};
-use jellycommon::{NodeID, Visibility};
-use jellydb::Database;
+use jellycommon::jellyobject::{self, Object, ObjectBuffer, Tag, TypedTag};
+use jellydb::{
+ backends::Database,
+ query::{Filter, Query, Sort},
+ table::{RowNum, Table},
+};
use jellyremuxer::{
demuxers::create_demuxer_autodetect,
matroska::{self, AttachedFile, Segment},
@@ -29,6 +33,7 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fs::{File, read_to_string},
+ marker::PhantomData,
mem::swap,
path::{Path, PathBuf},
sync::{Arc, LazyLock, Mutex},
@@ -76,7 +81,49 @@ pub fn is_importing() -> bool {
IMPORT_SEM.available_permits() == 0
}
-pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
+pub struct NodeID(pub String);
+
+const NODE_ID: TypedTag<&str> = TypedTag(Tag(0x8123), PhantomData);
+
+struct DatabaseTables {
+ db: Arc<dyn Database>,
+ nodes: Table,
+}
+
+fn node_id_query(node: &NodeID) -> Query {
+ Query {
+ filter: Filter::Match(
+ jellyobject::Path(vec![NODE_ID.0]),
+ node.0.as_bytes().to_vec(),
+ ),
+ sort: Sort::None,
+ }
+}
+
+impl DatabaseTables {
+ pub fn update_node_init(
+ &self,
+ node: NodeID,
+ mut update: impl FnMut(Object) -> Option<ObjectBuffer>,
+ ) -> Result<()> {
+ self.db.write_transaction(&mut |txn| {
+ let node = match self.nodes.query_single(txn, node_id_query(&node)) {
+ Some(r) => r,
+ None => self
+ .nodes
+ .insert(txn, ObjectBuffer::new(&mut [(NODE_ID.0, &node.0.as_str())]))?,
+ };
+ let ob = self.nodes.get(txn, node)?.unwrap();
+ if let Some(changed) = update(ob.as_object()) {
+ self.nodes.update(txn, node, changed)?;
+ }
+ Ok(())
+ })?;
+ Ok(())
+ }
+}
+
+pub async fn import_wrap(db: DatabaseTables, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire().context("already importing")?;
let rt = Handle::current();
@@ -86,9 +133,9 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
.build()?;
let jh = spawn_blocking(move || {
- tp.install(|| {
+ tp.install(move || {
reporting::start_import();
- reporting::catch(import(&db, &rt, incremental));
+ reporting::catch(import(db, &rt, incremental));
reporting::end_import();
})
});
@@ -98,7 +145,7 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: &Database, rt: &Handle, incremental: bool) -> Result<()> {
+fn import(db: DatabaseTables, rt: &Handle, incremental: bool) -> Result<()> {
let plugins = init_plugins(&CONF.api);
let files = Mutex::new(Vec::new());
import_traverse(
@@ -165,11 +212,11 @@ impl Default for InheritedFlags {
fn import_traverse(
path: &Path,
- db: &Database,
+ db: DatabaseTables,
incremental: bool,
- parent: NodeID,
+ parent: Option<RowNum>,
mut iflags: InheritedFlags,
- out: &Mutex<Vec<(PathBuf, NodeID, InheritedFlags)>>,
+ out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>,
) -> Result<()> {
if path.is_dir() {
reporting::set_task(format!("indexing {path:?}"));
@@ -244,7 +291,7 @@ fn import_file(
db,
rt,
iflags,
- nodes,
+ pending_nodes: nodes,
};
let filename = path.file_name().unwrap().to_string_lossy();
if filename == "flags" {
@@ -343,7 +390,7 @@ fn import_file(
}
fn process_node(
- db: &Database,
+ dba: DatabaseTables,
rt: &Handle,
plugins: &[Box<dyn ImportPlugin>],
nodes: &Mutex<HashSet<NodeID>>,
@@ -369,10 +416,10 @@ fn process_node(
reporting::catch(
p.process(
&ImportContext {
- db,
+ dba,
rt,
iflags: InheritedFlags::default(),
- nodes,
+ pending_nodes: nodes,
},
node,
&data,
@@ -383,7 +430,7 @@ fn process_node(
}
}
-fn update_mtime(db: &Database, path: &Path) -> Result<()> {
+fn update_mtime(db: DatabaseTables, path: &Path) -> Result<()> {
let meta = path.metadata()?;
let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
db.set_import_file_mtime(path, mtime)?;