aboutsummaryrefslogtreecommitdiff
path: root/import/src
diff options
context:
space:
mode:
Diffstat (limited to 'import/src')
-rw-r--r--import/src/db.rs165
-rw-r--r--import/src/lib.rs198
2 files changed, 230 insertions, 133 deletions
diff --git a/import/src/db.rs b/import/src/db.rs
new file mode 100644
index 0000000..88c5601
--- /dev/null
+++ b/import/src/db.rs
@@ -0,0 +1,165 @@
+use std::collections::HashMap;
+
+use anyhow::anyhow;
+use jellybase::database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT};
+use jellycommon::{ExtendedNode, Node};
+use log::info;
+use std::sync::RwLock;
+
+pub(crate) trait ImportStorage: Sync {
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>;
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()>;
+
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>;
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>;
+
+ fn remove_prev_nodes(&self) -> anyhow::Result<()>;
+ fn finish(&self) -> anyhow::Result<()>;
+}
+
+pub(crate) struct DatabaseStorage<'a> {
+ pub db: &'a DataAcid,
+}
+impl<'a> DatabaseStorage<'a> {
+ pub fn new(db: &'a DataAcid) -> Self {
+ Self { db }
+ }
+}
+impl ImportStorage for DatabaseStorage<'_> {
+ fn remove_prev_nodes(&self) -> anyhow::Result<()> {
+ info!("removing old nodes...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
+ let txn = self.db.inner.begin_read()?;
+ let table = txn.open_table(T_NODE_IMPORT)?;
+ let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?;
+ Ok(value.value().0)
+ }
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
+ let txn_write = self.db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id, Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ parts.push((index_path.to_vec(), node.clone()));
+ table.insert(id, Ser(parts))?;
+
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ _index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()> {
+ // TODO merge this
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_EXTENDED)?;
+ table.insert(id, Ser(node))?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn finish(&self) -> anyhow::Result<()> {
+ info!("clearing temporary node tree...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+}
+
+pub(crate) struct MemoryStorage<'a> {
+ pub db: &'a DataAcid,
+ pub parts: RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>,
+}
+impl<'a> MemoryStorage<'a> {
+ pub fn new(db: &'a DataAcid) -> Self {
+ Self {
+ db,
+ parts: Default::default(),
+ }
+ }
+}
+impl ImportStorage for MemoryStorage<'_> {
+ fn remove_prev_nodes(&self) -> anyhow::Result<()> {
+ info!("removing old nodes...");
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+ fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
+ Ok(self
+ .parts
+ .read()
+ .unwrap()
+ .get(id)
+ .ok_or(anyhow!("node parts not found"))?
+ .to_owned())
+ }
+ fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
+ let txn_write = self.db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id, Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ Ok(())
+ }
+
+ fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
+ self.parts
+ .write()
+ .unwrap()
+ .entry(id.to_owned())
+ .or_default()
+ .push((index_path.to_owned(), node));
+ Ok(())
+ }
+
+ fn add_partial_node_ext(
+ &self,
+ id: &str,
+ _index_path: &[usize],
+ node: ExtendedNode,
+ ) -> anyhow::Result<()> {
+ // TODO merge this
+ let txn = self.db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_EXTENDED)?;
+ table.insert(id, Ser(node))?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
+ fn finish(&self) -> anyhow::Result<()> {
+ Ok(())
+ }
+}
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 274b63d..40648d0 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -4,6 +4,7 @@
Copyright (C) 2024 metamuffin <metamuffin.org>
*/
#![feature(lazy_cell)]
+pub mod db;
pub mod infojson;
pub mod tmdb;
pub mod trakt;
@@ -11,11 +12,12 @@ pub mod trakt;
use anyhow::{anyhow, bail, Context, Ok};
use async_recursion::async_recursion;
use base64::Engine;
+use db::{DatabaseStorage, ImportStorage, MemoryStorage};
use futures::{stream::FuturesUnordered, StreamExt};
use jellybase::{
assetfed::AssetInner,
cache::{async_cache_file, cache_memory},
- database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT},
+ database::{DataAcid, ReadableTable, T_NODE_IMPORT},
federation::Federation,
CONF, SECRETS,
};
@@ -73,94 +75,68 @@ pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
};
+ if CONF.use_in_memory_import_storage {
+ import_inner(&MemoryStorage::new(db), fed, &ap).await?;
+ } else {
+ import_inner(&DatabaseStorage::new(db), fed, &ap).await?;
+ }
+
+ drop(permit);
+ Ok(())
+}
+
+pub(crate) async fn import_inner(
+ db: &impl ImportStorage,
+ fed: &Federation,
+ ap: &Apis,
+) -> anyhow::Result<()> {
info!("loading sources...");
- import_path(CONF.library_path.clone(), vec![], db, fed, &ap)
+ import_path(CONF.library_path.clone(), vec![], db, fed, ap)
.await
.context("indexing")?;
- info!("removing old nodes...");
- {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
- table.drain::<&str>(..)?;
- drop(table);
- txn.commit()?;
- }
+ db.remove_prev_nodes()?;
info!("merging nodes...");
- merge_nodes(db).context("merging nodes")?;
- info!("generating paths...");
- generate_node_paths(db).context("generating paths")?;
- info!("clearing temporary node tree...");
- {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- table.drain::<&str>(..)?;
- drop(table);
- txn.commit()?;
- }
+ generate_node_paths(db).context("merging nodes")?;
+ db.finish()?;
info!("import completed");
- drop(permit);
Ok(())
}
-pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> {
- let txn_read = db.inner.begin_read()?;
- let t_node_import = txn_read.open_table(T_NODE_IMPORT)?;
- for r in t_node_import.iter()? {
- let (id, nodes) = r?;
- let mut nodes = nodes.value().0;
+fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<()> {
+ // TODO mark nodes done to allow recursion
+ fn traverse(
+ db: &impl ImportStorage,
+ id: String,
+ mut path: Vec<String>,
+ parent_title: &str,
+ ) -> anyhow::Result<()> {
+ let node = {
+ let mut parts = db
+ .get_partial_parts(&id)
+ .context(anyhow!("path = {path:?}"))?;
- nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
+ parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
- let mut node = nodes
- .into_iter()
- .map(|(_, x)| x)
- .reduce(|x, y| merge_node(x, y).unwrap())
- .unwrap();
+ let mut node = parts
+ .into_iter()
+ .map(|(_, x)| x)
+ .reduce(|x, y| merge_node(x, y).unwrap())
+ .unwrap();
- node.public.id = Some(id.value().to_owned());
- node.public.path = vec![]; // will be reconstructed in the next pass
- node.public.federated = None;
+ node.public.id = Some(id.to_owned());
+ node.public.path = vec![]; // will be reconstructed in the next pass
+ node.public.federated = None;
- // TODO this discardes a lot of information. maybe change this.
- if let Some(media) = &node.public.media {
- for t in &media.tracks {
- if let Some(host) = t.federated.first() {
- if host != &CONF.hostname {
- node.public.federated = Some(host.to_string())
+            // TODO this discards a lot of information. maybe change this.
+ if let Some(media) = &node.public.media {
+ for t in &media.tracks {
+ if let Some(host) = t.federated.first() {
+ if host != &CONF.hostname {
+ node.public.federated = Some(host.to_string())
+ }
}
}
}
- }
-
- {
- let txn_write = db.inner.begin_write()?;
- let mut t_node = txn_write.open_table(T_NODE)?;
- t_node.insert(id.value(), Ser(node))?;
- drop(t_node);
- txn_write.commit()?;
- }
- }
- Ok(())
-}
-
-pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> {
- fn traverse(
- db: &DataAcid,
- c: String,
- mut path: Vec<String>,
- parent_title: &str,
- ) -> anyhow::Result<()> {
- let node = {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
-
- let mut node = table
- .get(&*c)?
- .ok_or(anyhow!(
- "missing child when generating paths: {c:?} at {path:?}"
- ))?
- .value()
- .0;
if node.public.path.is_empty() {
node.public.path = path.clone();
@@ -182,14 +158,12 @@ pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> {
_ => None,
};
- table.insert(c.as_str(), Ser(node.clone()))?;
+ db.insert_complete_node(&id, node.clone())?;
- drop(table);
- txn.commit()?;
node
};
- path.push(c);
+ path.push(id);
let ps = node.public.title.unwrap_or_default();
for c in node.public.children {
traverse(db, c, path.clone(), &ps)?;
@@ -217,7 +191,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
async fn import_path(
path: PathBuf,
mut index_path: Vec<usize>,
- db: &DataAcid,
+ db: &impl ImportStorage,
fed: &Federation,
ap: &Apis,
) -> anyhow::Result<()> {
@@ -288,31 +262,10 @@ async fn process_source(
s: ImportSource,
path: &Path,
index_path: &[usize],
- db: &DataAcid,
+ db: &impl ImportStorage,
fed: &Federation,
ap: &Apis,
) -> anyhow::Result<()> {
- let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
-
- let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
- node.push((index_path.to_vec(), n.clone()));
- table.insert(id, Ser(node))?;
-
- drop(table);
- txn.commit()?;
- Ok(())
- };
- let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> {
- // TODO merge this
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(n))?;
- drop(table);
- txn.commit()?;
- Ok(())
- };
match s {
ImportSource::Override(mut n) => {
if let Some(backdrop) = n.private.backdrop.clone() {
@@ -321,7 +274,7 @@ async fn process_source(
if let Some(poster) = n.private.poster.clone() {
n.public.poster = Some(AssetInner::Library(poster).ser());
}
- insert_node(&id, n)?
+ db.add_partial_node(&id, index_path, n)?
}
ImportSource::Trakt { id: tid, kind } => {
info!("trakt {id}");
@@ -386,8 +339,8 @@ async fn process_source(
}
}
}
- insert_node(&id, node)?;
- insert_node_ext(&id, node_ext)?;
+ db.add_partial_node(&id, index_path, node)?;
+ db.add_partial_node_ext(&id, index_path, node_ext)?;
if let Some(tid) = trakt_object.ids.tmdb {
if let Some(kind) = match kind {
@@ -441,7 +394,7 @@ async fn process_source(
parse_release_date(&date).context("parsing release date")?;
}
- insert_node(&id, node)?;
+ db.add_partial_node(&id, index_path, node)?;
}
ImportSource::Media {
path: mpath,
@@ -486,7 +439,7 @@ async fn process_source(
node.public.children.push(inf_id);
}
}
- insert_node(&id, node)?;
+ db.add_partial_node(&id, index_path, node)?;
} else if abspath.is_file() {
let _permit = SEM_IMPORT.acquire().await.unwrap();
let metadata = {
@@ -579,7 +532,7 @@ async fn process_source(
}
}
drop(_permit);
- insert_node(&id, node)?;
+ db.add_partial_node(&id, index_path, node)?;
} else {
warn!("non file/dir import ignored: {abspath:?}")
}
@@ -615,8 +568,9 @@ async fn process_source(
children.push(opts.id);
}
}
- insert_node(
+ db.add_partial_node(
&id,
+ index_path,
Node {
private: NodePrivate::default(),
public: NodePublic {
@@ -771,32 +725,10 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
async fn import_remote(
id: String,
host: &str,
- db: &DataAcid,
+ db: &impl ImportStorage,
session: &Arc<Session>,
index_path: &[usize],
) -> anyhow::Result<()> {
- let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
-
- let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
- node.push((index_path.to_vec(), n.clone()));
- table.insert(id, Ser(node))?;
-
- drop(table);
- txn.commit()?;
- Ok(())
- };
- let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> {
- // TODO merge this
- let txn = db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(n))?;
- drop(table);
- txn.commit()?;
- Ok(())
- };
-
let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
info!("loading federated node {id:?}");
@@ -837,8 +769,8 @@ async fn import_remote(
}
debug!("adding {id}");
- insert_node(&id, node.clone())?;
- insert_node_ext(&id, node_ext)?;
+ db.add_partial_node(&id, index_path, node.clone())?;
+ db.add_partial_node_ext(&id, index_path, node_ext)?;
let mut children: FuturesUnordered<_> = node
.public