diff options
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r-- | import/src/lib.rs | 1551 |
1 files changed, 775 insertions, 776 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 89088e9..df23f7e 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -3,840 +3,839 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2024 metamuffin <metamuffin.org> */ -pub mod db; pub mod infojson; pub mod tmdb; pub mod trakt; -use anyhow::{anyhow, bail, Context, Error, Ok}; -use async_recursion::async_recursion; -use base64::Engine; -use db::{DatabaseStorage, ImportStorage, MemoryStorage}; -use futures::{stream::FuturesUnordered, StreamExt}; -use jellybase::{ - assetfed::AssetInner, - cache::{async_cache_file, cache_memory}, - database::DataAcid, - federation::Federation, - CONF, SECRETS, -}; -use jellyclient::Session; -use jellycommon::{ - chrono::{DateTime, Datelike}, - Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, - NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, -}; -use jellymatroska::read::EbmlReader; -use jellyremuxer::metadata::import_metadata; -use log::{debug, info, warn}; -use regex::Regex; -use std::{ - cmp::Ordering, - collections::HashSet, - ffi::OsStr, - fs::File, - hash::RandomState, - io::BufReader, - ops::Deref, - path::{Path, PathBuf}, - sync::{Arc, LazyLock}, -}; -use tmdb::{parse_release_date, Tmdb}; -use tokio::{ - io::AsyncWriteExt, - sync::{RwLock, Semaphore}, - task::spawn_blocking, -}; -use trakt::Trakt; +// use anyhow::{anyhow, bail, Context, Error, Ok}; +// use async_recursion::async_recursion; +// use base64::Engine; +// use db::{DatabaseStorage, ImportStorage, MemoryStorage}; +// use futures::{stream::FuturesUnordered, StreamExt}; +// use jellybase::{ +// assetfed::AssetInner, +// cache::{async_cache_file, cache_memory}, +// database::DataAcid, +// federation::Federation, +// CONF, SECRETS, +// }; +// use jellyclient::Session; +// use jellycommon::{ +// chrono::{DateTime, Datelike}, +// Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, +// NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, +// }; +// use jellymatroska::read::EbmlReader; +// use jellyremuxer::metadata::import_metadata; +// use log::{debug, info, warn}; +// use regex::Regex; +// use std::{ +// cmp::Ordering, +// collections::HashSet, +// ffi::OsStr, +// fs::File, +// hash::RandomState, +// io::BufReader, +// ops::Deref, +// path::{Path, PathBuf}, +// sync::{Arc, LazyLock}, +// }; +// use tmdb::{parse_release_date, Tmdb}; +// use tokio::{ +// io::AsyncWriteExt, +// sync::{RwLock, Semaphore}, +// task::spawn_blocking, +// }; +// use trakt::Trakt; -static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new()); +// static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); +// pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new()); -static RE_EPISODE_FILENAME: LazyLock<Regex> = - LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); -static RE_YOUTUBE_ID: LazyLock<Regex> = - LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); +// static RE_EPISODE_FILENAME: LazyLock<Regex> = +// LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); +// static RE_YOUTUBE_ID: LazyLock<Regex> = +// LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); -struct Apis { - trakt: Option<Trakt>, - tmdb: Option<Tmdb>, -} +// struct Apis { +// trakt: Option<Trakt>, +// tmdb: Option<Tmdb>, +// } -pub fn is_importing() -> bool { - IMPORT_SEM.available_permits() == 0 -} +// pub fn is_importing() -> bool { +// IMPORT_SEM.available_permits() == 0 +// } -pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { - let permit = IMPORT_SEM.try_acquire()?; +// pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { +// let permit = IMPORT_SEM.try_acquire()?; - let ap = Apis { - trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), - tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), - }; +// let ap = Apis { +// trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), +// tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), +// }; - let e = if CONF.use_in_memory_import_storage { - import_inner(&MemoryStorage::new(db), fed, &ap).await - } else { - import_inner(&DatabaseStorage::new(db), fed, &ap).await - }; - let e = match e { - Result::Ok(e) => e, - Result::Err(e) => vec![e], - }; - *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); +// let e = if CONF.use_in_memory_import_storage { +// import_inner(&MemoryStorage::new(db), fed, &ap).await +// } else { +// import_inner(&DatabaseStorage::new(db), fed, &ap).await +// }; +// let e = match e { +// Result::Ok(e) => e, +// Result::Err(e) => vec![e], +// }; +// *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); - drop(permit); - Ok(()) -} +// drop(permit); +// Ok(()) +// } -pub(crate) async fn import_inner( - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result<Vec<anyhow::Error>> { - db.pre_clean()?; - info!("loading sources..."); - let mut errors = Vec::new(); - match import_path(CONF.library_path.clone(), vec![], db, fed, ap) - .await - .context("indexing") - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - }; - db.remove_prev_nodes()?; - info!("merging nodes..."); - match generate_node_paths(db).context("merging nodes") { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - db.finish()?; - info!("import completed"); - Ok(errors) -} +// pub(crate) async fn import_inner( +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result<Vec<anyhow::Error>> { +// db.pre_clean()?; +// info!("loading sources..."); +// let mut errors = Vec::new(); +// match import_path(CONF.library_path.clone(), vec![], db, fed, ap) +// .await +// .context("indexing") +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// }; +// db.remove_prev_nodes()?; +// info!("merging nodes..."); +// match generate_node_paths(db).context("merging nodes") { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// db.finish()?; +// info!("import completed"); +// Ok(errors) +// } -fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> { - // TODO mark nodes done to allow recursion - fn traverse( - db: &impl ImportStorage, - id: String, - mut path: Vec<String>, - parent_title: &str, - ) -> anyhow::Result<Vec<Error>> { - let mut errors = Vec::new(); - let node = { - let mut parts = db - .get_partial_parts(&id) - .context(anyhow!("path = {path:?}"))?; +// fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> { +// // TODO mark nodes done to allow recursion +// fn traverse( +// db: &impl ImportStorage, +// id: String, +// mut path: Vec<String>, +// parent_title: &str, +// ) -> anyhow::Result<Vec<Error>> { +// let mut errors = Vec::new(); +// let node = { +// let mut parts = db +// .get_partial_parts(&id) +// .context(anyhow!("path = {path:?}"))?; - parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); +// parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); - let mut node = parts - .into_iter() - .map(|(_, x)| x) - .reduce(|x, y| merge_node(x, y).unwrap()) - .unwrap(); +// let mut node = parts +// .into_iter() +// .map(|(_, x)| x) +// .reduce(|x, y| merge_node(x, y).unwrap()) +// .unwrap(); - node.public.id = Some(id.to_owned()); - node.public.path = vec![]; // will be reconstructed in the next pass - node.public.federated = None; +// node.public.id = Some(id.to_owned()); +// node.public.path = vec![]; // will be reconstructed in the next pass +// node.public.federated = None; - // TODO this discardes a lot of information. maybe change this. - if let Some(media) = &node.public.media { - for t in &media.tracks { - if let Some(host) = t.federated.first() { - if host != &CONF.hostname { - node.public.federated = Some(host.to_string()) - } - } - } - } +// // TODO this discardes a lot of information. maybe change this. +// if let Some(media) = &node.public.media { +// for t in &media.tracks { +// if let Some(host) = t.federated.first() { +// if host != &CONF.hostname { +// node.public.federated = Some(host.to_string()) +// } +// } +// } +// } - if node.public.path.is_empty() { - node.public.path = path.clone(); - } - node.public.subtitle = match node.public.kind.unwrap_or_default() { - NodeKind::Movie => node.public.release_date.map(|date| { - format!( - "{}", - DateTime::from_timestamp_millis(date) - .unwrap() - .date_naive() - .year() - ) - }), - NodeKind::Season - | NodeKind::Episode - | NodeKind::ShortFormVideo - | NodeKind::Video => Some(parent_title.to_string()), - _ => None, - }; +// if node.public.path.is_empty() { +// node.public.path = path.clone(); +// } +// node.public.subtitle = match node.public.kind.unwrap_or_default() { +// NodeKind::Movie => node.public.release_date.map(|date| { +// format!( +// "{}", +// DateTime::from_timestamp_millis(date) +// .unwrap() +// .date_naive() +// .year() +// ) +// }), +// NodeKind::Season +// | NodeKind::Episode +// | NodeKind::ShortFormVideo +// | NodeKind::Video => Some(parent_title.to_string()), +// _ => None, +// }; - db.insert_complete_node(&id, node.clone())?; +// db.insert_complete_node(&id, node.clone())?; - node - }; +// node +// }; - path.push(id); - let ps = node.public.title.unwrap_or_default(); - for c in node.public.children { - match traverse(db, c, path.clone(), &ps) { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - Ok(errors) - } - traverse(db, "library".to_string(), vec![], "Root") -} +// path.push(id); +// let ps = node.public.title.unwrap_or_default(); +// for c in node.public.children { +// match traverse(db, c, path.clone(), &ps) { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// Ok(errors) +// } +// traverse(db, "library".to_string(), vec![], "Root") +// } -fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { - if x.is_empty() { - Ordering::Greater - } else if y.is_empty() { - Ordering::Less - } else { - match x[0].cmp(&y[0]) { - o @ (Ordering::Less | Ordering::Greater) => o, - Ordering::Equal => compare_index_path(&x[1..], &y[1..]), - } - } -} +// fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { +// if x.is_empty() { +// Ordering::Greater +// } else if y.is_empty() { +// Ordering::Less +// } else { +// match x[0].cmp(&y[0]) { +// o @ (Ordering::Less | Ordering::Greater) => o, +// Ordering::Equal => compare_index_path(&x[1..], &y[1..]), +// } +// } +// } -#[async_recursion] -async fn import_path( - path: PathBuf, - mut index_path: Vec<usize>, - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result<Vec<anyhow::Error>> { - let mut errors = Vec::new(); - if path.is_dir() { - let mut children_paths = path - .read_dir()? - .map(Result::unwrap) - .filter_map(|e| { - if e.path().extension() == Some(OsStr::new("yaml")) - || e.path().extension() == Some(OsStr::new("jelly")) - || e.metadata().unwrap().is_dir() - { - Some(e.path()) - } else { - None - } - }) - .collect::<Vec<_>>(); +// #[async_recursion] +// async fn import_path( +// path: PathBuf, +// mut index_path: Vec<usize>, +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result<Vec<anyhow::Error>> { +// let mut errors = Vec::new(); +// if path.is_dir() { +// let mut children_paths = path +// .read_dir()? +// .map(Result::unwrap) +// .filter_map(|e| { +// if e.path().extension() == Some(OsStr::new("yaml")) +// || e.path().extension() == Some(OsStr::new("jelly")) +// || e.metadata().unwrap().is_dir() +// { +// Some(e.path()) +// } else { +// None +// } +// }) +// .collect::<Vec<_>>(); - children_paths.sort(); +// children_paths.sort(); - let mut children: FuturesUnordered<_> = children_paths - .into_iter() - .enumerate() - .map(|(i, p)| { - import_path( - p.clone(), - { - let mut path = index_path.clone(); - path.push(i); - path - }, - db, - fed, - ap, - ) - }) - .collect(); +// let mut children: FuturesUnordered<_> = children_paths +// .into_iter() +// .enumerate() +// .map(|(i, p)| { +// import_path( +// p.clone(), +// { +// let mut path = index_path.clone(); +// path.push(i); +// path +// }, +// db, +// fed, +// ap, +// ) +// }) +// .collect(); - while let Some(k) = children.next().await { - match k { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - } else { - info!("reading {path:?}"); - let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; - let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { - serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? - } else { - serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? - }; +// while let Some(k) = children.next().await { +// match k { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// } else { +// info!("reading {path:?}"); +// let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; +// let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { +// serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? +// } else { +// serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? +// }; - for (i, s) in opts.sources.into_iter().enumerate() { - index_path.push(i); - if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) - .await - .context(anyhow!("processing source in {path:?}")) - { - errors.push(e) - } - index_path.pop(); - } - } - Ok(errors) -} +// for (i, s) in opts.sources.into_iter().enumerate() { +// index_path.push(i); +// if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) +// .await +// .context(anyhow!("processing source in {path:?}")) +// { +// errors.push(e) +// } +// index_path.pop(); +// } +// } +// Ok(errors) +// } -static SEM_IMPORT: Semaphore = Semaphore::const_new(2); +// static SEM_IMPORT: Semaphore = Semaphore::const_new(2); -#[async_recursion] -async fn process_source( - id: String, - s: ImportSource, - path: &Path, - index_path: &[usize], - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result<Vec<anyhow::Error>> { - let mut errors = vec![]; - match s { - ImportSource::Override(mut n) => { - if let Some(backdrop) = n.private.backdrop.clone() { - n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); - } - if let Some(poster) = n.private.poster.clone() { - n.public.poster = Some(AssetInner::Library(poster).ser()); - } - db.add_partial_node(&id, index_path, n)? - } - ImportSource::Trakt { id: tid, kind } => { - info!("trakt {id}"); - let trakt = ap - .trakt - .as_ref() - .ok_or(anyhow!("trakt api key is required"))?; - let trakt_object = trakt - .lookup(kind, tid, true) - .await - .context("looking up metadata")?; - let trakt_people = trakt - .people(kind, tid, true) - .await - .context("looking up people")?; +// #[async_recursion] +// async fn process_source( +// id: String, +// s: ImportSource, +// path: &Path, +// index_path: &[usize], +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result<Vec<anyhow::Error>> { +// let mut errors = vec![]; +// match s { +// ImportSource::Override(mut n) => { +// if let Some(backdrop) = n.private.backdrop.clone() { +// n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); +// } +// if let Some(poster) = n.private.poster.clone() { +// n.public.poster = Some(AssetInner::Library(poster).ser()); +// } +// db.add_partial_node(&id, index_path, n)? +// } +// ImportSource::Trakt { id: tid, kind } => { +// info!("trakt {id}"); +// let trakt = ap +// .trakt +// .as_ref() +// .ok_or(anyhow!("trakt api key is required"))?; +// let trakt_object = trakt +// .lookup(kind, tid, true) +// .await +// .context("looking up metadata")?; +// let trakt_people = trakt +// .people(kind, tid, true) +// .await +// .context("looking up people")?; - let mut node = Node::default(); - let mut node_ext = ExtendedNode::default(); - { - node.public.kind = Some(match kind { - TraktKind::Movie => NodeKind::Movie, - TraktKind::Show => NodeKind::Show, - TraktKind::Season => NodeKind::Season, - TraktKind::Episode => NodeKind::Episode, - _ => bail!("unexpected kind for trakt import"), - }); - node.public.title = Some(trakt_object.title.to_owned()); - if let Some(overview) = &trakt_object.overview { - node.public.description = Some(overview.to_owned()) - } - if let Some(tagline) = &trakt_object.tagline { - node.public.tagline = Some(tagline.to_owned()) - } - if let Some(rating) = &trakt_object.rating { - node.public.ratings.insert(Rating::Trakt, *rating); - } - for p in trakt_people.cast.iter() { - node_ext - .people - .entry(PeopleGroup::Cast) - .or_default() - .push(p.a()) - } - for (group, people) in trakt_people.crew.iter() { - for p in people { - node_ext.people.entry(group.a()).or_default().push(p.a()) - } - } - // TODO lazy assets - for ps in node_ext.people.values_mut() { - for p in ps { - if let Some(id) = p.person.ids.tmdb { - if let Some(tmdb) = &ap.tmdb { - let k = tmdb.person_image(id).await?; - if let Some(prof) = k.profiles.first() { - p.person.headshot = Some( - AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), - ); - } - } - } - } - } - } - db.add_partial_node(&id, index_path, node)?; - db.add_partial_node_ext(&id, index_path, node_ext)?; +// let mut node = Node::default(); +// let mut node_ext = ExtendedNode::default(); +// { +// node.public.kind = Some(match kind { +// TraktKind::Movie => NodeKind::Movie, +// TraktKind::Show => NodeKind::Show, +// TraktKind::Season => NodeKind::Season, +// TraktKind::Episode => NodeKind::Episode, +// _ => bail!("unexpected kind for trakt import"), +// }); +// node.public.title = Some(trakt_object.title.to_owned()); +// if let Some(overview) = &trakt_object.overview { +// node.public.description = Some(overview.to_owned()) +// } +// if let Some(tagline) = &trakt_object.tagline { +// node.public.tagline = Some(tagline.to_owned()) +// } +// if let Some(rating) = &trakt_object.rating { +// node.public.ratings.insert(Rating::Trakt, *rating); +// } +// for p in trakt_people.cast.iter() { +// node_ext +// .people +// .entry(PeopleGroup::Cast) +// .or_default() +// .push(p.a()) +// } +// for (group, people) in trakt_people.crew.iter() { +// for p in people { +// node_ext.people.entry(group.a()).or_default().push(p.a()) +// } +// } +// // TODO lazy assets +// for ps in node_ext.people.values_mut() { +// for p in ps { +// if let Some(id) = p.person.ids.tmdb { +// if let Some(tmdb) = &ap.tmdb { +// let k = tmdb.person_image(id).await?; +// if let Some(prof) = k.profiles.first() { +// p.person.headshot = Some( +// AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), +// ); +// } +// } +// } +// } +// } +// } +// db.add_partial_node(&id, index_path, node)?; +// db.add_partial_node_ext(&id, index_path, node_ext)?; - if let Some(tid) = trakt_object.ids.tmdb { - if let Some(kind) = match kind { - TraktKind::Movie => Some(TmdbKind::Movie), - TraktKind::Show => Some(TmdbKind::Tv), - TraktKind::Season => Some(TmdbKind::Tv), // TODO - TraktKind::Episode | TraktKind::Person | TraktKind::User => None, - } { - let mut index_path = index_path.to_vec(); - index_path.push(1); - match process_source( - id, - ImportSource::Tmdb { id: tid, kind }, - path, - &index_path, - db, - fed, - ap, - ) - .await - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - } - } - ImportSource::Tmdb { id: tid, kind } => { - info!("tmdb {id}"); - let tmdb = ap - .tmdb - .as_ref() - .ok_or(anyhow!("tmdb api key is required"))?; +// if let Some(tid) = trakt_object.ids.tmdb { +// if let Some(kind) = match kind { +// TraktKind::Movie => Some(TmdbKind::Movie), +// TraktKind::Show => Some(TmdbKind::Tv), +// TraktKind::Season => Some(TmdbKind::Tv), // TODO +// TraktKind::Episode | TraktKind::Person | TraktKind::User => None, +// } { +// let mut index_path = index_path.to_vec(); +// index_path.push(1); +// match process_source( +// id, +// ImportSource::Tmdb { id: tid, kind }, +// path, +// &index_path, +// db, +// fed, +// ap, +// ) +// .await +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// } +// } +// ImportSource::Tmdb { id: tid, kind } => { +// info!("tmdb {id}"); +// let tmdb = ap +// .tmdb +// .as_ref() +// .ok_or(anyhow!("tmdb api key is required"))?; - let details = tmdb.details(kind, tid).await?; +// let details = tmdb.details(kind, tid).await?; - let mut node = Node::default(); +// let mut node = Node::default(); - // TODO lazy assets - if let Some(poster) = &details.poster_path { - node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); - } - if let Some(backdrop) = &details.backdrop_path { - node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); - } +// // TODO lazy assets +// if let Some(poster) = &details.poster_path { +// node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); +// } +// if let Some(backdrop) = &details.backdrop_path { +// node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); +// } - node.public.tagline = details.tagline.clone(); - node.public.title = details.title.clone(); - node.public.description = Some(details.overview.clone()); - node.public - .ratings - .insert(Rating::Tmdb, details.vote_average); - if let Some(date) = details.release_date.clone() { - node.public.release_date = - parse_release_date(&date).context("parsing release date")?; - } +// node.public.tagline = details.tagline.clone(); +// node.public.title = details.title.clone(); +// node.public.description = Some(details.overview.clone()); +// node.public +// .ratings +// .insert(Rating::Tmdb, details.vote_average); +// if let Some(date) = details.release_date.clone() { +// node.public.release_date = +// parse_release_date(&date).context("parsing release date")?; +// } - db.add_partial_node(&id, index_path, node)?; - } - ImportSource::Media { - path: mpath, - ignore_attachments, - ignore_chapters, - ignore_metadata, - } => { - info!("media import {mpath:?}"); - let abspath = CONF.media_path.join(&mpath); - if !abspath.exists() { - bail!("media missing at {abspath:?}"); - } - if abspath.is_dir() { - let mut node = Node::default(); - for f in abspath.read_dir()? { - let f = f?; - let child_path = f.path(); - if child_path.is_dir() - || matches!( - child_path.extension().map(|o| o.to_str().unwrap()), - Some("mks" | "mka" | "mkv" | "webm") - ) - { - let inf_id = - infer_id_from_path(&child_path).context("inferring child id")?; +// db.add_partial_node(&id, index_path, node)?; +// } +// ImportSource::Media { +// path: mpath, +// ignore_attachments, +// ignore_chapters, +// ignore_metadata, +// } => { +// info!("media import {mpath:?}"); +// let abspath = CONF.media_path.join(&mpath); +// if !abspath.exists() { +// bail!("media missing at {abspath:?}"); +// } +// if abspath.is_dir() { +// let mut node = Node::default(); +// for f in abspath.read_dir()? { +// let f = f?; +// let child_path = f.path(); +// if child_path.is_dir() +// || matches!( +// child_path.extension().map(|o| o.to_str().unwrap()), +// Some("mks" | "mka" | "mkv" | "webm") +// ) +// { +// let inf_id = +// infer_id_from_path(&child_path).context("inferring child id")?; - match process_source( - inf_id.clone(), - ImportSource::Media { - path: mpath.join(f.file_name()), - ignore_attachments, - ignore_chapters, - ignore_metadata, - }, - path, - index_path, - db, - fed, - ap, - ) - .await - .context(anyhow!("recursive media import: {:?}", f.path())) - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - }; - node.public.children.push(inf_id); - } - } - db.add_partial_node(&id, index_path, node)?; - } else if abspath.is_file() { - let _permit = SEM_IMPORT.acquire().await.unwrap(); - let metadata = { - let abspath = abspath.clone(); - let mpath = mpath.to_owned(); - spawn_blocking(move || { - cache_memory(&["probe", mpath.to_str().unwrap()], || { - let input = File::open(&abspath).context("opening media file")?; - let mut input = EbmlReader::new(BufReader::new(input)); - import_metadata(&mut input) - }) - }) - } - .await? - .context(anyhow!("probing {abspath:?}"))? - .deref() - .to_owned(); +// match process_source( +// inf_id.clone(), +// ImportSource::Media { +// path: mpath.join(f.file_name()), +// ignore_attachments, +// ignore_chapters, +// ignore_metadata, +// }, +// path, +// index_path, +// db, +// fed, +// ap, +// ) +// .await +// .context(anyhow!("recursive media import: {:?}", f.path())) +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// }; +// node.public.children.push(inf_id); +// } +// } +// db.add_partial_node(&id, index_path, node)?; +// } else if abspath.is_file() { +// let _permit = SEM_IMPORT.acquire().await.unwrap(); +// let metadata = { +// let abspath = abspath.clone(); +// let mpath = mpath.to_owned(); +// spawn_blocking(move || { +// cache_memory(&["probe", mpath.to_str().unwrap()], || { +// let input = File::open(&abspath).context("opening media file")?; +// let mut input = EbmlReader::new(BufReader::new(input)); +// import_metadata(&mut input) +// }) +// }) +// } +// .await? +// .context(anyhow!("probing {abspath:?}"))? +// .deref() +// .to_owned(); - let mut node = Node::default(); +// let mut node = Node::default(); - if !ignore_metadata { - if let Some(captures) = - RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) - { - node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); - if let Some(title) = captures.get(6) { - node.public.title = Some(title.as_str().to_string()); - } - } - node.public.title = metadata.title; - node.public.description = metadata.description; - node.public.tagline = metadata.tagline; - } - node.public.media = Some(MediaInfo { - duration: metadata.duration, - tracks: metadata.tracks, - chapters: if ignore_chapters { - vec![] - } else { - metadata.chapters - }, - }); - node.private.source = Some( - metadata - .track_sources - .into_iter() - .map(|mut ts| { - ts.path = mpath.to_owned(); - TrackSource::Local(ts) - }) - .collect(), - ); +// if !ignore_metadata { +// if let Some(captures) = +// RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) +// { +// node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); +// if let Some(title) = captures.get(6) { +// node.public.title = Some(title.as_str().to_string()); +// } +// } +// node.public.title = metadata.title; +// node.public.description = metadata.description; +// node.public.tagline = metadata.tagline; +// } +// node.public.media = Some(MediaInfo { +// duration: metadata.duration, +// tracks: metadata.tracks, +// chapters: if ignore_chapters { +// vec![] +// } else { +// metadata.chapters +// }, +// }); +// node.private.source = Some( +// metadata +// .track_sources +// .into_iter() +// .map(|mut ts| { +// ts.path = mpath.to_owned(); +// TrackSource::Local(ts) +// }) +// .collect(), +// ); - if !ignore_attachments { - if let Some((filename, data)) = metadata.cover { - node.public.poster = Some( - AssetInner::Cache( - async_cache_file( - &["att-cover", mpath.to_str().unwrap(), &filename], - |mut f| async move { - f.write_all(&data).await?; - Ok(()) - }, - ) - .await?, - ) - .ser(), - ) - }; +// if !ignore_attachments { +// if let Some((filename, data)) = metadata.cover { +// node.public.poster = Some( +// AssetInner::Cache( +// async_cache_file( +// &["att-cover", mpath.to_str().unwrap(), &filename], +// |mut f| async move { +// f.write_all(&data).await?; +// Ok(()) +// }, +// ) +// .await?, +// ) +// .ser(), +// ) +// }; - if let Some(infojson) = metadata.infojson { - let infojson: infojson::YVideo = - serde_json::from_str(&infojson).context("parsing infojson")?; +// if let Some(infojson) = metadata.infojson { +// let infojson: infojson::YVideo = +// serde_json::from_str(&infojson).context("parsing infojson")?; - node.public.kind = Some( - if infojson.duration.unwrap_or(0.) < 120. - && infojson.aspect_ratio.unwrap_or(2.) < 1. - { - NodeKind::ShortFormVideo - } else { - NodeKind::Video - }, - ); - node.public.title = Some(infojson.title); - node.public.description = Some(infojson.description); - node.public.tagline = Some(infojson.webpage_url); - node.public - .ratings - .insert(Rating::YoutubeViews, infojson.view_count as f64); - node.public.release_date = Some( - infojson::parse_upload_date(&infojson.upload_date) - .context("parsing upload date")?, - ); - node.public.ratings.extend( - infojson - .like_count - .map(|l| (Rating::YoutubeLikes, l as f64)), - ); - } - } - drop(_permit); - db.add_partial_node(&id, index_path, node)?; - } else { - warn!("non file/dir import ignored: {abspath:?}") - } - } - ImportSource::Federated { host } => { - info!("federated import of {id:?} from {host:?}"); - let session = fed.get_session(&host).await.context("creating session")?; +// node.public.kind = Some( +// if infojson.duration.unwrap_or(0.) < 120. +// && infojson.aspect_ratio.unwrap_or(2.) < 1. +// { +// NodeKind::ShortFormVideo +// } else { +// NodeKind::Video +// }, +// ); +// node.public.title = Some(infojson.title); +// node.public.description = Some(infojson.description); +// node.public.tagline = Some(infojson.webpage_url); +// node.public +// .ratings +// .insert(Rating::YoutubeViews, infojson.view_count as f64); +// node.public.release_date = Some( +// infojson::parse_upload_date(&infojson.upload_date) +// .context("parsing upload date")?, +// ); +// node.public.ratings.extend( +// infojson +// .like_count +// .map(|l| (Rating::YoutubeLikes, l as f64)), +// ); +// } +// } +// drop(_permit); +// db.add_partial_node(&id, index_path, node)?; +// } else { +// warn!("non file/dir import ignored: {abspath:?}") +// } +// } +// ImportSource::Federated { host } => { +// info!("federated import of {id:?} from {host:?}"); +// let session = fed.get_session(&host).await.context("creating session")?; - import_remote(id.clone(), &host, db, &session, index_path) - .await - .context("federated import")? - } - ImportSource::AutoChildren { path: cpath } => { - info!("auto children at {path:?}"); - let paths = cpath - .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) - .read_dir()? - .map(Result::unwrap) - .map(|e| e.path()) - .filter(|e| { - e.extension() == Some(OsStr::new("yaml")) - || e.extension() == Some(OsStr::new("jelly")) - }); +// import_remote(id.clone(), &host, db, &session, index_path) +// .await +// .context("federated import")? +// } +// ImportSource::AutoChildren { path: cpath } => { +// info!("auto children at {path:?}"); +// let paths = cpath +// .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) +// .read_dir()? +// .map(Result::unwrap) +// .map(|e| e.path()) +// .filter(|e| { +// e.extension() == Some(OsStr::new("yaml")) +// || e.extension() == Some(OsStr::new("jelly")) +// }); - let mut children = Vec::new(); - for p in paths { - let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { - serde_json::from_reader(File::open(&p)?)? - } else { - serde_yaml::from_reader(File::open(&p)?)? - }; - if opts.id != id { - children.push(opts.id); - } - } - db.add_partial_node( - &id, - index_path, - Node { - private: NodePrivate::default(), - public: NodePublic { - children, - ..Default::default() - }, - }, - )?; - } - } - Ok(errors) -} +// let mut children = Vec::new(); +// for p in paths { +// let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { +// serde_json::from_reader(File::open(&p)?)? +// } else { +// serde_yaml::from_reader(File::open(&p)?)? +// }; +// if opts.id != id { +// children.push(opts.id); +// } +// } +// db.add_partial_node( +// &id, +// index_path, +// Node { +// private: NodePrivate::default(), +// public: NodePublic { +// children, +// ..Default::default() +// }, +// }, +// )?; +// } +// } +// Ok(errors) +// } -pub fn infer_id_from_path(path: &Path) -> anyhow::Result<String> { - let f = path - .file_stem() - .ok_or(anyhow!("no filename"))? - .to_str() - .ok_or(anyhow!("non utf8 filename"))?; +// pub fn infer_id_from_path(path: &Path) -> anyhow::Result<String> { +// let f = path +// .file_stem() +// .ok_or(anyhow!("no filename"))? +// .to_str() +// .ok_or(anyhow!("non utf8 filename"))?; - if let Some(mat) = RE_YOUTUBE_ID.captures(f) { - let id = mat.get(1).unwrap().as_str(); - return Ok(format!("youtube-{id}")); - } +// if let Some(mat) = RE_YOUTUBE_ID.captures(f) { +// let id = mat.get(1).unwrap().as_str(); +// return Ok(format!("youtube-{id}")); +// } - let mut fsan = String::with_capacity(f.len()); - for c in f.chars() { - fsan.extend(match c { - 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), - ' ' => Some('-'), - _ => None, - }); - } - Ok(fsan) -} +// let mut fsan = String::with_capacity(f.len()); +// for c in f.chars() { +// fsan.extend(match c { +// 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), +// ' ' => Some('-'), +// _ => None, +// }); +// } +// Ok(fsan) +// } -fn merge_node(x: Node, y: Node) -> anyhow::Result<Node> { - let (media, source) = match ( - x.public.media, - y.public.media, - x.private.source, - y.private.source, - ) { - (Some(x), Some(y), Some(sx), Some(sy)) => { - let k = merge_media(x, y, sx, sy); - (Some(k.0), Some(k.1)) - } - (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), - (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), - (None, None, None, None) => (None, None), - _ => bail!("invalid node. source and media dont agree."), - }; - Ok(Node { - public: NodePublic { - kind: x.public.kind.or(y.public.kind), - title: x.public.title.or(y.public.title), - subtitle: x.public.subtitle.or(y.public.subtitle), - id: x.public.id.or(y.public.id), - path: vec![], - children: merge_children(x.public.children, y.public.children), - tagline: x.public.tagline.or(y.public.tagline), - description: x.public.description.or(y.public.description), - release_date: x.public.release_date.or(y.public.release_date), - index: x.public.index.or(y.public.index), - media, - ratings: x - .public - .ratings - .into_iter() - .chain(y.public.ratings) - .collect(), - federated: None, - poster: x.public.poster.or(y.public.poster), - backdrop: x.public.backdrop.or(y.public.backdrop), - }, - private: NodePrivate { - id: x.private.id.or(y.private.id), - source, - backdrop: None, - poster: None, - }, - }) -} +// fn merge_node(x: Node, y: Node) -> anyhow::Result<Node> { +// let (media, source) = match ( +// x.public.media, +// y.public.media, +// x.private.source, +// y.private.source, +// ) { +// (Some(x), Some(y), Some(sx), Some(sy)) => { +// let k = merge_media(x, y, sx, sy); +// (Some(k.0), Some(k.1)) +// } +// (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), +// (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), +// (None, None, None, None) => (None, None), +// _ => bail!("invalid node. source and media dont agree."), +// }; +// Ok(Node { +// public: NodePublic { +// kind: x.public.kind.or(y.public.kind), +// title: x.public.title.or(y.public.title), +// subtitle: x.public.subtitle.or(y.public.subtitle), +// id: x.public.id.or(y.public.id), +// path: vec![], +// children: merge_children(x.public.children, y.public.children), +// tagline: x.public.tagline.or(y.public.tagline), +// description: x.public.description.or(y.public.description), +// release_date: x.public.release_date.or(y.public.release_date), +// index: x.public.index.or(y.public.index), +// media, +// ratings: x +// .public +// .ratings +// .into_iter() +// .chain(y.public.ratings) +// .collect(), +// federated: None, +// poster: x.public.poster.or(y.public.poster), +// backdrop: x.public.backdrop.or(y.public.backdrop), +// }, +// private: NodePrivate { +// id: x.private.id.or(y.private.id), +// source, +// backdrop: None, +// poster: None, +// }, +// }) +// } -fn merge_children(mut a: Vec<String>, b: Vec<String>) -> Vec<String> { - let acont = HashSet::<_, RandomState>::from_iter(a.clone()); - for el in b { - if !acont.contains(&el) { - a.push(el) - } - } - a -} -fn merge_media( - x: MediaInfo, - y: MediaInfo, - sx: Vec<TrackSource>, - sy: Vec<TrackSource>, -) -> (MediaInfo, Vec<TrackSource>) { - let mut tracks: Vec<SourceTrack> = Vec::new(); - let mut source: Vec<TrackSource> = Vec::new(); - for (t, s) in x - .tracks - .into_iter() - .zip(sx.into_iter()) - .chain(y.tracks.into_iter().zip(sy.into_iter())) - { - let mut remove = None; - let mut skip = false; - for (i, ot) in tracks.iter().enumerate() { - if t.name == ot.name - && t.kind == ot.kind - && t.language == ot.language - && t.codec == ot.codec - { - if t.federated.len() < ot.federated.len() { - remove = Some(i); - } else { - skip = true; - } - } - } - if let Some(r) = remove { - tracks.swap_remove(r); - source.swap_remove(r); - } - if !skip { - tracks.push(t); - source.push(s); - } - } - ( - MediaInfo { - duration: x.duration * 0.5 + y.duration * 0.5, - tracks, - chapters: if x.chapters.len() > y.chapters.len() { - x.chapters - } else { - y.chapters - }, - }, - source, - ) -} +// fn merge_children(mut a: Vec<String>, b: Vec<String>) -> Vec<String> { +// let acont = HashSet::<_, RandomState>::from_iter(a.clone()); +// for el in b { +// if !acont.contains(&el) { +// a.push(el) +// } +// } +// a +// } +// fn merge_media( +// x: MediaInfo, +// y: MediaInfo, +// sx: Vec<TrackSource>, +// sy: Vec<TrackSource>, +// ) -> (MediaInfo, Vec<TrackSource>) { +// let mut tracks: Vec<SourceTrack> = Vec::new(); +// let mut source: Vec<TrackSource> = Vec::new(); +// for (t, s) in x +// .tracks +// .into_iter() +// .zip(sx.into_iter()) +// .chain(y.tracks.into_iter().zip(sy.into_iter())) +// { +// let mut remove = None; +// let mut skip = false; +// for (i, ot) in tracks.iter().enumerate() { +// if t.name == ot.name +// && t.kind == ot.kind +// && t.language == ot.language +// && t.codec == ot.codec +// { +// if t.federated.len() < ot.federated.len() { +// remove = Some(i); +// } else { +// skip = true; +// } +// } +// } +// if let Some(r) = remove { +// tracks.swap_remove(r); +// source.swap_remove(r); +// } +// if !skip { +// tracks.push(t); +// source.push(s); +// } +// } +// ( +// MediaInfo { +// duration: x.duration * 0.5 + y.duration * 0.5, +// tracks, +// chapters: if x.chapters.len() > y.chapters.len() { +// x.chapters +// } else { +// y.chapters +// }, +// }, +// source, +// ) +// } -static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); +// static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); -#[async_recursion] -async fn import_remote( - id: String, - host: &str, - db: &impl ImportStorage, - session: &Arc<Session>, - index_path: &[usize], -) -> anyhow::Result<()> { - let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); - info!("loading federated node {id:?}"); +// #[async_recursion] +// async fn import_remote( +// id: String, +// host: &str, +// db: &impl ImportStorage, +// session: &Arc<Session>, +// index_path: &[usize], +// ) -> anyhow::Result<()> { +// let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); +// info!("loading federated node {id:?}"); - let mut node = session.node(&id).await.context("fetching remote node")?; - let mut node_ext = session - .node_extended(&id) - .await - .context("fetching extended remote node")?; +// let mut node = session.node(&id).await.context("fetching remote node")?; +// let mut node_ext = session +// .node_extended(&id) +// .await +// .context("fetching extended remote node")?; - let track_sources = if let Some(media) = &mut node.media { - let mut track_sources = Vec::new(); - for (i, t) in media.tracks.iter_mut().enumerate() { - t.federated.push(host.to_owned()); - track_sources.push(TrackSource::Remote(i)) - } - Some(track_sources) - } else { - None - }; +// let track_sources = if let Some(media) = &mut node.media { +// let mut track_sources = Vec::new(); +// for (i, t) in media.tracks.iter_mut().enumerate() { +// t.federated.push(host.to_owned()); +// track_sources.push(TrackSource::Remote(i)) +// } +// Some(track_sources) +// } else { +// None +// }; - drop(_permit); +// drop(_permit); - let mut node = Node { - public: node.clone(), - private: NodePrivate { - backdrop: None, - poster: None, - id: None, - source: track_sources, - }, - }; - make_opt_asset_federated(host, &mut node.public.backdrop)?; - make_opt_asset_federated(host, &mut node.public.poster)?; - for g in node_ext.people.values_mut() { - for a in g { - make_opt_asset_federated(host, &mut a.person.headshot)?; - } - } +// let mut node = Node { +// public: node.clone(), +// private: NodePrivate { +// backdrop: None, +// poster: None, +// id: None, +// source: track_sources, +// }, +// }; +// make_opt_asset_federated(host, &mut node.public.backdrop)?; +// make_opt_asset_federated(host, &mut node.public.poster)?; +// for g in node_ext.people.values_mut() { +// for a in g { +// make_opt_asset_federated(host, &mut a.person.headshot)?; +// } +// } - debug!("adding {id}"); - db.add_partial_node(&id, index_path, node.clone())?; - db.add_partial_node_ext(&id, index_path, node_ext)?; +// debug!("adding {id}"); +// db.add_partial_node(&id, index_path, node.clone())?; +// db.add_partial_node_ext(&id, index_path, node_ext)?; - let mut children: FuturesUnordered<_> = node - .public - .children - .iter() - .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) - .collect(); +// let mut children: FuturesUnordered<_> = node +// .public +// .children +// .iter() +// .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) +// .collect(); - while let Some(r) = children.next().await { - r?; - } +// while let Some(r) = children.next().await { +// r?; +// } - Ok(()) -} +// Ok(()) +// } -pub fn make_opt_asset_federated(host: &str, p: &mut Option<Asset>) -> anyhow::Result<()> { - if let Some(a) = p { - make_asset_federated(host, a)? - } - Ok(()) -} -pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { - let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; - *p = AssetInner::Federated { - host: host.to_owned(), - asset: data, - } - .ser(); - Ok(()) -} +// pub fn make_opt_asset_federated(host: &str, p: &mut Option<Asset>) -> anyhow::Result<()> { +// if let Some(a) = p { +// make_asset_federated(host, a)? +// } +// Ok(()) +// } +// pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { +// let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; +// *p = AssetInner::Federated { +// host: host.to_owned(), +// asset: data, +// } +// .ser(); +// Ok(()) +// } |