/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2024 metamuffin */ pub mod infojson; pub mod tmdb; pub mod trakt; // use anyhow::{anyhow, bail, Context, Error, Ok}; // use async_recursion::async_recursion; // use base64::Engine; // use db::{DatabaseStorage, ImportStorage, MemoryStorage}; // use futures::{stream::FuturesUnordered, StreamExt}; // use jellybase::{ // assetfed::AssetInner, // cache::{async_cache_file, cache_memory}, // database::DataAcid, // federation::Federation, // CONF, SECRETS, // }; // use jellyclient::Session; // use jellycommon::{ // chrono::{DateTime, Datelike}, // Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, // NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, // }; // use jellymatroska::read::EbmlReader; // use jellyremuxer::metadata::import_metadata; // use log::{debug, info, warn}; // use regex::Regex; // use std::{ // cmp::Ordering, // collections::HashSet, // ffi::OsStr, // fs::File, // hash::RandomState, // io::BufReader, // ops::Deref, // path::{Path, PathBuf}, // sync::{Arc, LazyLock}, // }; // use tmdb::{parse_release_date, Tmdb}; // use tokio::{ // io::AsyncWriteExt, // sync::{RwLock, Semaphore}, // task::spawn_blocking, // }; // use trakt::Trakt; // static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); // pub static IMPORT_ERRORS: RwLock> = RwLock::const_new(Vec::new()); // static RE_EPISODE_FILENAME: LazyLock = // LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); // static RE_YOUTUBE_ID: LazyLock = // LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); // struct Apis { // trakt: Option, // tmdb: Option, // } // pub fn is_importing() -> bool { // IMPORT_SEM.available_permits() == 0 // } // pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { // let permit = IMPORT_SEM.try_acquire()?; // let ap = Apis { // trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), // tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), // }; // let e = if CONF.use_in_memory_import_storage { // import_inner(&MemoryStorage::new(db), fed, &ap).await // } else { // import_inner(&DatabaseStorage::new(db), fed, &ap).await // }; // let e = match e { // Result::Ok(e) => e, // Result::Err(e) => vec![e], // }; // *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); // drop(permit); // Ok(()) // } // pub(crate) async fn import_inner( // db: &impl ImportStorage, // fed: &Federation, // ap: &Apis, // ) -> anyhow::Result> { // db.pre_clean()?; // info!("loading sources..."); // let mut errors = Vec::new(); // match import_path(CONF.library_path.clone(), vec![], db, fed, ap) // .await // .context("indexing") // { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // }; // db.remove_prev_nodes()?; // info!("merging nodes..."); // match generate_node_paths(db).context("merging nodes") { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // } // db.finish()?; // info!("import completed"); // Ok(errors) // } // fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result> { // // TODO mark nodes done to allow recursion // fn traverse( // db: &impl ImportStorage, // id: String, // mut path: Vec, // parent_title: &str, // ) -> anyhow::Result> { // let mut errors = Vec::new(); // let node = { // let mut parts = db // .get_partial_parts(&id) // .context(anyhow!("path = {path:?}"))?; // parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); // let mut node = parts // .into_iter() // .map(|(_, x)| x) // .reduce(|x, y| merge_node(x, y).unwrap()) // .unwrap(); // node.public.id = Some(id.to_owned()); // node.public.path = vec![]; // will be reconstructed in the next pass // node.public.federated = None; // // TODO this discardes a lot of information. maybe change this. // if let Some(media) = &node.public.media { // for t in &media.tracks { // if let Some(host) = t.federated.first() { // if host != &CONF.hostname { // node.public.federated = Some(host.to_string()) // } // } // } // } // if node.public.path.is_empty() { // node.public.path = path.clone(); // } // node.public.subtitle = match node.public.kind.unwrap_or_default() { // NodeKind::Movie => node.public.release_date.map(|date| { // format!( // "{}", // DateTime::from_timestamp_millis(date) // .unwrap() // .date_naive() // .year() // ) // }), // NodeKind::Season // | NodeKind::Episode // | NodeKind::ShortFormVideo // | NodeKind::Video => Some(parent_title.to_string()), // _ => None, // }; // db.insert_complete_node(&id, node.clone())?; // node // }; // path.push(id); // let ps = node.public.title.unwrap_or_default(); // for c in node.public.children { // match traverse(db, c, path.clone(), &ps) { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // } // } // Ok(errors) // } // traverse(db, "library".to_string(), vec![], "Root") // } // fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { // if x.is_empty() { // Ordering::Greater // } else if y.is_empty() { // Ordering::Less // } else { // match x[0].cmp(&y[0]) { // o @ (Ordering::Less | Ordering::Greater) => o, // Ordering::Equal => compare_index_path(&x[1..], &y[1..]), // } // } // } // #[async_recursion] // async fn import_path( // path: PathBuf, // mut index_path: Vec, // db: &impl ImportStorage, // fed: &Federation, // ap: &Apis, // ) -> anyhow::Result> { // let mut errors = Vec::new(); // if path.is_dir() { // let mut children_paths = path // .read_dir()? // .map(Result::unwrap) // .filter_map(|e| { // if e.path().extension() == Some(OsStr::new("yaml")) // || e.path().extension() == Some(OsStr::new("jelly")) // || e.metadata().unwrap().is_dir() // { // Some(e.path()) // } else { // None // } // }) // .collect::>(); // children_paths.sort(); // let mut children: FuturesUnordered<_> = children_paths // .into_iter() // .enumerate() // .map(|(i, p)| { // import_path( // p.clone(), // { // let mut path = index_path.clone(); // path.push(i); // path // }, // db, // fed, // ap, // ) // }) // .collect(); // while let Some(k) = children.next().await { // match k { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // } // } // } else { // info!("reading {path:?}"); // let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; // let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { // serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? // } else { // serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? // }; // for (i, s) in opts.sources.into_iter().enumerate() { // index_path.push(i); // if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) // .await // .context(anyhow!("processing source in {path:?}")) // { // errors.push(e) // } // index_path.pop(); // } // } // Ok(errors) // } // static SEM_IMPORT: Semaphore = Semaphore::const_new(2); // #[async_recursion] // async fn process_source( // id: String, // s: ImportSource, // path: &Path, // index_path: &[usize], // db: &impl ImportStorage, // fed: &Federation, // ap: &Apis, // ) -> anyhow::Result> { // let mut errors = vec![]; // match s { // ImportSource::Override(mut n) => { // if let Some(backdrop) = n.private.backdrop.clone() { // n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); // } // if let Some(poster) = n.private.poster.clone() { // n.public.poster = Some(AssetInner::Library(poster).ser()); // } // db.add_partial_node(&id, index_path, n)? // } // ImportSource::Trakt { id: tid, kind } => { // info!("trakt {id}"); // let trakt = ap // .trakt // .as_ref() // .ok_or(anyhow!("trakt api key is required"))?; // let trakt_object = trakt // .lookup(kind, tid, true) // .await // .context("looking up metadata")?; // let trakt_people = trakt // .people(kind, tid, true) // .await // .context("looking up people")?; // let mut node = Node::default(); // let mut node_ext = ExtendedNode::default(); // { // node.public.kind = Some(match kind { // TraktKind::Movie => NodeKind::Movie, // TraktKind::Show => NodeKind::Show, // TraktKind::Season => NodeKind::Season, // TraktKind::Episode => NodeKind::Episode, // _ => bail!("unexpected kind for trakt import"), // }); // node.public.title = Some(trakt_object.title.to_owned()); // if let Some(overview) = &trakt_object.overview { // node.public.description = Some(overview.to_owned()) // } // if let Some(tagline) = &trakt_object.tagline { // node.public.tagline = Some(tagline.to_owned()) // } // if let Some(rating) = &trakt_object.rating { // node.public.ratings.insert(Rating::Trakt, *rating); // } // for p in trakt_people.cast.iter() { // node_ext // .people // .entry(PeopleGroup::Cast) // .or_default() // .push(p.a()) // } // for (group, people) in trakt_people.crew.iter() { // for p in people { // node_ext.people.entry(group.a()).or_default().push(p.a()) // } // } // // TODO lazy assets // for ps in node_ext.people.values_mut() { // for p in ps { // if let Some(id) = p.person.ids.tmdb { // if let Some(tmdb) = &ap.tmdb { // let k = tmdb.person_image(id).await?; // if let Some(prof) = k.profiles.first() { // p.person.headshot = Some( // AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), // ); // } // } // } // } // } // } // db.add_partial_node(&id, index_path, node)?; // db.add_partial_node_ext(&id, index_path, node_ext)?; // if let Some(tid) = trakt_object.ids.tmdb { // if let Some(kind) = match kind { // TraktKind::Movie => Some(TmdbKind::Movie), // TraktKind::Show => Some(TmdbKind::Tv), // TraktKind::Season => Some(TmdbKind::Tv), // TODO // TraktKind::Episode | TraktKind::Person | TraktKind::User => None, // } { // let mut index_path = index_path.to_vec(); // index_path.push(1); // match process_source( // id, // ImportSource::Tmdb { id: tid, kind }, // path, // &index_path, // db, // fed, // ap, // ) // .await // { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // } // } // } // } // ImportSource::Tmdb { id: tid, kind } => { // info!("tmdb {id}"); // let tmdb = ap // .tmdb // .as_ref() // .ok_or(anyhow!("tmdb api key is required"))?; // let details = tmdb.details(kind, tid).await?; // let mut node = Node::default(); // // TODO lazy assets // if let Some(poster) = &details.poster_path { // node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); // } // if let Some(backdrop) = &details.backdrop_path { // node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); // } // node.public.tagline = details.tagline.clone(); // node.public.title = details.title.clone(); // node.public.description = Some(details.overview.clone()); // node.public // .ratings // .insert(Rating::Tmdb, details.vote_average); // if let Some(date) = details.release_date.clone() { // node.public.release_date = // parse_release_date(&date).context("parsing release date")?; // } // db.add_partial_node(&id, index_path, node)?; // } // ImportSource::Media { // path: mpath, // ignore_attachments, // ignore_chapters, // ignore_metadata, // } => { // info!("media import {mpath:?}"); // let abspath = CONF.media_path.join(&mpath); // if !abspath.exists() { // bail!("media missing at {abspath:?}"); // } // if abspath.is_dir() { // let mut node = Node::default(); // for f in abspath.read_dir()? { // let f = f?; // let child_path = f.path(); // if child_path.is_dir() // || matches!( // child_path.extension().map(|o| o.to_str().unwrap()), // Some("mks" | "mka" | "mkv" | "webm") // ) // { // let inf_id = // infer_id_from_path(&child_path).context("inferring child id")?; // match process_source( // inf_id.clone(), // ImportSource::Media { // path: mpath.join(f.file_name()), // ignore_attachments, // ignore_chapters, // ignore_metadata, // }, // path, // index_path, // db, // fed, // ap, // ) // .await // .context(anyhow!("recursive media import: {:?}", f.path())) // { // Result::Ok(o) => errors.extend(o), // Result::Err(e) => errors.push(e), // }; // node.public.children.push(inf_id); // } // } // db.add_partial_node(&id, index_path, node)?; // } else if abspath.is_file() { // let _permit = SEM_IMPORT.acquire().await.unwrap(); // let metadata = { // let abspath = abspath.clone(); // let mpath = mpath.to_owned(); // spawn_blocking(move || { // cache_memory(&["probe", mpath.to_str().unwrap()], || { // let input = File::open(&abspath).context("opening media file")?; // let mut input = EbmlReader::new(BufReader::new(input)); // import_metadata(&mut input) // }) // }) // } // .await? // .context(anyhow!("probing {abspath:?}"))? // .deref() // .to_owned(); // let mut node = Node::default(); // if !ignore_metadata { // if let Some(captures) = // RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) // { // node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); // if let Some(title) = captures.get(6) { // node.public.title = Some(title.as_str().to_string()); // } // } // node.public.title = metadata.title; // node.public.description = metadata.description; // node.public.tagline = metadata.tagline; // } // node.public.media = Some(MediaInfo { // duration: metadata.duration, // tracks: metadata.tracks, // chapters: if ignore_chapters { // vec![] // } else { // metadata.chapters // }, // }); // node.private.source = Some( // metadata // .track_sources // .into_iter() // .map(|mut ts| { // ts.path = mpath.to_owned(); // TrackSource::Local(ts) // }) // .collect(), // ); // if !ignore_attachments { // if let Some((filename, data)) = metadata.cover { // node.public.poster = Some( // AssetInner::Cache( // async_cache_file( // &["att-cover", mpath.to_str().unwrap(), &filename], // |mut f| async move { // f.write_all(&data).await?; // Ok(()) // }, // ) // .await?, // ) // .ser(), // ) // }; // if let Some(infojson) = metadata.infojson { // let infojson: infojson::YVideo = // serde_json::from_str(&infojson).context("parsing infojson")?; // node.public.kind = Some( // if infojson.duration.unwrap_or(0.) < 120. // && infojson.aspect_ratio.unwrap_or(2.) < 1. // { // NodeKind::ShortFormVideo // } else { // NodeKind::Video // }, // ); // node.public.title = Some(infojson.title); // node.public.description = Some(infojson.description); // node.public.tagline = Some(infojson.webpage_url); // node.public // .ratings // .insert(Rating::YoutubeViews, infojson.view_count as f64); // node.public.release_date = Some( // infojson::parse_upload_date(&infojson.upload_date) // .context("parsing upload date")?, // ); // node.public.ratings.extend( // infojson // .like_count // .map(|l| (Rating::YoutubeLikes, l as f64)), // ); // } // } // drop(_permit); // db.add_partial_node(&id, index_path, node)?; // } else { // warn!("non file/dir import ignored: {abspath:?}") // } // } // ImportSource::Federated { host } => { // info!("federated import of {id:?} from {host:?}"); // let session = fed.get_session(&host).await.context("creating session")?; // import_remote(id.clone(), &host, db, &session, index_path) // .await // .context("federated import")? // } // ImportSource::AutoChildren { path: cpath } => { // info!("auto children at {path:?}"); // let paths = cpath // .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) // .read_dir()? // .map(Result::unwrap) // .map(|e| e.path()) // .filter(|e| { // e.extension() == Some(OsStr::new("yaml")) // || e.extension() == Some(OsStr::new("jelly")) // }); // let mut children = Vec::new(); // for p in paths { // let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { // serde_json::from_reader(File::open(&p)?)? // } else { // serde_yaml::from_reader(File::open(&p)?)? // }; // if opts.id != id { // children.push(opts.id); // } // } // db.add_partial_node( // &id, // index_path, // Node { // private: NodePrivate::default(), // public: NodePublic { // children, // ..Default::default() // }, // }, // )?; // } // } // Ok(errors) // } // pub fn infer_id_from_path(path: &Path) -> anyhow::Result { // let f = path // .file_stem() // .ok_or(anyhow!("no filename"))? // .to_str() // .ok_or(anyhow!("non utf8 filename"))?; // if let Some(mat) = RE_YOUTUBE_ID.captures(f) { // let id = mat.get(1).unwrap().as_str(); // return Ok(format!("youtube-{id}")); // } // let mut fsan = String::with_capacity(f.len()); // for c in f.chars() { // fsan.extend(match c { // 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), // ' ' => Some('-'), // _ => None, // }); // } // Ok(fsan) // } // fn merge_node(x: Node, y: Node) -> anyhow::Result { // let (media, source) = match ( // x.public.media, // y.public.media, // x.private.source, // y.private.source, // ) { // (Some(x), Some(y), Some(sx), Some(sy)) => { // let k = merge_media(x, y, sx, sy); // (Some(k.0), Some(k.1)) // } // (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), // (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), // (None, None, None, None) => (None, None), // _ => bail!("invalid node. source and media dont agree."), // }; // Ok(Node { // public: NodePublic { // kind: x.public.kind.or(y.public.kind), // title: x.public.title.or(y.public.title), // subtitle: x.public.subtitle.or(y.public.subtitle), // id: x.public.id.or(y.public.id), // path: vec![], // children: merge_children(x.public.children, y.public.children), // tagline: x.public.tagline.or(y.public.tagline), // description: x.public.description.or(y.public.description), // release_date: x.public.release_date.or(y.public.release_date), // index: x.public.index.or(y.public.index), // media, // ratings: x // .public // .ratings // .into_iter() // .chain(y.public.ratings) // .collect(), // federated: None, // poster: x.public.poster.or(y.public.poster), // backdrop: x.public.backdrop.or(y.public.backdrop), // }, // private: NodePrivate { // id: x.private.id.or(y.private.id), // source, // backdrop: None, // poster: None, // }, // }) // } // fn merge_children(mut a: Vec, b: Vec) -> Vec { // let acont = HashSet::<_, RandomState>::from_iter(a.clone()); // for el in b { // if !acont.contains(&el) { // a.push(el) // } // } // a // } // fn merge_media( // x: MediaInfo, // y: MediaInfo, // sx: Vec, // sy: Vec, // ) -> (MediaInfo, Vec) { // let mut tracks: Vec = Vec::new(); // let mut source: Vec = Vec::new(); // for (t, s) in x // .tracks // .into_iter() // .zip(sx.into_iter()) // .chain(y.tracks.into_iter().zip(sy.into_iter())) // { // let mut remove = None; // let mut skip = false; // for (i, ot) in tracks.iter().enumerate() { // if t.name == ot.name // && t.kind == ot.kind // && t.language == ot.language // && t.codec == ot.codec // { // if t.federated.len() < ot.federated.len() { // remove = Some(i); // } else { // skip = true; // } // } // } // if let Some(r) = remove { // tracks.swap_remove(r); // source.swap_remove(r); // } // if !skip { // tracks.push(t); // source.push(s); // } // } // ( // MediaInfo { // duration: x.duration * 0.5 + y.duration * 0.5, // tracks, // chapters: if x.chapters.len() > y.chapters.len() { // x.chapters // } else { // y.chapters // }, // }, // source, // ) // } // static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); // #[async_recursion] // async fn import_remote( // id: String, // host: &str, // db: &impl ImportStorage, // session: &Arc, // index_path: &[usize], // ) -> anyhow::Result<()> { // let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); // info!("loading federated node {id:?}"); // let mut node = session.node(&id).await.context("fetching remote node")?; // let mut node_ext = session // .node_extended(&id) // .await // .context("fetching extended remote node")?; // let track_sources = if let Some(media) = &mut node.media { // let mut track_sources = Vec::new(); // for (i, t) in media.tracks.iter_mut().enumerate() { // t.federated.push(host.to_owned()); // track_sources.push(TrackSource::Remote(i)) // } // Some(track_sources) // } else { // None // }; // drop(_permit); // let mut node = Node { // public: node.clone(), // private: NodePrivate { // backdrop: None, // poster: None, // id: None, // source: track_sources, // }, // }; // make_opt_asset_federated(host, &mut node.public.backdrop)?; // make_opt_asset_federated(host, &mut node.public.poster)?; // for g in node_ext.people.values_mut() { // for a in g { // make_opt_asset_federated(host, &mut a.person.headshot)?; // } // } // debug!("adding {id}"); // db.add_partial_node(&id, index_path, node.clone())?; // db.add_partial_node_ext(&id, index_path, node_ext)?; // let mut children: FuturesUnordered<_> = node // .public // .children // .iter() // .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) // .collect(); // while let Some(r) = children.next().await { // r?; // } // Ok(()) // } // pub fn make_opt_asset_federated(host: &str, p: &mut Option) -> anyhow::Result<()> { // if let Some(a) = p { // make_asset_federated(host, a)? // } // Ok(()) // } // pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { // let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; // *p = AssetInner::Federated { // host: host.to_owned(), // asset: data, // } // .ser(); // Ok(()) // }