/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2024 metamuffin */ #![feature(lazy_cell)] pub mod infojson; pub mod tmdb; pub mod trakt; use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; use base64::Engine; use futures::{stream::FuturesUnordered, StreamExt}; use jellybase::{ assetfed::AssetInner, cache::{async_cache_file, cache_memory}, database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT}, federation::Federation, CONF, SECRETS, }; use jellyclient::Session; use jellycommon::{ Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, }; use jellymatroska::read::EbmlReader; use jellyremuxer::metadata::import_metadata; use log::{debug, info, warn}; use regex::Regex; use std::{ cmp::Ordering, collections::HashSet, ffi::OsStr, fs::File, hash::RandomState, io::BufReader, ops::Deref, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; use tmdb::{parse_release_date, Tmdb}; use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; use trakt::Trakt; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); struct Apis { trakt: Option, tmdb: Option, } pub fn is_importing() -> bool { IMPORT_SEM.available_permits() <= 0 } pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; if !table.is_empty()? { info!("clearing temporary node tree from an aborted last import..."); table.drain::<&str>(..)?; } drop(table); txn.commit()?; } let ap = Apis { trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), }; info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed, &ap) .await .context("indexing")?; info!("removing old nodes..."); { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; table.drain::<&str>(..)?; drop(table); txn.commit()?; } info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; table.drain::<&str>(..)?; drop(table); txn.commit()?; } info!("import completed"); drop(permit); Ok(()) } pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> { let txn_read = db.inner.begin_read()?; let t_node_import = txn_read.open_table(T_NODE_IMPORT)?; for r in t_node_import.iter()? { let (id, nodes) = r?; let mut nodes = nodes.value().0; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); let mut node = nodes .into_iter() .map(|(_, x)| x) .reduce(|x, y| merge_node(x, y).unwrap()) .unwrap(); node.public.id = Some(id.value().to_owned()); node.public.path = vec![]; // will be reconstructed in the next pass node.public.federated = None; // TODO this discardes a lot of information. maybe change this. if let Some(media) = &node.public.media { for t in &media.tracks { if let Some(host) = t.federated.first() { if host != &CONF.hostname { node.public.federated = Some(host.to_string()) } } } } { let txn_write = db.inner.begin_write()?; let mut t_node = txn_write.open_table(T_NODE)?; t_node.insert(id.value(), Ser(node))?; drop(t_node); txn_write.commit()?; } } Ok(()) } pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> { fn traverse(db: &DataAcid, c: String, mut path: Vec) -> anyhow::Result<()> { let node = { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE)?; let mut node = table .get(&*c)? .ok_or(anyhow!( "missing child when generating paths: {c:?} at {path:?}" ))? .value() .0; if node.public.path.is_empty() { node.public.path = path.clone(); } table.insert(c.as_str(), Ser(node.clone()))?; drop(table); txn.commit()?; node }; path.push(c); for c in node.public.children { traverse(db, c, path.clone())?; } Ok(()) } traverse(db, "library".to_string(), vec![])?; Ok(()) } fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { if x.is_empty() { Ordering::Greater } else if y.is_empty() { Ordering::Less } else { match x[0].cmp(&y[0]) { o @ (Ordering::Less | Ordering::Greater) => o, Ordering::Equal => compare_index_path(&x[1..], &y[1..]), } } } #[async_recursion] async fn import_path( path: PathBuf, mut index_path: Vec, db: &DataAcid, fed: &Federation, ap: &Apis, ) -> anyhow::Result<()> { if path.is_dir() { let mut children_paths = path .read_dir()? .map(Result::unwrap) .filter_map(|e| { if e.path().extension() == Some(&OsStr::new("yaml")) || e.path().extension() == Some(&OsStr::new("jelly")) || e.metadata().unwrap().is_dir() { Some(e.path()) } else { None } }) .collect::>(); children_paths.sort(); let mut children: FuturesUnordered<_> = children_paths .into_iter() .enumerate() .map(|(i, p)| { import_path( p.clone(), { let mut path = index_path.clone(); path.push(i); path }, db, fed, ap, ) }) .collect(); while let Some(k) = children.next().await { k? } } else { info!("reading {path:?}"); let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? } else { serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? }; for (i, s) in opts.sources.into_iter().enumerate() { index_path.push(i); process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) .await .context(anyhow!("processing source in {path:?}"))?; index_path.pop(); } } Ok(()) } static SEM_IMPORT: Semaphore = Semaphore::const_new(2); #[async_recursion] async fn process_source( id: String, s: ImportSource, path: &Path, index_path: &[usize], db: &DataAcid, fed: &Federation, ap: &Apis, ) -> anyhow::Result<()> { let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); node.push((index_path.to_vec(), n.clone())); table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) }; let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> { // TODO merge this let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(n))?; drop(table); txn.commit()?; Ok(()) }; match s { ImportSource::Override(mut n) => { if let Some(backdrop) = n.private.backdrop.clone() { n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); } if let Some(poster) = n.private.poster.clone() { n.public.poster = Some(AssetInner::Library(poster).ser()); } insert_node(&id, n)? } ImportSource::Trakt { id: tid, kind } => { info!("trakt {id}"); let trakt = ap .trakt .as_ref() .ok_or(anyhow!("trakt api key is required"))?; let trakt_object = trakt .lookup(kind, tid, true) .await .context("looking up metadata")?; let trakt_people = trakt .people(kind, tid, true) .await .context("looking up people")?; let mut node = Node::default(); let mut node_ext = ExtendedNode::default(); { node.public.title = Some(trakt_object.title.to_owned()); if let Some(overview) = &trakt_object.overview { node.public.description = Some(overview.to_owned()) } if let Some(tagline) = &trakt_object.tagline { node.public.tagline = Some(tagline.to_owned()) } if let Some(rating) = &trakt_object.rating { node.public.ratings.insert(Rating::Trakt, *rating); } for p in trakt_people.cast.iter() { node_ext .people .entry(PeopleGroup::Cast) .or_default() .push(p.a()) } for (group, people) in trakt_people.crew.iter() { for p in people { node_ext.people.entry(group.a()).or_default().push(p.a()) } } // TODO lazy assets for (_, ps) in &mut node_ext.people { for p in ps { if let Some(id) = p.person.ids.tmdb { if let Some(tmdb) = &ap.tmdb { let k = tmdb.person_image(id).await?; if let Some(prof) = k.profiles.get(0) { p.person.headshot = Some( AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), ); } } } } } } insert_node(&id, node)?; insert_node_ext(&id, node_ext)?; if let Some(tid) = trakt_object.ids.tmdb { if let Some(kind) = match kind { TraktKind::Movie => Some(TmdbKind::Movie), TraktKind::Show => Some(TmdbKind::Tv), TraktKind::Season => Some(TmdbKind::Tv), // TODO TraktKind::Episode | TraktKind::Person | TraktKind::User => None, } { let mut index_path = index_path.to_vec(); index_path.push(1); process_source( id, ImportSource::Tmdb { id: tid, kind }, path, &index_path, db, fed, ap, ) .await?; } } } ImportSource::Tmdb { id: tid, kind } => { info!("tmdb {id}"); let tmdb = ap .tmdb .as_ref() .ok_or(anyhow!("tmdb api key is required"))?; let details = tmdb.details(kind, tid).await?; let mut node = Node::default(); // TODO lazy assets if let Some(poster) = &details.poster_path { node.public.poster = Some(AssetInner::Cache(tmdb.image(&poster).await?).ser()); } if let Some(backdrop) = &details.backdrop_path { node.public.backdrop = Some(AssetInner::Cache(tmdb.image(&backdrop).await?).ser()); } node.public.tagline = details.tagline.clone(); node.public.title = details.title.clone(); node.public.description = Some(details.overview.clone()); node.public .ratings .insert(Rating::Tmdb, details.vote_average); if let Some(date) = details.release_date.clone() { node.public.release_date = parse_release_date(&date).context("parsing release date")?; } insert_node(&id, node)?; } ImportSource::Media { path: mpath, ignore_attachments, ignore_chapters, ignore_metadata, } => { info!("media import {mpath:?}"); let abspath = CONF.media_path.join(&mpath); if !abspath.exists() { bail!("media missing at {abspath:?}"); } if abspath.is_dir() { let mut node = Node::default(); for f in abspath.read_dir()? { let f = f?; let child_path = f.path(); if child_path.is_dir() || matches!( child_path.extension().map(|o| o.to_str().unwrap()), Some("mks" | "mka" | "mkv" | "webm") ) { let inf_id = infer_id_from_path(&child_path).context("inferring child id")?; process_source( inf_id.clone(), ImportSource::Media { path: mpath.join(f.file_name()), ignore_attachments, ignore_chapters, ignore_metadata, }, path, index_path, db, fed, ap, ) .await .context(anyhow!("recursive media import: {:?}", f.path()))?; node.public.children.push(inf_id); } } insert_node(&id, node)?; } else if abspath.is_file() { let _permit = SEM_IMPORT.acquire().await.unwrap(); let metadata = { let abspath = abspath.clone(); spawn_blocking(move || { cache_memory(&["probe", abspath.to_str().unwrap()], || { let input = File::open(&abspath).context("opening media file")?; let mut input = EbmlReader::new(BufReader::new(input)); import_metadata(&mut input) }) }) } .await? .context(anyhow!("probing {abspath:?}"))? .deref() .to_owned(); let mut node = Node::default(); if !ignore_metadata { node.public.title = metadata.title; node.public.description = metadata.description; node.public.tagline = metadata.tagline; } node.public.media = Some(MediaInfo { duration: metadata.duration, tracks: metadata.tracks, chapters: if ignore_chapters { vec![] } else { metadata.chapters }, }); node.private.source = Some( metadata .track_sources .into_iter() .map(|mut ts| { ts.path = mpath.to_owned(); TrackSource::Local(ts) }) .collect(), ); if !ignore_attachments { if let Some((filename, data)) = metadata.cover { node.public.poster = Some( AssetInner::Cache( async_cache_file( &["att-cover", mpath.to_str().unwrap(), &filename], |mut f| async move { f.write_all(&data).await?; Ok(()) }, ) .await?, ) .ser(), ) }; if let Some(infojson) = metadata.infojson { let infojson: infojson::YVideo = serde_json::from_str(&infojson).context("parsing infojson")?; node.public.kind = Some( if infojson.duration.unwrap_or(0.) < 120. && infojson.aspect_ratio.unwrap_or(2.) < 1. { NodeKind::ShortFormVideo } else { NodeKind::Video }, ); node.public.title = Some(infojson.title); node.public.description = Some(infojson.description); node.public.tagline = Some(infojson.webpage_url); node.public .ratings .insert(Rating::YoutubeViews, infojson.view_count as f64); node.public.release_date = Some( infojson::parse_upload_date(&infojson.upload_date) .context("parsing upload date")?, ); node.public.ratings.extend( infojson .like_count .map(|l| (Rating::YoutubeLikes, l as f64)), ); } } drop(_permit); insert_node(&id, node)?; } else { warn!("non file/dir import ignored: {abspath:?}") } } ImportSource::Federated { host } => { info!("federated import of {id:?} from {host:?}"); let session = fed.get_session(&host).await.context("creating session")?; import_remote(id.clone(), &host, db, &session, index_path) .await .context("federated import")? } ImportSource::AutoChildren { path: cpath } => { info!("auto children at {path:?}"); let paths = cpath .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) .read_dir()? .map(Result::unwrap) .map(|e| e.path()) .filter(|e| { e.extension() == Some(OsStr::new("yaml")) || e.extension() == Some(&OsStr::new("jelly")) }); let mut children = Vec::new(); for p in paths { let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { serde_json::from_reader(File::open(&p)?)? } else { serde_yaml::from_reader(File::open(&p)?)? }; if opts.id != id { children.push(opts.id); } } insert_node( &id, Node { private: NodePrivate::default(), public: NodePublic { children, ..Default::default() }, }, )?; } } Ok(()) } const RE_YOUTUBE_ID: LazyLock = LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); pub fn infer_id_from_path(path: &Path) -> anyhow::Result { let f = path .file_stem() .ok_or(anyhow!("no filename"))? .to_str() .ok_or(anyhow!("non utf8 filename"))?; if let Some(mat) = RE_YOUTUBE_ID.captures(f) { let id = mat.get(1).unwrap().as_str(); return Ok(format!("youtube-{id}")); } let mut fsan = String::with_capacity(f.len()); for c in f.chars() { fsan.extend(match c { 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), ' ' => Some('-'), _ => None, }); } Ok(fsan) } fn merge_node(x: Node, y: Node) -> anyhow::Result { let (media, source) = match ( x.public.media, y.public.media, x.private.source, y.private.source, ) { (Some(x), Some(y), Some(sx), Some(sy)) => { let k = merge_media(x, y, sx, sy); (Some(k.0), Some(k.1)) } (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), (None, None, None, None) => (None, None), _ => bail!("invalid node. source and media dont agree."), }; Ok(Node { public: NodePublic { kind: x.public.kind.or(y.public.kind), title: x.public.title.or(y.public.title), id: x.public.id.or(y.public.id), path: vec![], children: merge_children(x.public.children, y.public.children), tagline: x.public.tagline.or(y.public.tagline), description: x.public.description.or(y.public.description), release_date: x.public.release_date.or(y.public.release_date), index: x.public.index.or(y.public.index), media, ratings: x .public .ratings .into_iter() .chain(y.public.ratings) .collect(), federated: None, poster: x.public.poster.or(y.public.poster), backdrop: x.public.backdrop.or(y.public.backdrop), }, private: NodePrivate { id: x.private.id.or(y.private.id), source, backdrop: None, poster: None, }, }) } fn merge_children(mut a: Vec, b: Vec) -> Vec { let acont = HashSet::<_, RandomState>::from_iter(a.clone()); for el in b { if !acont.contains(&el) { a.push(el) } } a } fn merge_media( x: MediaInfo, y: MediaInfo, sx: Vec, sy: Vec, ) -> (MediaInfo, Vec) { let mut tracks: Vec = Vec::new(); let mut source: Vec = Vec::new(); for (t, s) in x .tracks .into_iter() .zip(sx.into_iter()) .chain(y.tracks.into_iter().zip(sy.into_iter())) { let mut remove = None; let mut skip = false; for (i, ot) in tracks.iter().enumerate() { if t.name == ot.name && t.kind == ot.kind && t.language == ot.language && t.codec == ot.codec { if t.federated.len() < ot.federated.len() { remove = Some(i); } else { skip = true; } } } if let Some(r) = remove { tracks.swap_remove(r); source.swap_remove(r); } if !skip { tracks.push(t); source.push(s); } } ( MediaInfo { duration: x.duration * 0.5 + y.duration * 0.5, tracks, chapters: if x.chapters.len() > y.chapters.len() { x.chapters } else { y.chapters }, }, source, ) } static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); #[async_recursion] async fn import_remote( id: String, host: &str, db: &DataAcid, session: &Arc, index_path: &[usize], ) -> anyhow::Result<()> { let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_IMPORT)?; let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); node.push((index_path.to_vec(), n.clone())); table.insert(id, Ser(node))?; drop(table); txn.commit()?; Ok(()) }; let insert_node_ext = move |id: &str, n: ExtendedNode| -> anyhow::Result<()> { // TODO merge this let txn = db.inner.begin_write()?; let mut table = txn.open_table(T_NODE_EXTENDED)?; table.insert(id, Ser(n))?; drop(table); txn.commit()?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {id:?}"); let mut node = session.node(&id).await.context("fetching remote node")?; let mut node_ext = session .node_extended(&id) .await .context("fetching extended remote node")?; let track_sources = if let Some(media) = &mut node.media { let mut track_sources = Vec::new(); for (i, t) in media.tracks.iter_mut().enumerate() { t.federated.push(host.to_owned()); track_sources.push(TrackSource::Remote(i)) } Some(track_sources) } else { None }; drop(_permit); let mut node = Node { public: node.clone(), private: NodePrivate { backdrop: None, poster: None, id: None, source: track_sources, }, }; make_opt_asset_federated(host, &mut node.public.backdrop)?; make_opt_asset_federated(host, &mut node.public.poster)?; for (_, g) in &mut node_ext.people { for a in g { make_opt_asset_federated(host, &mut a.person.headshot)?; } } debug!("adding {id}"); insert_node(&id, node.clone())?; insert_node_ext(&id, node_ext)?; let mut children: FuturesUnordered<_> = node .public .children .iter() .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) .collect(); while let Some(r) = children.next().await { r?; } Ok(()) } pub fn make_opt_asset_federated(host: &str, p: &mut Option) -> anyhow::Result<()> { if let Some(a) = p { make_asset_federated(host, a)? } Ok(()) } pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; *p = AssetInner::Federated { host: host.to_owned(), asset: data, } .ser(); Ok(()) }