/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ #![feature(lazy_cell)] pub mod infojson; pub mod tmdb; use anyhow::{anyhow, Context, Ok}; use async_recursion::async_recursion; use futures::{executor::block_on, stream::FuturesUnordered, StreamExt}; use jellybase::{ cache::{async_cache_file, cache_memory}, database::Database, federation::Federation, AssetLocationExt, CONF, }; use jellyclient::Session; use jellycommon::{ AssetLocation, AssetRole, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, NodePublic, Rating, TrackSource, }; use jellymatroska::read::EbmlReader; use jellyremuxer::import::import_metadata; use log::{debug, info}; use std::{ cmp::Ordering, ffi::OsStr, fs::File, io::BufReader, os::unix::prelude::OsStrExt, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; use tmdb::tmdb_image; use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; use crate::tmdb::TmdbKind; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; if !db.node_import.is_empty() { info!("clearing temporary node tree from an aborted last import..."); db.node_import.clear()?; } info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed) .await .context("indexing")?; info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); db.node_import.clear()?; info!("import completed"); drop(permit); Ok(()) } pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { for r in db.node_import.iter() { let (id, mut nodes) = r?; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); let mut node = nodes .into_iter() .map(|(_, x)| x) .reduce(|x, y| merge_node(x, y)) .unwrap(); node.public.id = Some(id.clone()); node.public.path = vec![]; // will be reconstructed in the next pass db.node.insert(&id, &node)?; } Ok(()) } pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { fn traverse(db: &Database, c: String, mut path: Vec) -> anyhow::Result<()> { let node = db .node .update_and_fetch(&c, |mut nc| { if let Some(nc) = &mut nc { if nc.public.path.is_empty() { nc.public.path = path.clone(); } } nc })? .ok_or(anyhow!("node missing"))?; path.push(c); for c in node.public.children { traverse(db, c, path.clone())?; } Ok(()) } traverse(db, "library".to_string(), vec![])?; Ok(()) } fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { if x.is_empty() { Ordering::Greater } else if y.is_empty() { Ordering::Less } else { match x[0].cmp(&y[0]) { o @ (Ordering::Less | Ordering::Greater) => o, Ordering::Equal => compare_index_path(&x[1..], &y[1..]), } } } #[async_recursion] pub async fn import_path( path: PathBuf, index_path: Vec, db: &Database, fed: &Federation, ) -> anyhow::Result<()> { if path.is_dir() { let mut children_paths = path .read_dir()? .map(Result::unwrap) .filter_map(|e| { if e.path().extension() == Some(&OsStr::from_bytes(b"yaml")) || e.metadata().unwrap().is_dir() { Some(e.path()) } else { None } }) .collect::>(); children_paths.sort(); let mut children: FuturesUnordered<_> = children_paths .into_iter() .enumerate() .map(|(i, p)| { import_path( p.clone(), { let mut path = index_path.clone(); path.push(i); path }, db, fed, ) }) .collect(); while let Some(k) = children.next().await { k? } } else { let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?; for s in opts.sources { process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; } } Ok(()) } async fn process_source( id: String, s: ImportSource, path: &Path, index_path: &[usize], db: &Database, fed: &Federation, ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; match s { ImportSource::Override(n) => insert_node(&id, n)?, ImportSource::Tmdb { id: tid } => { let key = CONF .tmdb_api_key .as_ref() .ok_or(anyhow!("no tmdb api key"))?; let details = tokio::task::spawn_blocking(move || { cache_memory(&["tmdb-details", &format!("{tid}")], || { block_on(tmdb::tmdb_details(TmdbKind::Movie, tid, key)) }) }) .await??; let mut node = Node::default(); if let Some(poster) = &details.poster_path { node.private.poster = Some( async_cache_file( &["tmdb-asset", "poster", &format!("{tid}")], |mut f| async move { Ok(f.write_all(&tmdb_image(&poster).await?).await?) }, ) .await?, ); } if let Some(backdrop) = &details.backdrop_path { node.private.backdrop = Some( async_cache_file( &["tmdb-asset", "backdrop", &format!("{tid}")], |mut f| async move { Ok(f.write_all(&tmdb_image(&backdrop).await?).await?) }, ) .await?, ); } node.public.tagline = details.tagline.clone(); node.public.title = details.title.clone(); node.public.description = Some(details.overview.clone()); node.public .ratings .insert(Rating::Tmdb, details.vote_average); insert_node(&id, node)?; } ImportSource::Media { location, ignore_attachments, ignore_chapters, ignore_metadata, } => { // TODO use ignore options let media_path = location.path(); let metadata = spawn_blocking(move || { let input = BufReader::new(File::open(&location.path()).context("opening media file")?); let mut input = EbmlReader::new(input); import_metadata(&mut input) }) .await??; let mut node = Node::default(); if !ignore_metadata { node.public.title = metadata.title; node.public.description = metadata.description; node.public.tagline = metadata.tagline; } node.public.media = Some(MediaInfo { duration: metadata.duration, tracks: metadata.tracks, chapters: if ignore_chapters { vec![] } else { metadata.chapters }, }); node.private.source = Some( metadata .track_sources .into_iter() .map(|mut ts| { ts.path = media_path.to_owned(); TrackSource::Local(ts) }) .collect(), ); if !ignore_attachments { if let Some((filename, data)) = metadata.cover { node.private.poster = Some( async_cache_file( &["att-cover", media_path.to_str().unwrap(), &filename], |mut f| async move { f.write_all(&data).await?; Ok(()) }, ) .await?, ); }; if let Some(infojson) = metadata.infojson { let infojson: infojson::YVideo = serde_json::from_str(&infojson).context("parsing infojson")?; node.public.kind = Some(NodeKind::Video); node.public.title = Some(infojson.title); node.public.description = Some(infojson.description); node.public.tagline = Some(infojson.webpage_url); node.public .ratings .insert(Rating::YoutubeViews, infojson.view_count as f64); node.public.ratings.extend( infojson .like_count .map(|l| (Rating::YoutubeLikes, l as f64)), ); } } insert_node(&id, node)?; } ImportSource::Federated { host } => { let session = fed.get_session(&host).await.context("creating session")?; import_remote(id, &host, db, &session, index_path) .await .context("federated import")? } ImportSource::AutoChildren { path: cpath } => { let paths = cpath .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) .read_dir()? .map(Result::unwrap) .map(|e| e.path()) .filter(|e| e.extension() == Some(&OsStr::from_bytes(b"yaml"))); let mut children = Vec::new(); for p in paths { let opts: ImportOptions = serde_yaml::from_reader(File::open(&p)?)?; if opts.id != id { children.push(opts.id); } } insert_node( &id, Node { private: NodePrivate::default(), public: NodePublic { children, ..Default::default() }, }, )?; } } Ok(()) } fn merge_node(x: Node, y: Node) -> Node { Node { public: NodePublic { kind: x.public.kind.or(y.public.kind), title: x.public.title.or(y.public.title), id: x.public.id.or(y.public.id), path: vec![], children: x .public .children .into_iter() .chain(y.public.children) .collect(), tagline: x.public.tagline.or(y.public.tagline), description: x.public.description.or(y.public.description), release_date: x.public.release_date.or(y.public.release_date), index: x.public.index.or(y.public.index), media: x.public.media.or(y.public.media), // TODO proper media merging ratings: x .public .ratings .into_iter() .chain(y.public.ratings) .collect(), federated: x.public.federated.or(y.public.federated), }, private: NodePrivate { id: x.private.id.or(y.private.id), poster: x.private.poster.or(y.private.poster), backdrop: x.private.backdrop.or(y.private.backdrop), source: x.private.source.or(y.private.source), // TODO here too }, } } static SEM_REMOTE_IMPORT: LazyLock = LazyLock::new(|| Semaphore::new(16)); #[async_recursion] async fn import_remote( id: String, host: &str, db: &Database, session: &Arc, index_path: &[usize], ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {id:?}"); let mut node = session.node(&id).await.context("fetching remote node")?; // if node.federated.as_ref() == Some(&CONF.hostname) { // return Ok(()); // } let track_sources = if let Some(media) = &mut node.media { let mut track_sources = Vec::new(); for (i, t) in media.tracks.iter_mut().enumerate() { t.federated.push(host.to_owned()); track_sources.push(TrackSource::Remote(i)) } Some(track_sources) } else { None }; // TODO maybe use lazy download let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?; let backdrop = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?; drop(_permit); let node = Node { public: node.clone(), private: NodePrivate { backdrop: Some(backdrop), poster: Some(poster), id: None, source: track_sources, }, }; debug!("adding {id}"); insert_node(&id, node.clone())?; let mut children: FuturesUnordered<_> = node .public .children .iter() .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) .collect(); while let Some(r) = children.next().await { r?; } Ok(()) } async fn cache_federation_asset( session: Arc, identifier: String, role: AssetRole, ) -> anyhow::Result { async_cache_file( &["fed-asset", role.as_str(), &identifier.clone()], move |out| async move { let session = session; session .node_asset(identifier.as_str(), role, 1024, out) .await }, ) .await } // fn make_ident(s: &str) -> String { // let mut out = String::new(); // for s in s.chars() { // match s { // 'a'..='z' | '0'..='9' => out.push(s), // 'A'..='Z' => out.push(s.to_ascii_lowercase()), // '-' | ' ' | '_' | ':' => out.push('-'), // _ => (), // } // } // out // }