/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ #![feature(lazy_cell)] pub mod infojson; pub mod tmdb; use crate::tmdb::TmdbKind; use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; use futures::{executor::block_on, stream::FuturesUnordered, StreamExt}; use jellybase::{ cache::{async_cache_file, cache_memory}, database::Database, federation::Federation, AssetLocationExt, CONF, }; use jellyclient::Session; use jellycommon::{ AssetLocation, AssetRole, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, NodePublic, Rating, TrackSource, }; use jellymatroska::read::EbmlReader; use jellyremuxer::import::import_metadata; use log::{debug, info, warn}; use regex::Regex; use std::{ cmp::Ordering, collections::HashSet, ffi::OsStr, fs::File, hash::RandomState, io::BufReader, ops::Deref, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; use tmdb::{parse_release_date, tmdb_image}; use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; if !db.node_import.is_empty() { info!("clearing temporary node tree from an aborted last import..."); db.node_import.clear()?; } info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed) .await .context("indexing")?; info!("removing old nodes..."); db.node.clear()?; info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); db.node_import.clear()?; info!("import completed"); drop(permit); Ok(()) } pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { for r in db.node_import.iter() { let (id, mut nodes) = r?; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); let mut node = nodes .into_iter() .map(|(_, x)| x) .reduce(|x, y| merge_node(x, y)) .unwrap(); node.public.id = Some(id.clone()); node.public.path = vec![]; // will be reconstructed in the next pass db.node.insert(&id, &node)?; } Ok(()) } pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { fn traverse(db: &Database, c: String, mut path: Vec) -> anyhow::Result<()> { let node = db .node .update_and_fetch(&c, |mut nc| { if let Some(nc) = &mut nc { if nc.public.path.is_empty() { nc.public.path = path.clone(); } } nc })? .ok_or(anyhow!("node {c:?} missing"))?; path.push(c); for c in node.public.children { traverse(db, c, path.clone())?; } Ok(()) } traverse(db, "library".to_string(), vec![])?; Ok(()) } fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { if x.is_empty() { Ordering::Greater } else if y.is_empty() { Ordering::Less } else { match x[0].cmp(&y[0]) { o @ (Ordering::Less | Ordering::Greater) => o, Ordering::Equal => compare_index_path(&x[1..], &y[1..]), } } } #[async_recursion] pub async fn import_path( path: PathBuf, index_path: Vec, db: &Database, fed: &Federation, ) -> anyhow::Result<()> { if path.is_dir() { let mut children_paths = path .read_dir()? .map(Result::unwrap) .filter_map(|e| { if e.path().extension() == Some(&OsStr::new("yaml")) || e.path().extension() == Some(&OsStr::new("jelly")) || e.metadata().unwrap().is_dir() { Some(e.path()) } else { None } }) .collect::>(); children_paths.sort(); let mut children: FuturesUnordered<_> = children_paths .into_iter() .enumerate() .map(|(i, p)| { import_path( p.clone(), { let mut path = index_path.clone(); path.push(i); path }, db, fed, ) }) .collect(); while let Some(k) = children.next().await { k? } } else { let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? } else { serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? }; for s in opts.sources { process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; } } Ok(()) } static SEM_IMPORT: Semaphore = Semaphore::const_new(2); #[async_recursion] async fn process_source( id: String, s: ImportSource, path: &Path, index_path: &[usize], db: &Database, fed: &Federation, ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; match s { ImportSource::Override(n) => insert_node(&id, n)?, ImportSource::Tmdb { id: tid } => { let key = CONF .tmdb_api_key .as_ref() .ok_or(anyhow!("no tmdb api key"))?; let details = tokio::task::spawn_blocking(move || { cache_memory(&["tmdb-details", &format!("{tid}")], || { block_on(tmdb::tmdb_details(TmdbKind::Movie, tid, key)) }) }) .await??; let mut node = Node::default(); if let Some(poster) = &details.poster_path { node.private.poster = Some( async_cache_file( &["tmdb-asset", "poster", &format!("{tid}")], |mut f| async move { Ok(f.write_all(&tmdb_image(&poster).await?).await?) }, ) .await?, ); } if let Some(backdrop) = &details.backdrop_path { node.private.backdrop = Some( async_cache_file( &["tmdb-asset", "backdrop", &format!("{tid}")], |mut f| async move { Ok(f.write_all(&tmdb_image(&backdrop).await?).await?) }, ) .await?, ); } node.public.tagline = details.tagline.clone(); node.public.title = details.title.clone(); node.public.description = Some(details.overview.clone()); node.public .ratings .insert(Rating::Tmdb, details.vote_average); if let Some(date) = details.release_date.clone() { node.public.release_date = Some(parse_release_date(&date).context("parsing release date")?); } insert_node(&id, node)?; } ImportSource::Media { location, ignore_attachments, ignore_chapters, ignore_metadata, } => { let media_path = location.path(); if media_path.is_dir() { let mut node = Node::default(); for f in media_path.read_dir()? { let f = f?; let child_path = f.path(); let inf_id = infer_id_from_path(&child_path).context("inferring child id")?; if &inf_id == "archive" { continue; } process_source( inf_id.clone(), ImportSource::Media { location: match &location { AssetLocation::Media(p) => { AssetLocation::Media(p.join(f.file_name())) } _ => bail!("non media path media"), }, ignore_attachments, ignore_chapters, ignore_metadata, }, path, index_path, db, fed, ) .await .context(anyhow!("recursive media import: {:?}", f.path()))?; node.public.children.push(inf_id); } insert_node(&id, node)?; } else if media_path.is_file() { let _permit = SEM_IMPORT.acquire().await.unwrap(); let location_path = location.path(); let metadata = { spawn_blocking(move || { cache_memory( &["mkv-probe", location.path().to_str().unwrap()], move || { let input = BufReader::new( File::open(&location.path()).context("opening media file")?, ); let mut input = EbmlReader::new(input); import_metadata(&mut input) }, ) }) } .await? .context(anyhow!("probing {location_path:?}"))? .deref() .to_owned(); let mut node = Node::default(); if !ignore_metadata { node.public.title = metadata.title; node.public.description = metadata.description; node.public.tagline = metadata.tagline; } node.public.media = Some(MediaInfo { duration: metadata.duration, tracks: metadata.tracks, chapters: if ignore_chapters { vec![] } else { metadata.chapters }, }); node.private.source = Some( metadata .track_sources .into_iter() .map(|mut ts| { ts.path = media_path.to_owned(); TrackSource::Local(ts) }) .collect(), ); if !ignore_attachments { if let Some((filename, data)) = metadata.cover { node.private.poster = Some( async_cache_file( &["att-cover", media_path.to_str().unwrap(), &filename], |mut f| async move { f.write_all(&data).await?; Ok(()) }, ) .await?, ); }; if let Some(infojson) = metadata.infojson { let infojson: infojson::YVideo = serde_json::from_str(&infojson).context("parsing infojson")?; node.public.kind = Some(NodeKind::Video); node.public.title = Some(infojson.title); node.public.description = Some(infojson.description); node.public.tagline = Some(infojson.webpage_url); node.public .ratings .insert(Rating::YoutubeViews, infojson.view_count as f64); node.public.release_date = Some( infojson::parse_upload_date(&infojson.upload_date) .context("parsing upload date")?, ); node.public.ratings.extend( infojson .like_count .map(|l| (Rating::YoutubeLikes, l as f64)), ); } } drop(_permit); insert_node(&id, node)?; } else { warn!("non file/dir import ignored: {media_path:?}") } } ImportSource::Federated { host } => { let session = fed.get_session(&host).await.context("creating session")?; import_remote(id.clone(), &host, db, &session, index_path) .await .context("federated import")? } ImportSource::AutoChildren { path: cpath } => { let paths = cpath .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) .read_dir()? .map(Result::unwrap) .map(|e| e.path()) .filter(|e| { e.extension() == Some(OsStr::new("yaml")) || e.extension() == Some(&OsStr::new("jelly")) }); let mut children = Vec::new(); for p in paths { let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { serde_json::from_reader(File::open(&p)?)? } else { serde_yaml::from_reader(File::open(&p)?)? }; if opts.id != id { children.push(opts.id); } } insert_node( &id, Node { private: NodePrivate::default(), public: NodePublic { children, ..Default::default() }, }, )?; } } Ok(()) } const RE_YOUTUBE_ID: LazyLock = LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); pub fn infer_id_from_path(path: &Path) -> anyhow::Result { let f = path .file_stem() .ok_or(anyhow!("no filename"))? .to_str() .ok_or(anyhow!("non utf8 filename"))?; if let Some(mat) = RE_YOUTUBE_ID.captures(f) { let id = mat.get(1).unwrap().as_str(); return Ok(format!("youtube-{id}")); } let mut fsan = String::with_capacity(f.len()); for c in f.chars() { fsan.extend(match c { 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), ' ' => Some('-'), _ => None, }); } Ok(fsan) } fn merge_node(x: Node, y: Node) -> Node { Node { public: NodePublic { kind: x.public.kind.or(y.public.kind), title: x.public.title.or(y.public.title), id: x.public.id.or(y.public.id), path: vec![], children: merge_children(x.public.children, y.public.children), tagline: x.public.tagline.or(y.public.tagline), description: x.public.description.or(y.public.description), release_date: x.public.release_date.or(y.public.release_date), index: x.public.index.or(y.public.index), media: x.public.media.or(y.public.media), // TODO proper media merging ratings: x .public .ratings .into_iter() .chain(y.public.ratings) .collect(), federated: x.public.federated.or(y.public.federated), }, private: NodePrivate { id: x.private.id.or(y.private.id), poster: x.private.poster.or(y.private.poster), backdrop: x.private.backdrop.or(y.private.backdrop), source: x.private.source.or(y.private.source), // TODO here too }, } } fn merge_children(mut a: Vec, b: Vec) -> Vec { let acont = HashSet::<_, RandomState>::from_iter(a.clone()); for el in b { if !acont.contains(&el) { a.push(el) } } a } static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); #[async_recursion] async fn import_remote( id: String, host: &str, db: &Database, session: &Arc, index_path: &[usize], ) -> anyhow::Result<()> { let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); l.push((index_path.to_vec(), n.clone())); Some(l) })?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {id:?}"); let mut node = session.node(&id).await.context("fetching remote node")?; // if node.federated.as_ref() == Some(&CONF.hostname) { // return Ok(()); // } let track_sources = if let Some(media) = &mut node.media { let mut track_sources = Vec::new(); for (i, t) in media.tracks.iter_mut().enumerate() { t.federated.push(host.to_owned()); track_sources.push(TrackSource::Remote(i)) } Some(track_sources) } else { None }; // TODO maybe use lazy download let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?; let backdrop = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?; drop(_permit); let node = Node { public: node.clone(), private: NodePrivate { backdrop: Some(backdrop), poster: Some(poster), id: None, source: track_sources, }, }; debug!("adding {id}"); insert_node(&id, node.clone())?; let mut children: FuturesUnordered<_> = node .public .children .iter() .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) .collect(); while let Some(r) = children.next().await { r?; } Ok(()) } async fn cache_federation_asset( session: Arc, identifier: String, role: AssetRole, ) -> anyhow::Result { async_cache_file( &["fed-asset", role.as_str(), &identifier.clone()], move |out| async move { let session = session; session .node_asset(identifier.as_str(), role, 1024, out) .await }, ) .await } // fn make_ident(s: &str) -> String { // let mut out = String::new(); // for s in s.chars() { // match s { // 'a'..='z' | '0'..='9' => out.push(s), // 'A'..='Z' => out.push(s.to_ascii_lowercase()), // '-' | ' ' | '_' | ':' => out.push('-'), // _ => (), // } // } // out // }