aboutsummaryrefslogtreecommitdiff
path: root/import/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r--import/src/lib.rs435
1 files changed, 435 insertions, 0 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs
new file mode 100644
index 0000000..ed0af2d
--- /dev/null
+++ b/import/src/lib.rs
@@ -0,0 +1,435 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2023 metamuffin <metamuffin.org>
+*/
+#![feature(lazy_cell)]
+pub mod infojson;
+pub mod tmdb;
+
+use anyhow::{Context, Ok};
+use async_recursion::async_recursion;
+use futures::{stream::FuturesUnordered, StreamExt};
+use jellybase::{
+ cache::async_cache_file, database::Database, federation::Federation, AssetLocationExt, CONF,
+};
+use jellyclient::Session;
+use jellycommon::{
+ AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic,
+};
+use jellymatroska::read::EbmlReader;
+use jellyremuxer::import::import_metadata;
+use log::{debug, info};
+use std::{
+ cmp::Ordering,
+ ffi::OsStr,
+ fs::File,
+ io::BufReader,
+ os::unix::prelude::OsStrExt,
+ path::{Path, PathBuf},
+ sync::{Arc, LazyLock},
+};
+use tokio::{sync::Semaphore, task::spawn_blocking};
+
+/// Single-permit semaphore guarding [`import`]: the permit is taken with
+/// `try_acquire`, so an import requested while another one holds the permit
+/// fails immediately instead of waiting.
+static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
+
+/// Runs a complete library import.
+///
+/// The library at `CONF.library_path` is indexed into the temporary
+/// `db.node_import` tree, the collected entries are merged into `db.node`,
+/// and the temporary tree is cleared again.
+///
+/// # Errors
+///
+/// Fails right away when the single import permit is already taken, and
+/// otherwise forwards any error from indexing, merging or the database.
+pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
+    let permit = IMPORT_SEM
+        .try_acquire()
+        .context("another import is already running")?;
+
+    // A previous run that failed before the final `clear` leaves its entries
+    // behind; indexing appends to existing lists, so start from an empty tree
+    // to keep stale entries out of the merge.
+    db.node_import.clear()?;
+
+    info!("loading sources...");
+    import_path(CONF.library_path.clone(), vec![], db, fed)
+        .await
+        .context("indexing")?;
+    info!("merging nodes...");
+    merge_nodes(db).context("merging nodes")?;
+    info!("clearing temporary node tree");
+    db.node_import.clear()?;
+    info!("import completed");
+    drop(permit);
+    Ok(())
+}
+
+/// Collapses the temporary import tree into final nodes.
+///
+/// For every id in `db.node_import` the collected `(index_path, node)`
+/// entries are sorted with `compare_index_path` and folded left to right
+/// with `merge_node`. The merged node gets its id and a placeholder path
+/// and is written to `db.node`.
+///
+/// # Errors
+///
+/// Returns an error for database failures and for an id whose entry list is
+/// empty (this used to panic).
+pub fn merge_nodes(db: &Database) -> anyhow::Result<()> {
+    for entry in db.node_import.iter() {
+        let (id, mut nodes) = entry?;
+
+        nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
+
+        let mut node = nodes
+            .into_iter()
+            .map(|(_, n)| n)
+            .reduce(merge_node)
+            .context("node has no import entries to merge")?;
+
+        node.public.id = Some(id.clone());
+        node.public.path = vec!["library".to_string()]; // TODO reconstruct from children
+
+        db.node.insert(&id, &node)?;
+    }
+    Ok(())
+}
+
+/// Orders two index paths for merging.
+///
+/// Paths are compared component by component. When one path is a strict
+/// prefix of the other, the shorter one sorts *after* the longer one.
+/// Identical paths (including two empty paths) compare as `Equal`, which
+/// keeps the ordering a valid total order for `sort_by` and lets the stable
+/// sort preserve insertion order for entries from the same library file.
+fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
+    match (x.split_first(), y.split_first()) {
+        (None, None) => Ordering::Equal,
+        (None, Some(_)) => Ordering::Greater,
+        (Some(_), None) => Ordering::Less,
+        (Some((a, x_rest)), Some((b, y_rest))) => {
+            a.cmp(b).then_with(|| compare_index_path(x_rest, y_rest))
+        }
+    }
+}
+
+/// Recursively indexes a library path into the temporary import tree.
+///
+/// * A directory is scanned for sub-directories and `*.yaml` files. The
+///   children are sorted by path, each one gets `index_path` extended by its
+///   position in that order, and all of them are imported concurrently.
+/// * Anything else is parsed as a YAML `ImportOptions` file and each of its
+///   sources is handed to `process_source`.
+///
+/// # Errors
+///
+/// Directory, file and parse errors are returned with the offending path as
+/// context (directory errors used to panic). The first failing child aborts
+/// the import of the whole directory.
+#[async_recursion]
+pub async fn import_path(
+    path: PathBuf,
+    index_path: Vec<usize>,
+    db: &Database,
+    fed: &Federation,
+) -> anyhow::Result<()> {
+    if path.is_dir() {
+        let mut children_paths = Vec::new();
+        for entry in path
+            .read_dir()
+            .with_context(|| format!("reading directory {path:?}"))?
+        {
+            let entry = entry.with_context(|| format!("reading directory {path:?}"))?;
+            let child = entry.path();
+            // The extension test comes first so `metadata` is only queried
+            // for entries that are not yaml files.
+            if child.extension() == Some(OsStr::from_bytes(b"yaml"))
+                || entry
+                    .metadata()
+                    .with_context(|| format!("reading metadata of {child:?}"))?
+                    .is_dir()
+            {
+                children_paths.push(child);
+            }
+        }
+
+        // Sorting makes the per-child index deterministic across runs.
+        children_paths.sort();
+
+        let mut children: FuturesUnordered<_> = children_paths
+            .into_iter()
+            .enumerate()
+            .map(|(i, child)| {
+                let mut child_index = index_path.clone();
+                child_index.push(i);
+                import_path(child, child_index, db, fed)
+            })
+            .collect();
+
+        while let Some(result) = children.next().await {
+            result?;
+        }
+    } else {
+        let file = File::open(&path).with_context(|| format!("opening {path:?}"))?;
+        let opts: ImportOptions = serde_yaml::from_reader(BufReader::new(file))
+            .with_context(|| format!("parsing {path:?}"))?;
+
+        for s in opts.sources {
+            process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?;
+        }
+    }
+    Ok(())
+}
+
+/// Applies one import source of a library file to the temporary import tree.
+///
+/// * `id` - id of the node this source contributes to.
+/// * `s` - the source to process.
+/// * `path` - path of the library file; currently unused, only the disabled
+///   cover extraction below refers to it.
+/// * `index_path` - index path of the library file, stored next to every
+///   inserted node.
+///
+/// # Errors
+///
+/// Returns database, I/O, metadata and federation errors. A media file that
+/// cannot be opened and a `Tmdb` source (not implemented yet) are reported as
+/// errors; both used to panic.
+async fn process_source(
+    id: String,
+    s: ImportSource,
+    path: &Path,
+    index_path: &[usize],
+    db: &Database,
+    fed: &Federation,
+) -> anyhow::Result<()> {
+    // Appends `(index_path, node)` to the entry list stored under `id`.
+    // The update closure only has `n` by reference, hence the clone.
+    let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
+        db.node_import.fetch_and_update(id, |l| {
+            let mut l = l.unwrap_or_default();
+            l.push((index_path.to_vec(), n.clone()));
+            Some(l)
+        })?;
+        Ok(())
+    };
+    match s {
+        ImportSource::Override(n) => insert_node(&id, n)?,
+        ImportSource::Tmdb(_) => anyhow::bail!("tmdb import source is not implemented yet"),
+        ImportSource::Media { location } => {
+            let media_path = location.path();
+
+            // Reading the container is blocking work, so it runs off the
+            // async executor.
+            let metadata = spawn_blocking(move || -> anyhow::Result<_> {
+                let file = File::open(&media_path)
+                    .with_context(|| format!("opening media file {media_path:?}"))?;
+                let mut input = EbmlReader::new(BufReader::new(file));
+                Ok(import_metadata(&mut input)?)
+            })
+            .await??;
+
+            // if let Some(cover) = metadata.cover {
+            //     let pu = path.join(format!(
+            //         "cover.{}",
+            //         match mime.as_str() {
+            //             "image/webp" => "webp",
+            //             "image/jpeg" => "jpeg",
+            //             "image/png" => "png",
+            //             _ => {
+            //                 warn!("unknown mime, just using webp");
+            //                 "webp"
+            //             }
+            //         }
+            //     ));
+            //     if !pu.exists() {
+            //         let mut f = tokio::fs::File::create(&pu).await?;
+            //         f.write_all(&data).await?;
+            //     }
+            // }
+
+            let node = Node {
+                public: NodePublic {
+                    title: metadata.title,
+                    description: metadata.description,
+                    tagline: metadata.tagline,
+                    ..Default::default()
+                },
+                private: NodePrivate::default(),
+            };
+            insert_node(&id, node)?;
+        }
+        ImportSource::Federated { host } => {
+            let session = fed.get_session(&host).await.context("creating session")?;
+
+            import_remote(id, &host, db, &session, index_path)
+                .await
+                .context("federated import")?
+        }
+        ImportSource::AutoChildren => {
+            // TODO dont forget to update path of children
+        }
+    }
+    Ok(())
+}
+
+/// Merges two nodes, giving `x` precedence over `y`.
+///
+/// Every optional field takes `x`'s value and falls back to `y`'s. The
+/// children list is `x`'s unless it is empty, in which case `y`'s is used
+/// (the condition was inverted before, so `x`'s children were never kept).
+/// Ratings of both nodes are concatenated, `x`'s first. `path` is always
+/// reset to an empty list.
+fn merge_node(x: Node, y: Node) -> Node {
+    Node {
+        public: NodePublic {
+            kind: x.public.kind.or(y.public.kind),
+            title: x.public.title.or(y.public.title),
+            id: x.public.id.or(y.public.id),
+            path: vec![],
+            children: if x.public.children.is_empty() {
+                y.public.children
+            } else {
+                x.public.children
+            },
+            tagline: x.public.tagline.or(y.public.tagline),
+            description: x.public.description.or(y.public.description),
+            release_date: x.public.release_date.or(y.public.release_date),
+            index: x.public.index.or(y.public.index),
+            media: x.public.media.or(y.public.media), // TODO proper media merging
+            ratings: x
+                .public
+                .ratings
+                .into_iter()
+                .chain(y.public.ratings)
+                .collect(),
+            federated: x.public.federated.or(y.public.federated),
+        },
+        private: NodePrivate {
+            id: x.private.id.or(y.private.id),
+            poster: x.private.poster.or(y.private.poster),
+            backdrop: x.private.backdrop.or(y.private.backdrop),
+            source: x.private.source.or(y.private.source), // TODO here too
+        },
+    }
+}
+
+// #[async_recursion]
+// pub async fn import_path(
+// path: PathBuf,
+// db: &Database,
+// fed: &Federation,
+// mut node_path: Vec<String>,
+// ) -> anyhow::Result<(Vec<String>, usize)> {
+// if path.is_dir() {
+// let mpath = path.join("directory.json");
+// let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| {
+// if e.path().extension() == Some(&OsStr::from_bytes(b"jelly"))
+// || e.metadata().unwrap().is_dir()
+// {
+// Some(e.path())
+// } else {
+// None
+// }
+// });
+// let identifier = if mpath.exists() {
+// path.file_name().unwrap().to_str().unwrap().to_string()
+// } else {
+// node_path
+// .last()
+// .cloned()
+// .ok_or(anyhow!("non-root node requires parent"))?
+// };
+
+// node_path.push(identifier.clone());
+// let mut all: FuturesUnordered<_> = children_paths
+// .into_iter()
+// .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e)))
+// .collect();
+// node_path.pop(); // we will set the dirs path later and need it to not be included
+
+// let mut children_ids = Vec::new();
+// let mut errs = 0;
+// while let Some(k) = all.next().await {
+// match k {
+// core::result::Result::Ok((els, errs2)) => {
+// errs += errs2;
+// children_ids.extend(els)
+// }
+// Err((p, e)) => {
+// errs += 1;
+// error!("import of {p:?} failed: {e:?}")
+// }
+// }
+// }
+// if mpath.exists() {
+// let mut node: Node =
+// serde_json::from_reader(File::open(mpath).context("metadata missing")?)?;
+
+// node.public.children = children_ids;
+// node.public.path = node_path;
+// node.public.id = Some(identifier.to_owned());
+// info!("adding {identifier}");
+// db.node.insert(&identifier, &node)?;
+// Ok((vec![identifier], errs))
+// } else {
+// Ok((children_ids, errs))
+// }
+// } else if path.is_file() {
+// info!("loading {path:?}");
+// let datafile = File::open(path.clone()).context("cant load metadata")?;
+// let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?;
+// let identifier = node.private.id.clone().unwrap_or_else(|| {
+// path.file_name()
+// .unwrap()
+// .to_str()
+// .unwrap()
+// .strip_suffix(".json")
+// .unwrap()
+// .to_string()
+// });
+
+// let idents = if let Some(io) = node.private.import.take() {
+// let session = fed
+// .get_session(&io.host)
+// .await
+// .context("creating session")?;
+
+// import_remote(io, db, &session, identifier.clone(), node_path)
+// .await
+// .context("federated import")?
+// } else {
+// debug!("adding {identifier}");
+// node.public.path = node_path;
+// node.public.id = Some(identifier.to_owned());
+// let did_insert = db.node.insert(&identifier, &node)?.is_none();
+// if did_insert {
+// vec![identifier]
+// } else {
+// vec![]
+// }
+// };
+// Ok((idents, 0))
+// } else {
+// bail!("did somebody really put a fifo or socket in the library?!")
+// }
+// }
+
+/// Semaphore with 16 permits; `import_remote` holds one permit while it
+/// fetches a remote node and its poster and backdrop assets.
+static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(16));
+
+/// Imports the node `id` from a federated peer, then recurses into its
+/// children.
+///
+/// The public node and its poster and backdrop assets are fetched through
+/// `session` while a `SEM_REMOTE_IMPORT` permit is held. A node whose
+/// `federated` field equals this instance's hostname is skipped. Otherwise
+/// the node is appended to `db.node_import` under `id` together with
+/// `index_path`, and every child id is imported with the same `index_path`.
+#[async_recursion]
+async fn import_remote(
+    id: String,
+    host: &str,
+    db: &Database,
+    session: &Arc<Session>,
+    index_path: &[usize],
+) -> anyhow::Result<()> {
+    let remote_permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
+    info!("loading federated node {id:?}");
+
+    let public = session.node(&id).await.context("fetching remote node")?;
+
+    // The remote node originates from this very instance: nothing to import.
+    if public.federated.as_ref() == Some(&CONF.hostname) {
+        return Ok(());
+    }
+
+    // TODO maybe use lazy download
+    let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?;
+    let backdrop =
+        cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?;
+
+    // Release the permit before recursing so children can acquire their own.
+    drop(remote_permit);
+
+    let node = Node {
+        public,
+        private: NodePrivate {
+            backdrop: Some(backdrop),
+            poster: Some(poster),
+            id: None,
+            source: None, // TODO
+        },
+    };
+
+    debug!("adding {id}");
+    db.node_import.fetch_and_update(&id, |entries| {
+        let mut entries = entries.unwrap_or_default();
+        entries.push((index_path.to_vec(), node.clone()));
+        Some(entries)
+    })?;
+
+    let mut pending: FuturesUnordered<_> = node
+        .public
+        .children
+        .iter()
+        .map(|child| import_remote(child.to_owned(), host, db, session, index_path))
+        .collect();
+
+    while let Some(outcome) = pending.next().await {
+        outcome?;
+    }
+
+    Ok(())
+}
+
+/// Fetches one asset of a remote node through the file cache.
+///
+/// The cache key is `["fed-asset", <role>, <identifier>]`. The generator
+/// passed to `async_cache_file` requests the asset for `role` from `session`
+/// with the size argument 1024 and writes it to the cache output.
+async fn cache_federation_asset(
+    session: Arc<Session>,
+    identifier: String,
+    role: AssetRole,
+) -> anyhow::Result<AssetLocation> {
+    // `identifier` itself moves into the generator closure, so the key needs
+    // its own copy.
+    let key_ident = identifier.clone();
+    async_cache_file(
+        &["fed-asset", role.as_str(), &key_ident],
+        move |out| async move {
+            session
+                .node_asset(identifier.as_str(), role, 1024, out)
+                .await
+        },
+    )
+    .await
+}
+
+/// Derives an identifier from arbitrary text.
+///
+/// ASCII lowercase letters and digits are kept, ASCII uppercase letters are
+/// lowercased, `-`, space, `_` and `:` each become `-`, and every other
+/// character is dropped.
+fn make_ident(s: &str) -> String {
+    s.chars()
+        .filter_map(|c| match c {
+            'a'..='z' | '0'..='9' => Some(c),
+            'A'..='Z' => Some(c.to_ascii_lowercase()),
+            '-' | ' ' | '_' | ':' => Some('-'),
+            _ => None,
+        })
+        .collect()
+}