/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ #![feature(lazy_cell)] pub mod infojson; pub mod tmdb; use anyhow::{anyhow, Context, Ok}; use async_recursion::async_recursion; use futures::{stream::FuturesUnordered, StreamExt}; use jellybase::{ cache::async_cache_file, database::Database, federation::Federation, AssetLocationExt, CONF, }; use jellyclient::Session; use jellycommon::{ AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic, }; use jellymatroska::read::EbmlReader; use jellyremuxer::import::import_metadata; use log::{debug, info}; use std::{ cmp::Ordering, ffi::OsStr, fs::File, io::BufReader, os::unix::prelude::OsStrExt, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; use tokio::{sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed) .await .context("indexing")?; info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); db.node_import.clear()?; info!("import completed"); drop(permit); Ok(()) } pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { for r in db.node_import.iter() { let (id, mut nodes) = r?; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); let mut node = nodes .into_iter() .map(|(_, x)| x) .reduce(|x, y| merge_node(x, y)) .unwrap(); node.public.id = Some(id.clone()); node.public.path = vec![]; // will be reconstructed in the next pass db.node.insert(&id, &node)?; } Ok(()) } pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { fn traverse(db: &Database, c: String, mut path: Vec) -> anyhow::Result<()> { let node = db .node .update_and_fetch(&c, |mut nc| { if let Some(nc) = &mut nc { if nc.public.path.is_empty() { nc.public.path = path.clone(); } } nc })? .ok_or(anyhow!("node missing"))?; path.push(c); for c in node.public.children { traverse(db, c, path.clone())?; } Ok(()) } traverse(db, "library".to_string(), vec![])?; Ok(()) } fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { if x.is_empty() { Ordering::Greater } else if y.is_empty() { Ordering::Less } else { match x[0].cmp(&y[0]) { o @ (Ordering::Less | Ordering::Greater) => o, Ordering::Equal => compare_index_path(&x[1..], &y[1..]), } } } #[async_recursion] pub async fn import_path( path: PathBuf, index_path: Vec, db: &Database, fed: &Federation, ) -> anyhow::Result<()> { if path.is_dir() { let mut children_paths = path .read_dir()? .map(Result::unwrap) .filter_map(|e| { if e.path().extension() == Some(&OsStr::from_bytes(b"yaml")) || e.metadata().unwrap().is_dir() { Some(e.path()) } else { None } }) .collect::>(); children_paths.sort(); let mut children: FuturesUnordered<_> = children_paths .into_iter() .enumerate() .map(|(i, p)| { import_path( p.clone(), { let mut path = index_path.clone(); path.push(i); path }, db, fed, ) }) .collect(); while let Some(k) = children.next().await { k? } } else { let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?; for s in opts.sources { process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; } } Ok(()) } async fn process_source( id: String, s: ImportSource, path: &Path, index_path: &[usize], db: &Database, fed: &Federation, ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; match s { ImportSource::Override(n) => insert_node(&id, n)?, ImportSource::Tmdb(_) => todo!(), ImportSource::Media { location, .. } => { // TODO use ignore options let media_path = location.path(); let metadata = spawn_blocking(move || { let input = BufReader::new(File::open(&media_path).unwrap()); let mut input = EbmlReader::new(input); import_metadata(&mut input) }) .await??; // if let Some(cover) = metadata.cover { // let pu = path.join(format!( // "cover.{}", // match mime.as_str() { // "image/webp" => "webp", // "image/jpeg" => "jpeg", // "image/png" => "png", // _ => { // warn!("unknown mime, just using webp"); // "webp" // } // } // )); // if !pu.exists() { // let mut f = tokio::fs::File::create(&pu).await?; // f.write_all(&data).await?; // } // } let node = Node { public: NodePublic { title: metadata.title, description: metadata.description, tagline: metadata.tagline, ..Default::default() }, private: NodePrivate::default(), }; insert_node(&id, node)?; } ImportSource::Federated { host } => { let session = fed.get_session(&host).await.context("creating session")?; import_remote(id, &host, db, &session, index_path) .await .context("federated import")? } ImportSource::AutoChildren { path: cpath } => { let paths = cpath .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) .read_dir()? .map(Result::unwrap) .map(|e| e.path()) .filter(|e| e.extension() == Some(&OsStr::from_bytes(b"yaml"))); let mut children = Vec::new(); for p in paths { let opts: ImportOptions = serde_yaml::from_reader(File::open(&p)?)?; if opts.id != id { children.push(opts.id); } } insert_node( &id, Node { private: NodePrivate::default(), public: NodePublic { children, ..Default::default() }, }, )?; } } Ok(()) } fn merge_node(x: Node, y: Node) -> Node { Node { public: NodePublic { kind: x.public.kind.or(y.public.kind), title: x.public.title.or(y.public.title), id: x.public.id.or(y.public.id), path: vec![], children: x .public .children .into_iter() .chain(y.public.children) .collect(), tagline: x.public.tagline.or(y.public.tagline), description: x.public.description.or(y.public.description), release_date: x.public.release_date.or(y.public.release_date), index: x.public.index.or(y.public.index), media: x.public.media.or(y.public.media), // TODO proper media merging ratings: x .public .ratings .into_iter() .chain(y.public.ratings) .collect(), federated: x.public.federated.or(y.public.federated), }, private: NodePrivate { id: x.private.id.or(y.private.id), poster: x.private.poster.or(y.private.poster), backdrop: x.private.backdrop.or(y.private.backdrop), source: x.private.source.or(y.private.source), // TODO here too }, } } // #[async_recursion] // pub async fn import_path( // path: PathBuf, // db: &Database, // fed: &Federation, // mut node_path: Vec, // ) -> anyhow::Result<(Vec, usize)> { // if path.is_dir() { // let mpath = path.join("directory.json"); // let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { // if e.path().extension() == Some(&OsStr::from_bytes(b"jelly")) // || e.metadata().unwrap().is_dir() // { // Some(e.path()) // } else { // None // } // }); // let identifier = if mpath.exists() { // path.file_name().unwrap().to_str().unwrap().to_string() // } else { // node_path // .last() // .cloned() // .ok_or(anyhow!("non-root node requires parent"))? // }; // node_path.push(identifier.clone()); // let mut all: FuturesUnordered<_> = children_paths // .into_iter() // .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e))) // .collect(); // node_path.pop(); // we will set the dirs path later and need it to not be included // let mut children_ids = Vec::new(); // let mut errs = 0; // while let Some(k) = all.next().await { // match k { // core::result::Result::Ok((els, errs2)) => { // errs += errs2; // children_ids.extend(els) // } // Err((p, e)) => { // errs += 1; // error!("import of {p:?} failed: {e:?}") // } // } // } // if mpath.exists() { // let mut node: Node = // serde_json::from_reader(File::open(mpath).context("metadata missing")?)?; // node.public.children = children_ids; // node.public.path = node_path; // node.public.id = Some(identifier.to_owned()); // info!("adding {identifier}"); // db.node.insert(&identifier, &node)?; // Ok((vec![identifier], errs)) // } else { // Ok((children_ids, errs)) // } // } else if path.is_file() { // info!("loading {path:?}"); // let datafile = File::open(path.clone()).context("cant load metadata")?; // let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?; // let identifier = node.private.id.clone().unwrap_or_else(|| { // path.file_name() // .unwrap() // .to_str() // .unwrap() // .strip_suffix(".json") // .unwrap() // .to_string() // }); // let idents = if let Some(io) = node.private.import.take() { // let session = fed // .get_session(&io.host) // .await // .context("creating session")?; // import_remote(io, db, &session, identifier.clone(), node_path) // .await // .context("federated import")? // } else { // debug!("adding {identifier}"); // node.public.path = node_path; // node.public.id = Some(identifier.to_owned()); // let did_insert = db.node.insert(&identifier, &node)?.is_none(); // if did_insert { // vec![identifier] // } else { // vec![] // } // }; // Ok((idents, 0)) // } else { // bail!("did somebody really put a fifo or socket in the library?!") // } // } static SEM_REMOTE_IMPORT: LazyLock = LazyLock::new(|| Semaphore::new(16)); #[async_recursion] async fn import_remote( id: String, host: &str, db: &Database, session: &Arc, index_path: &[usize], ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {id:?}"); let node = session.node(&id).await.context("fetching remote node")?; if node.federated.as_ref() == Some(&CONF.hostname) { return Ok(()); } // TODO maybe use lazy download let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?; let backdrop = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?; drop(_permit); let node = Node { public: node.clone(), private: NodePrivate { backdrop: Some(backdrop), poster: Some(poster), id: None, source: None, // TODO }, }; debug!("adding {id}"); insert_node(&id, node.clone())?; let mut children: FuturesUnordered<_> = node .public .children .iter() .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) .collect(); while let Some(r) = children.next().await { r?; } Ok(()) } async fn cache_federation_asset( session: Arc, identifier: String, role: AssetRole, ) -> anyhow::Result { async_cache_file( &["fed-asset", role.as_str(), &identifier.clone()], move |out| async move { let session = session; session .node_asset(identifier.as_str(), role, 1024, out) .await }, ) .await } fn make_ident(s: &str) -> String { let mut out = String::new(); for s in s.chars() { match s { 'a'..='z' | '0'..='9' => out.push(s), 'A'..='Z' => out.push(s.to_ascii_lowercase()), '-' | ' ' | '_' | ':' => out.push('-'), _ => (), } } out }