diff options
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r-- | import/src/lib.rs | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs new file mode 100644 index 0000000..3698f79 --- /dev/null +++ b/import/src/lib.rs @@ -0,0 +1,376 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ +#![feature(lazy_cell)] +use anyhow::{Context, Ok}; +use async_recursion::async_recursion; +use futures::{stream::FuturesUnordered, StreamExt}; +use jellybase::{ + cache::async_cache_file, database::Database, federation::Federation, AssetLocationExt, CONF, +}; +use jellyclient::Session; +use jellycommon::{ + AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic, +}; +use log::{debug, info}; +use std::{ + cmp::Ordering, + ffi::OsStr, + fs::File, + os::unix::prelude::OsStrExt, + path::{Path, PathBuf}, + sync::{Arc, LazyLock}, +}; +use tokio::sync::Semaphore; + +static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); + +pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { + let permit = IMPORT_SEM.try_acquire()?; + info!("loading sources..."); + import_path(CONF.library_path.clone(), vec![], db, fed) + .await + .context("indexing")?; + info!("merging nodes..."); + merge_nodes(db).context("merging nodes")?; + info!("clearing temporary node tree"); + db.node_import.clear()?; + info!("import completed"); + drop(permit); + Ok(()) +} + +pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { + for r in db.node_import.iter() { + let (id, mut nodes) = r?; + + nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); + + let node = nodes + .into_iter() + .map(|(_, x)| x) + .reduce(|x, y| merge_node(x, y)) + .unwrap(); + + db.node.insert(&id, &node)?; + } + Ok(()) +} + +fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { + if x.is_empty() { + Ordering::Greater + } else if y.is_empty() { + Ordering::Less + } else { + match x[0].cmp(&y[0]) { + o @ (Ordering::Less | Ordering::Greater) => o, + Ordering::Equal => compare_index_path(&x[1..], &y[1..]), + } + } +} + +#[async_recursion] +pub async fn import_path( + path: PathBuf, + index_path: Vec<usize>, + db: &Database, + fed: &Federation, +) -> anyhow::Result<()> { + if path.is_dir() { + let mut children_paths = path + .read_dir()? + .map(Result::unwrap) + .filter_map(|e| { + if e.path().extension() == Some(&OsStr::from_bytes(b"yaml")) + || e.metadata().unwrap().is_dir() + { + Some(e.path()) + } else { + None + } + }) + .collect::<Vec<_>>(); + + children_paths.sort(); + + let mut children: FuturesUnordered<_> = children_paths + .into_iter() + .enumerate() + .map(|(i, p)| { + import_path( + p.clone(), + { + let mut path = index_path.clone(); + path.push(i); + path + }, + db, + fed, + ) + }) + .collect(); + + while let Some(k) = children.next().await { + k? + } + } else { + let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?; + + for s in opts.sources { + process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; + } + } + Ok(()) +} + +async fn process_source( + id: String, + s: ImportSource, + path: &Path, + index_path: &[usize], + db: &Database, + fed: &Federation, +) -> anyhow::Result<()> { + let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { + db.node_import.fetch_and_update(id, |l| { + let mut l = l.unwrap_or_default(); + l.push((index_path.to_vec(), n.clone())); + Some(l) + })?; + Ok(()) + }; + match s { + ImportSource::Override(n) => insert_node(&id, n)?, + ImportSource::Tmdb(_) => todo!(), + ImportSource::Media { location } => { + let path = location.path(); + } + ImportSource::Federated { host } => { + let session = fed.get_session(&host).await.context("creating session")?; + + import_remote(id, &host, db, &session, index_path) + .await + .context("federated import")? + } + ImportSource::AutoChildren => { + // TODO dont forget to update path of children + } + } + Ok(()) +} + +fn merge_node(x: Node, y: Node) -> Node { + Node { + public: NodePublic { + kind: x.public.kind.or(y.public.kind), + title: x.public.title.or(y.public.title), + id: x.public.id.or(y.public.id), + path: vec![], + children: if x.public.children.is_empty() { + x.public.children + } else { + y.public.children + }, + tagline: x.public.tagline.or(y.public.tagline), + description: x.public.description.or(y.public.description), + release_date: x.public.release_date.or(y.public.release_date), + index: x.public.index.or(y.public.index), + media: x.public.media.or(y.public.media), // TODO proper media merging + ratings: x + .public + .ratings + .into_iter() + .chain(y.public.ratings) + .collect(), + federated: x.public.federated.or(y.public.federated), + }, + private: NodePrivate { + id: x.private.id.or(y.private.id), + poster: x.private.poster.or(y.private.poster), + backdrop: x.private.backdrop.or(y.private.backdrop), + source: x.private.source.or(y.private.source), // TODO here too + }, + } +} + +// #[async_recursion] +// pub async fn import_path( +// path: PathBuf, +// db: &Database, +// fed: &Federation, +// mut node_path: Vec<String>, +// ) -> anyhow::Result<(Vec<String>, usize)> { +// if path.is_dir() { +// let mpath = path.join("directory.json"); +// let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { +// if e.path().extension() == Some(&OsStr::from_bytes(b"jelly")) +// || e.metadata().unwrap().is_dir() +// { +// Some(e.path()) +// } else { +// None +// } +// }); +// let identifier = if mpath.exists() { +// path.file_name().unwrap().to_str().unwrap().to_string() +// } else { +// node_path +// .last() +// .cloned() +// .ok_or(anyhow!("non-root node requires parent"))? +// }; + +// node_path.push(identifier.clone()); +// let mut all: FuturesUnordered<_> = children_paths +// .into_iter() +// .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e))) +// .collect(); +// node_path.pop(); // we will set the dirs path later and need it to not be included + +// let mut children_ids = Vec::new(); +// let mut errs = 0; +// while let Some(k) = all.next().await { +// match k { +// core::result::Result::Ok((els, errs2)) => { +// errs += errs2; +// children_ids.extend(els) +// } +// Err((p, e)) => { +// errs += 1; +// error!("import of {p:?} failed: {e:?}") +// } +// } +// } +// if mpath.exists() { +// let mut node: Node = +// serde_json::from_reader(File::open(mpath).context("metadata missing")?)?; + +// node.public.children = children_ids; +// node.public.path = node_path; +// node.public.id = Some(identifier.to_owned()); +// info!("adding {identifier}"); +// db.node.insert(&identifier, &node)?; +// Ok((vec![identifier], errs)) +// } else { +// Ok((children_ids, errs)) +// } +// } else if path.is_file() { +// info!("loading {path:?}"); +// let datafile = File::open(path.clone()).context("cant load metadata")?; +// let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?; +// let identifier = node.private.id.clone().unwrap_or_else(|| { +// path.file_name() +// .unwrap() +// .to_str() +// .unwrap() +// .strip_suffix(".json") +// .unwrap() +// .to_string() +// }); + +// let idents = if let Some(io) = node.private.import.take() { +// let session = fed +// .get_session(&io.host) +// .await +// .context("creating session")?; + +// import_remote(io, db, &session, identifier.clone(), node_path) +// .await +// .context("federated import")? +// } else { +// debug!("adding {identifier}"); +// node.public.path = node_path; +// node.public.id = Some(identifier.to_owned()); +// let did_insert = db.node.insert(&identifier, &node)?.is_none(); +// if did_insert { +// vec![identifier] +// } else { +// vec![] +// } +// }; +// Ok((idents, 0)) +// } else { +// bail!("did somebody really put a fifo or socket in the library?!") +// } +// } + +static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(16)); + +#[async_recursion] +async fn import_remote( + id: String, + host: &str, + db: &Database, + session: &Arc<Session>, + index_path: &[usize], +) -> anyhow::Result<()> { + let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { + db.node_import.fetch_and_update(id, |l| { + let mut l = l.unwrap_or_default(); + l.push((index_path.to_vec(), n.clone())); + Some(l) + })?; + Ok(()) + }; + let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); + info!("loading federated node {id:?}"); + + let node = session.node(&id).await.context("fetching remote node")?; + + if node.federated.as_ref() == Some(&CONF.hostname) { + return Ok(()); + } + + // TODO maybe use lazy download + let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?; + let backdrop = + cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?; + + drop(_permit); + + let node = Node { + public: node.clone(), + private: NodePrivate { + backdrop: Some(backdrop), + poster: Some(poster), + id: None, + source: None, // TODO + }, + }; + + debug!("adding {id}"); + insert_node(&id, node.clone())?; + + let mut children: FuturesUnordered<_> = node + .public + .children + .iter() + .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) + .collect(); + + while let Some(r) = children.next().await { + r?; + } + + Ok(()) +} + +async fn cache_federation_asset( + session: Arc<Session>, + identifier: String, + role: AssetRole, +) -> anyhow::Result<AssetLocation> { + async_cache_file( + &["fed-asset", role.as_str(), &identifier.clone()], + move |out| async move { + let session = session; + session + .node_asset(identifier.as_str(), role, 1024, out) + .await + }, + ) + .await +} |