From e7ba3274e27fad755f15465581f5b403c82ab4d2 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 29 Jan 2025 16:07:58 +0100 Subject: prepare database refactor --- import/src/db.rs | 208 ------- import/src/lib.rs | 1665 ++++++++++++++++++++++++++--------------------------- 2 files changed, 832 insertions(+), 1041 deletions(-) delete mode 100644 import/src/db.rs (limited to 'import/src') diff --git a/import/src/db.rs b/import/src/db.rs deleted file mode 100644 index 7a3636c..0000000 --- a/import/src/db.rs +++ /dev/null @@ -1,208 +0,0 @@ -use anyhow::{anyhow, Context}; -use jellybase::database::{ - redb::{ReadableTable, ReadableTableMetadata}, - tantivy::{doc, DateTime}, - DataAcid, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT, -}; -use jellycommon::{ExtendedNode, Node}; -use log::info; -use std::collections::HashMap; -use std::sync::RwLock; - -pub(crate) trait ImportStorage: Sync { - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>; - fn add_partial_node_ext( - &self, - id: &str, - index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()>; - - fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>>; - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>; - - fn pre_clean(&self) -> anyhow::Result<()>; - fn remove_prev_nodes(&self) -> anyhow::Result<()>; - fn finish(&self) -> anyhow::Result<()>; -} - -pub(crate) struct DatabaseStorage<'a> { - pub db: &'a DataAcid, -} -impl<'a> DatabaseStorage<'a> { - pub fn new(db: &'a DataAcid) -> Self { - Self { db } - } -} -impl ImportStorage for DatabaseStorage<'_> { - fn pre_clean(&self) -> anyhow::Result<()> { - let txn = self.db.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - if !table.is_empty()? { - info!("clearing temporary node tree from an aborted last import..."); - table.retain(|_, _| false)?; - } - drop(table); - txn.commit()?; - Ok(()) - } - fn remove_prev_nodes(&self) -> anyhow::Result<()> { - info!("removing old nodes..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - Ok(()) - } - fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { - let txn = self.db.inner.begin_read()?; - let table = txn.open_table(T_NODE_IMPORT)?; - let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?; - Ok(value.value().0) - } - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { - insert_complete_node(self.db, id, node) - } - - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - - let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); - parts.push((index_path.to_vec(), node.clone())); - table.insert(id, Ser(parts))?; - - drop(table); - txn.commit()?; - Ok(()) - } - - fn add_partial_node_ext( - &self, - id: &str, - _index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()> { - // TODO merge this - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(node))?; - drop(table); - txn.commit()?; - Ok(()) - } - - fn finish(&self) -> anyhow::Result<()> { - info!("clearing temporary node tree..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_IMPORT)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } -} - -pub type Parts = RwLock, Node)>>>; -pub(crate) struct MemoryStorage<'a> { - pub db: &'a DataAcid, - pub parts: Parts, -} -impl<'a> MemoryStorage<'a> { - pub fn new(db: &'a DataAcid) -> Self { - Self { - db, - parts: Default::default(), - } - } -} -impl ImportStorage for MemoryStorage<'_> { - fn pre_clean(&self) -> anyhow::Result<()> { - Ok(()) - } - fn remove_prev_nodes(&self) -> anyhow::Result<()> { - info!("removing old nodes..."); - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE)?; - table.retain(|_, _| false)?; - drop(table); - txn.commit()?; - self.db - .node_index - .writer - .read() - .unwrap() - .delete_all_documents()?; - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } - fn get_partial_parts(&self, id: &str) -> anyhow::Result, Node)>> { - Ok(self - .parts - .read() - .unwrap() - .get(id) - .ok_or(anyhow!("node parts not found"))? - .to_owned()) - } - fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> { - insert_complete_node(self.db, id, node) - } - - fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> { - self.parts - .write() - .unwrap() - .entry(id.to_owned()) - .or_default() - .push((index_path.to_owned(), node)); - Ok(()) - } - - fn add_partial_node_ext( - &self, - id: &str, - _index_path: &[usize], - node: ExtendedNode, - ) -> anyhow::Result<()> { - // TODO merge this - let txn = self.db.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_EXTENDED)?; - table.insert(id, Ser(node))?; - drop(table); - txn.commit()?; - Ok(()) - } - - fn finish(&self) -> anyhow::Result<()> { - self.db.node_index.writer.write().unwrap().commit()?; - Ok(()) - } -} - -fn insert_complete_node(db: &DataAcid, id: &str, node: Node) -> anyhow::Result<()> { - let txn_write = db.inner.begin_write()?; - let mut t_node = txn_write.open_table(T_NODE)?; - t_node.insert(id, Ser(node.clone()))?; - drop(t_node); - txn_write.commit()?; - - db - .node_index - .writer - .read() - .unwrap() - .add_document(doc!( - db.node_index.id => node.public.id.unwrap_or_default(), - db.node_index.title => node.public.title.unwrap_or_default(), - db.node_index.description => node.public.description.unwrap_or_default(), - db.node_index.releasedate => DateTime::from_timestamp_millis(node.public.release_date.unwrap_or_default()), - db.node_index.f_index => node.public.index.unwrap_or_default() as u64, - db.node_index.parent => node.public.path.last().cloned().unwrap_or_default(), - )) - .context("inserting document")?; - Ok(()) -} diff --git a/import/src/lib.rs b/import/src/lib.rs index 89088e9..df23f7e 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -3,840 +3,839 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2024 metamuffin */ -pub mod db; pub mod infojson; pub mod tmdb; pub mod trakt; -use anyhow::{anyhow, bail, Context, Error, Ok}; -use async_recursion::async_recursion; -use base64::Engine; -use db::{DatabaseStorage, ImportStorage, MemoryStorage}; -use futures::{stream::FuturesUnordered, StreamExt}; -use jellybase::{ - assetfed::AssetInner, - cache::{async_cache_file, cache_memory}, - database::DataAcid, - federation::Federation, - CONF, SECRETS, -}; -use jellyclient::Session; -use jellycommon::{ - chrono::{DateTime, Datelike}, - Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, - NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, -}; -use jellymatroska::read::EbmlReader; -use jellyremuxer::metadata::import_metadata; -use log::{debug, info, warn}; -use regex::Regex; -use std::{ - cmp::Ordering, - collections::HashSet, - ffi::OsStr, - fs::File, - hash::RandomState, - io::BufReader, - ops::Deref, - path::{Path, PathBuf}, - sync::{Arc, LazyLock}, -}; -use tmdb::{parse_release_date, Tmdb}; -use tokio::{ - io::AsyncWriteExt, - sync::{RwLock, Semaphore}, - task::spawn_blocking, -}; -use trakt::Trakt; - -static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); -pub static IMPORT_ERRORS: RwLock> = RwLock::const_new(Vec::new()); - -static RE_EPISODE_FILENAME: LazyLock = - LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); -static RE_YOUTUBE_ID: LazyLock = - LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); - -struct Apis { - trakt: Option, - tmdb: Option, -} - -pub fn is_importing() -> bool { - IMPORT_SEM.available_permits() == 0 -} - -pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { - let permit = IMPORT_SEM.try_acquire()?; - - let ap = Apis { - trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), - tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), - }; - - let e = if CONF.use_in_memory_import_storage { - import_inner(&MemoryStorage::new(db), fed, &ap).await - } else { - import_inner(&DatabaseStorage::new(db), fed, &ap).await - }; - let e = match e { - Result::Ok(e) => e, - Result::Err(e) => vec![e], - }; - *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); - - drop(permit); - Ok(()) -} - -pub(crate) async fn import_inner( - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result> { - db.pre_clean()?; - info!("loading sources..."); - let mut errors = Vec::new(); - match import_path(CONF.library_path.clone(), vec![], db, fed, ap) - .await - .context("indexing") - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - }; - db.remove_prev_nodes()?; - info!("merging nodes..."); - match generate_node_paths(db).context("merging nodes") { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - db.finish()?; - info!("import completed"); - Ok(errors) -} - -fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result> { - // TODO mark nodes done to allow recursion - fn traverse( - db: &impl ImportStorage, - id: String, - mut path: Vec, - parent_title: &str, - ) -> anyhow::Result> { - let mut errors = Vec::new(); - let node = { - let mut parts = db - .get_partial_parts(&id) - .context(anyhow!("path = {path:?}"))?; - - parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); - - let mut node = parts - .into_iter() - .map(|(_, x)| x) - .reduce(|x, y| merge_node(x, y).unwrap()) - .unwrap(); - - node.public.id = Some(id.to_owned()); - node.public.path = vec![]; // will be reconstructed in the next pass - node.public.federated = None; - - // TODO this discardes a lot of information. maybe change this. - if let Some(media) = &node.public.media { - for t in &media.tracks { - if let Some(host) = t.federated.first() { - if host != &CONF.hostname { - node.public.federated = Some(host.to_string()) - } - } - } - } - - if node.public.path.is_empty() { - node.public.path = path.clone(); - } - node.public.subtitle = match node.public.kind.unwrap_or_default() { - NodeKind::Movie => node.public.release_date.map(|date| { - format!( - "{}", - DateTime::from_timestamp_millis(date) - .unwrap() - .date_naive() - .year() - ) - }), - NodeKind::Season - | NodeKind::Episode - | NodeKind::ShortFormVideo - | NodeKind::Video => Some(parent_title.to_string()), - _ => None, - }; - - db.insert_complete_node(&id, node.clone())?; - - node - }; - - path.push(id); - let ps = node.public.title.unwrap_or_default(); - for c in node.public.children { - match traverse(db, c, path.clone(), &ps) { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - Ok(errors) - } - traverse(db, "library".to_string(), vec![], "Root") -} - -fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { - if x.is_empty() { - Ordering::Greater - } else if y.is_empty() { - Ordering::Less - } else { - match x[0].cmp(&y[0]) { - o @ (Ordering::Less | Ordering::Greater) => o, - Ordering::Equal => compare_index_path(&x[1..], &y[1..]), - } - } -} - -#[async_recursion] -async fn import_path( - path: PathBuf, - mut index_path: Vec, - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result> { - let mut errors = Vec::new(); - if path.is_dir() { - let mut children_paths = path - .read_dir()? - .map(Result::unwrap) - .filter_map(|e| { - if e.path().extension() == Some(OsStr::new("yaml")) - || e.path().extension() == Some(OsStr::new("jelly")) - || e.metadata().unwrap().is_dir() - { - Some(e.path()) - } else { - None - } - }) - .collect::>(); - - children_paths.sort(); - - let mut children: FuturesUnordered<_> = children_paths - .into_iter() - .enumerate() - .map(|(i, p)| { - import_path( - p.clone(), - { - let mut path = index_path.clone(); - path.push(i); - path - }, - db, - fed, - ap, - ) - }) - .collect(); - - while let Some(k) = children.next().await { - match k { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - } else { - info!("reading {path:?}"); - let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; - let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { - serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? - } else { - serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? - }; - - for (i, s) in opts.sources.into_iter().enumerate() { - index_path.push(i); - if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) - .await - .context(anyhow!("processing source in {path:?}")) - { - errors.push(e) - } - index_path.pop(); - } - } - Ok(errors) -} - -static SEM_IMPORT: Semaphore = Semaphore::const_new(2); - -#[async_recursion] -async fn process_source( - id: String, - s: ImportSource, - path: &Path, - index_path: &[usize], - db: &impl ImportStorage, - fed: &Federation, - ap: &Apis, -) -> anyhow::Result> { - let mut errors = vec![]; - match s { - ImportSource::Override(mut n) => { - if let Some(backdrop) = n.private.backdrop.clone() { - n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); - } - if let Some(poster) = n.private.poster.clone() { - n.public.poster = Some(AssetInner::Library(poster).ser()); - } - db.add_partial_node(&id, index_path, n)? - } - ImportSource::Trakt { id: tid, kind } => { - info!("trakt {id}"); - let trakt = ap - .trakt - .as_ref() - .ok_or(anyhow!("trakt api key is required"))?; - let trakt_object = trakt - .lookup(kind, tid, true) - .await - .context("looking up metadata")?; - let trakt_people = trakt - .people(kind, tid, true) - .await - .context("looking up people")?; - - let mut node = Node::default(); - let mut node_ext = ExtendedNode::default(); - { - node.public.kind = Some(match kind { - TraktKind::Movie => NodeKind::Movie, - TraktKind::Show => NodeKind::Show, - TraktKind::Season => NodeKind::Season, - TraktKind::Episode => NodeKind::Episode, - _ => bail!("unexpected kind for trakt import"), - }); - node.public.title = Some(trakt_object.title.to_owned()); - if let Some(overview) = &trakt_object.overview { - node.public.description = Some(overview.to_owned()) - } - if let Some(tagline) = &trakt_object.tagline { - node.public.tagline = Some(tagline.to_owned()) - } - if let Some(rating) = &trakt_object.rating { - node.public.ratings.insert(Rating::Trakt, *rating); - } - for p in trakt_people.cast.iter() { - node_ext - .people - .entry(PeopleGroup::Cast) - .or_default() - .push(p.a()) - } - for (group, people) in trakt_people.crew.iter() { - for p in people { - node_ext.people.entry(group.a()).or_default().push(p.a()) - } - } - // TODO lazy assets - for ps in node_ext.people.values_mut() { - for p in ps { - if let Some(id) = p.person.ids.tmdb { - if let Some(tmdb) = &ap.tmdb { - let k = tmdb.person_image(id).await?; - if let Some(prof) = k.profiles.first() { - p.person.headshot = Some( - AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), - ); - } - } - } - } - } - } - db.add_partial_node(&id, index_path, node)?; - db.add_partial_node_ext(&id, index_path, node_ext)?; - - if let Some(tid) = trakt_object.ids.tmdb { - if let Some(kind) = match kind { - TraktKind::Movie => Some(TmdbKind::Movie), - TraktKind::Show => Some(TmdbKind::Tv), - TraktKind::Season => Some(TmdbKind::Tv), // TODO - TraktKind::Episode | TraktKind::Person | TraktKind::User => None, - } { - let mut index_path = index_path.to_vec(); - index_path.push(1); - match process_source( - id, - ImportSource::Tmdb { id: tid, kind }, - path, - &index_path, - db, - fed, - ap, - ) - .await - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - } - } - } - } - ImportSource::Tmdb { id: tid, kind } => { - info!("tmdb {id}"); - let tmdb = ap - .tmdb - .as_ref() - .ok_or(anyhow!("tmdb api key is required"))?; - - let details = tmdb.details(kind, tid).await?; - - let mut node = Node::default(); - - // TODO lazy assets - if let Some(poster) = &details.poster_path { - node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); - } - if let Some(backdrop) = &details.backdrop_path { - node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); - } - - node.public.tagline = details.tagline.clone(); - node.public.title = details.title.clone(); - node.public.description = Some(details.overview.clone()); - node.public - .ratings - .insert(Rating::Tmdb, details.vote_average); - if let Some(date) = details.release_date.clone() { - node.public.release_date = - parse_release_date(&date).context("parsing release date")?; - } - - db.add_partial_node(&id, index_path, node)?; - } - ImportSource::Media { - path: mpath, - ignore_attachments, - ignore_chapters, - ignore_metadata, - } => { - info!("media import {mpath:?}"); - let abspath = CONF.media_path.join(&mpath); - if !abspath.exists() { - bail!("media missing at {abspath:?}"); - } - if abspath.is_dir() { - let mut node = Node::default(); - for f in abspath.read_dir()? { - let f = f?; - let child_path = f.path(); - if child_path.is_dir() - || matches!( - child_path.extension().map(|o| o.to_str().unwrap()), - Some("mks" | "mka" | "mkv" | "webm") - ) - { - let inf_id = - infer_id_from_path(&child_path).context("inferring child id")?; - - match process_source( - inf_id.clone(), - ImportSource::Media { - path: mpath.join(f.file_name()), - ignore_attachments, - ignore_chapters, - ignore_metadata, - }, - path, - index_path, - db, - fed, - ap, - ) - .await - .context(anyhow!("recursive media import: {:?}", f.path())) - { - Result::Ok(o) => errors.extend(o), - Result::Err(e) => errors.push(e), - }; - node.public.children.push(inf_id); - } - } - db.add_partial_node(&id, index_path, node)?; - } else if abspath.is_file() { - let _permit = SEM_IMPORT.acquire().await.unwrap(); - let metadata = { - let abspath = abspath.clone(); - let mpath = mpath.to_owned(); - spawn_blocking(move || { - cache_memory(&["probe", mpath.to_str().unwrap()], || { - let input = File::open(&abspath).context("opening media file")?; - let mut input = EbmlReader::new(BufReader::new(input)); - import_metadata(&mut input) - }) - }) - } - .await? - .context(anyhow!("probing {abspath:?}"))? - .deref() - .to_owned(); - - let mut node = Node::default(); - - if !ignore_metadata { - if let Some(captures) = - RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) - { - node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); - if let Some(title) = captures.get(6) { - node.public.title = Some(title.as_str().to_string()); - } - } - node.public.title = metadata.title; - node.public.description = metadata.description; - node.public.tagline = metadata.tagline; - } - node.public.media = Some(MediaInfo { - duration: metadata.duration, - tracks: metadata.tracks, - chapters: if ignore_chapters { - vec![] - } else { - metadata.chapters - }, - }); - node.private.source = Some( - metadata - .track_sources - .into_iter() - .map(|mut ts| { - ts.path = mpath.to_owned(); - TrackSource::Local(ts) - }) - .collect(), - ); - - if !ignore_attachments { - if let Some((filename, data)) = metadata.cover { - node.public.poster = Some( - AssetInner::Cache( - async_cache_file( - &["att-cover", mpath.to_str().unwrap(), &filename], - |mut f| async move { - f.write_all(&data).await?; - Ok(()) - }, - ) - .await?, - ) - .ser(), - ) - }; - - if let Some(infojson) = metadata.infojson { - let infojson: infojson::YVideo = - serde_json::from_str(&infojson).context("parsing infojson")?; - - node.public.kind = Some( - if infojson.duration.unwrap_or(0.) < 120. - && infojson.aspect_ratio.unwrap_or(2.) < 1. - { - NodeKind::ShortFormVideo - } else { - NodeKind::Video - }, - ); - node.public.title = Some(infojson.title); - node.public.description = Some(infojson.description); - node.public.tagline = Some(infojson.webpage_url); - node.public - .ratings - .insert(Rating::YoutubeViews, infojson.view_count as f64); - node.public.release_date = Some( - infojson::parse_upload_date(&infojson.upload_date) - .context("parsing upload date")?, - ); - node.public.ratings.extend( - infojson - .like_count - .map(|l| (Rating::YoutubeLikes, l as f64)), - ); - } - } - drop(_permit); - db.add_partial_node(&id, index_path, node)?; - } else { - warn!("non file/dir import ignored: {abspath:?}") - } - } - ImportSource::Federated { host } => { - info!("federated import of {id:?} from {host:?}"); - let session = fed.get_session(&host).await.context("creating session")?; - - import_remote(id.clone(), &host, db, &session, index_path) - .await - .context("federated import")? - } - ImportSource::AutoChildren { path: cpath } => { - info!("auto children at {path:?}"); - let paths = cpath - .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) - .read_dir()? - .map(Result::unwrap) - .map(|e| e.path()) - .filter(|e| { - e.extension() == Some(OsStr::new("yaml")) - || e.extension() == Some(OsStr::new("jelly")) - }); - - let mut children = Vec::new(); - for p in paths { - let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { - serde_json::from_reader(File::open(&p)?)? - } else { - serde_yaml::from_reader(File::open(&p)?)? - }; - if opts.id != id { - children.push(opts.id); - } - } - db.add_partial_node( - &id, - index_path, - Node { - private: NodePrivate::default(), - public: NodePublic { - children, - ..Default::default() - }, - }, - )?; - } - } - Ok(errors) -} - -pub fn infer_id_from_path(path: &Path) -> anyhow::Result { - let f = path - .file_stem() - .ok_or(anyhow!("no filename"))? - .to_str() - .ok_or(anyhow!("non utf8 filename"))?; - - if let Some(mat) = RE_YOUTUBE_ID.captures(f) { - let id = mat.get(1).unwrap().as_str(); - return Ok(format!("youtube-{id}")); - } - - let mut fsan = String::with_capacity(f.len()); - for c in f.chars() { - fsan.extend(match c { - 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), - ' ' => Some('-'), - _ => None, - }); - } - Ok(fsan) -} - -fn merge_node(x: Node, y: Node) -> anyhow::Result { - let (media, source) = match ( - x.public.media, - y.public.media, - x.private.source, - y.private.source, - ) { - (Some(x), Some(y), Some(sx), Some(sy)) => { - let k = merge_media(x, y, sx, sy); - (Some(k.0), Some(k.1)) - } - (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), - (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), - (None, None, None, None) => (None, None), - _ => bail!("invalid node. source and media dont agree."), - }; - Ok(Node { - public: NodePublic { - kind: x.public.kind.or(y.public.kind), - title: x.public.title.or(y.public.title), - subtitle: x.public.subtitle.or(y.public.subtitle), - id: x.public.id.or(y.public.id), - path: vec![], - children: merge_children(x.public.children, y.public.children), - tagline: x.public.tagline.or(y.public.tagline), - description: x.public.description.or(y.public.description), - release_date: x.public.release_date.or(y.public.release_date), - index: x.public.index.or(y.public.index), - media, - ratings: x - .public - .ratings - .into_iter() - .chain(y.public.ratings) - .collect(), - federated: None, - poster: x.public.poster.or(y.public.poster), - backdrop: x.public.backdrop.or(y.public.backdrop), - }, - private: NodePrivate { - id: x.private.id.or(y.private.id), - source, - backdrop: None, - poster: None, - }, - }) -} - -fn merge_children(mut a: Vec, b: Vec) -> Vec { - let acont = HashSet::<_, RandomState>::from_iter(a.clone()); - for el in b { - if !acont.contains(&el) { - a.push(el) - } - } - a -} -fn merge_media( - x: MediaInfo, - y: MediaInfo, - sx: Vec, - sy: Vec, -) -> (MediaInfo, Vec) { - let mut tracks: Vec = Vec::new(); - let mut source: Vec = Vec::new(); - for (t, s) in x - .tracks - .into_iter() - .zip(sx.into_iter()) - .chain(y.tracks.into_iter().zip(sy.into_iter())) - { - let mut remove = None; - let mut skip = false; - for (i, ot) in tracks.iter().enumerate() { - if t.name == ot.name - && t.kind == ot.kind - && t.language == ot.language - && t.codec == ot.codec - { - if t.federated.len() < ot.federated.len() { - remove = Some(i); - } else { - skip = true; - } - } - } - if let Some(r) = remove { - tracks.swap_remove(r); - source.swap_remove(r); - } - if !skip { - tracks.push(t); - source.push(s); - } - } - ( - MediaInfo { - duration: x.duration * 0.5 + y.duration * 0.5, - tracks, - chapters: if x.chapters.len() > y.chapters.len() { - x.chapters - } else { - y.chapters - }, - }, - source, - ) -} - -static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); - -#[async_recursion] -async fn import_remote( - id: String, - host: &str, - db: &impl ImportStorage, - session: &Arc, - index_path: &[usize], -) -> anyhow::Result<()> { - let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); - info!("loading federated node {id:?}"); - - let mut node = session.node(&id).await.context("fetching remote node")?; - let mut node_ext = session - .node_extended(&id) - .await - .context("fetching extended remote node")?; - - let track_sources = if let Some(media) = &mut node.media { - let mut track_sources = Vec::new(); - for (i, t) in media.tracks.iter_mut().enumerate() { - t.federated.push(host.to_owned()); - track_sources.push(TrackSource::Remote(i)) - } - Some(track_sources) - } else { - None - }; - - drop(_permit); - - let mut node = Node { - public: node.clone(), - private: NodePrivate { - backdrop: None, - poster: None, - id: None, - source: track_sources, - }, - }; - make_opt_asset_federated(host, &mut node.public.backdrop)?; - make_opt_asset_federated(host, &mut node.public.poster)?; - for g in node_ext.people.values_mut() { - for a in g { - make_opt_asset_federated(host, &mut a.person.headshot)?; - } - } - - debug!("adding {id}"); - db.add_partial_node(&id, index_path, node.clone())?; - db.add_partial_node_ext(&id, index_path, node_ext)?; - - let mut children: FuturesUnordered<_> = node - .public - .children - .iter() - .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) - .collect(); - - while let Some(r) = children.next().await { - r?; - } - - Ok(()) -} - -pub fn make_opt_asset_federated(host: &str, p: &mut Option) -> anyhow::Result<()> { - if let Some(a) = p { - make_asset_federated(host, a)? - } - Ok(()) -} -pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { - let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; - *p = AssetInner::Federated { - host: host.to_owned(), - asset: data, - } - .ser(); - Ok(()) -} +// use anyhow::{anyhow, bail, Context, Error, Ok}; +// use async_recursion::async_recursion; +// use base64::Engine; +// use db::{DatabaseStorage, ImportStorage, MemoryStorage}; +// use futures::{stream::FuturesUnordered, StreamExt}; +// use jellybase::{ +// assetfed::AssetInner, +// cache::{async_cache_file, cache_memory}, +// database::DataAcid, +// federation::Federation, +// CONF, SECRETS, +// }; +// use jellyclient::Session; +// use jellycommon::{ +// chrono::{DateTime, Datelike}, +// Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, +// NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, +// }; +// use jellymatroska::read::EbmlReader; +// use jellyremuxer::metadata::import_metadata; +// use log::{debug, info, warn}; +// use regex::Regex; +// use std::{ +// cmp::Ordering, +// collections::HashSet, +// ffi::OsStr, +// fs::File, +// hash::RandomState, +// io::BufReader, +// ops::Deref, +// path::{Path, PathBuf}, +// sync::{Arc, LazyLock}, +// }; +// use tmdb::{parse_release_date, Tmdb}; +// use tokio::{ +// io::AsyncWriteExt, +// sync::{RwLock, Semaphore}, +// task::spawn_blocking, +// }; +// use trakt::Trakt; + +// static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); +// pub static IMPORT_ERRORS: RwLock> = RwLock::const_new(Vec::new()); + +// static RE_EPISODE_FILENAME: LazyLock = +// LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); +// static RE_YOUTUBE_ID: LazyLock = +// LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); + +// struct Apis { +// trakt: Option, +// tmdb: Option, +// } + +// pub fn is_importing() -> bool { +// IMPORT_SEM.available_permits() == 0 +// } + +// pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { +// let permit = IMPORT_SEM.try_acquire()?; + +// let ap = Apis { +// trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), +// tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), +// }; + +// let e = if CONF.use_in_memory_import_storage { +// import_inner(&MemoryStorage::new(db), fed, &ap).await +// } else { +// import_inner(&DatabaseStorage::new(db), fed, &ap).await +// }; +// let e = match e { +// Result::Ok(e) => e, +// Result::Err(e) => vec![e], +// }; +// *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); + +// drop(permit); +// Ok(()) +// } + +// pub(crate) async fn import_inner( +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result> { +// db.pre_clean()?; +// info!("loading sources..."); +// let mut errors = Vec::new(); +// match import_path(CONF.library_path.clone(), vec![], db, fed, ap) +// .await +// .context("indexing") +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// }; +// db.remove_prev_nodes()?; +// info!("merging nodes..."); +// match generate_node_paths(db).context("merging nodes") { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// db.finish()?; +// info!("import completed"); +// Ok(errors) +// } + +// fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result> { +// // TODO mark nodes done to allow recursion +// fn traverse( +// db: &impl ImportStorage, +// id: String, +// mut path: Vec, +// parent_title: &str, +// ) -> anyhow::Result> { +// let mut errors = Vec::new(); +// let node = { +// let mut parts = db +// .get_partial_parts(&id) +// .context(anyhow!("path = {path:?}"))?; + +// parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); + +// let mut node = parts +// .into_iter() +// .map(|(_, x)| x) +// .reduce(|x, y| merge_node(x, y).unwrap()) +// .unwrap(); + +// node.public.id = Some(id.to_owned()); +// node.public.path = vec![]; // will be reconstructed in the next pass +// node.public.federated = None; + +// // TODO this discardes a lot of information. maybe change this. +// if let Some(media) = &node.public.media { +// for t in &media.tracks { +// if let Some(host) = t.federated.first() { +// if host != &CONF.hostname { +// node.public.federated = Some(host.to_string()) +// } +// } +// } +// } + +// if node.public.path.is_empty() { +// node.public.path = path.clone(); +// } +// node.public.subtitle = match node.public.kind.unwrap_or_default() { +// NodeKind::Movie => node.public.release_date.map(|date| { +// format!( +// "{}", +// DateTime::from_timestamp_millis(date) +// .unwrap() +// .date_naive() +// .year() +// ) +// }), +// NodeKind::Season +// | NodeKind::Episode +// | NodeKind::ShortFormVideo +// | NodeKind::Video => Some(parent_title.to_string()), +// _ => None, +// }; + +// db.insert_complete_node(&id, node.clone())?; + +// node +// }; + +// path.push(id); +// let ps = node.public.title.unwrap_or_default(); +// for c in node.public.children { +// match traverse(db, c, path.clone(), &ps) { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// Ok(errors) +// } +// traverse(db, "library".to_string(), vec![], "Root") +// } + +// fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { +// if x.is_empty() { +// Ordering::Greater +// } else if y.is_empty() { +// Ordering::Less +// } else { +// match x[0].cmp(&y[0]) { +// o @ (Ordering::Less | Ordering::Greater) => o, +// Ordering::Equal => compare_index_path(&x[1..], &y[1..]), +// } +// } +// } + +// #[async_recursion] +// async fn import_path( +// path: PathBuf, +// mut index_path: Vec, +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result> { +// let mut errors = Vec::new(); +// if path.is_dir() { +// let mut children_paths = path +// .read_dir()? +// .map(Result::unwrap) +// .filter_map(|e| { +// if e.path().extension() == Some(OsStr::new("yaml")) +// || e.path().extension() == Some(OsStr::new("jelly")) +// || e.metadata().unwrap().is_dir() +// { +// Some(e.path()) +// } else { +// None +// } +// }) +// .collect::>(); + +// children_paths.sort(); + +// let mut children: FuturesUnordered<_> = children_paths +// .into_iter() +// .enumerate() +// .map(|(i, p)| { +// import_path( +// p.clone(), +// { +// let mut path = index_path.clone(); +// path.push(i); +// path +// }, +// db, +// fed, +// ap, +// ) +// }) +// .collect(); + +// while let Some(k) = children.next().await { +// match k { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// } else { +// info!("reading {path:?}"); +// let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; +// let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { +// serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? +// } else { +// serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? +// }; + +// for (i, s) in opts.sources.into_iter().enumerate() { +// index_path.push(i); +// if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) +// .await +// .context(anyhow!("processing source in {path:?}")) +// { +// errors.push(e) +// } +// index_path.pop(); +// } +// } +// Ok(errors) +// } + +// static SEM_IMPORT: Semaphore = Semaphore::const_new(2); + +// #[async_recursion] +// async fn process_source( +// id: String, +// s: ImportSource, +// path: &Path, +// index_path: &[usize], +// db: &impl ImportStorage, +// fed: &Federation, +// ap: &Apis, +// ) -> anyhow::Result> { +// let mut errors = vec![]; +// match s { +// ImportSource::Override(mut n) => { +// if let Some(backdrop) = n.private.backdrop.clone() { +// n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); +// } +// if let Some(poster) = n.private.poster.clone() { +// n.public.poster = Some(AssetInner::Library(poster).ser()); +// } +// db.add_partial_node(&id, index_path, n)? +// } +// ImportSource::Trakt { id: tid, kind } => { +// info!("trakt {id}"); +// let trakt = ap +// .trakt +// .as_ref() +// .ok_or(anyhow!("trakt api key is required"))?; +// let trakt_object = trakt +// .lookup(kind, tid, true) +// .await +// .context("looking up metadata")?; +// let trakt_people = trakt +// .people(kind, tid, true) +// .await +// .context("looking up people")?; + +// let mut node = Node::default(); +// let mut node_ext = ExtendedNode::default(); +// { +// node.public.kind = Some(match kind { +// TraktKind::Movie => NodeKind::Movie, +// TraktKind::Show => NodeKind::Show, +// TraktKind::Season => NodeKind::Season, +// TraktKind::Episode => NodeKind::Episode, +// _ => bail!("unexpected kind for trakt import"), +// }); +// node.public.title = Some(trakt_object.title.to_owned()); +// if let Some(overview) = &trakt_object.overview { +// node.public.description = Some(overview.to_owned()) +// } +// if let Some(tagline) = &trakt_object.tagline { +// node.public.tagline = Some(tagline.to_owned()) +// } +// if let Some(rating) = &trakt_object.rating { +// node.public.ratings.insert(Rating::Trakt, *rating); +// } +// for p in trakt_people.cast.iter() { +// node_ext +// .people +// .entry(PeopleGroup::Cast) +// .or_default() +// .push(p.a()) +// } +// for (group, people) in trakt_people.crew.iter() { +// for p in people { +// node_ext.people.entry(group.a()).or_default().push(p.a()) +// } +// } +// // TODO lazy assets +// for ps in node_ext.people.values_mut() { +// for p in ps { +// if let Some(id) = p.person.ids.tmdb { +// if let Some(tmdb) = &ap.tmdb { +// let k = tmdb.person_image(id).await?; +// if let Some(prof) = k.profiles.first() { +// p.person.headshot = Some( +// AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), +// ); +// } +// } +// } +// } +// } +// } +// db.add_partial_node(&id, index_path, node)?; +// db.add_partial_node_ext(&id, index_path, node_ext)?; + +// if let Some(tid) = trakt_object.ids.tmdb { +// if let Some(kind) = match kind { +// TraktKind::Movie => Some(TmdbKind::Movie), +// TraktKind::Show => Some(TmdbKind::Tv), +// TraktKind::Season => Some(TmdbKind::Tv), // TODO +// TraktKind::Episode | TraktKind::Person | TraktKind::User => None, +// } { +// let mut index_path = index_path.to_vec(); +// index_path.push(1); +// match process_source( +// id, +// ImportSource::Tmdb { id: tid, kind }, +// path, +// &index_path, +// db, +// fed, +// ap, +// ) +// .await +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// } +// } +// } +// } +// ImportSource::Tmdb { id: tid, kind } => { +// info!("tmdb {id}"); +// let tmdb = ap +// .tmdb +// .as_ref() +// .ok_or(anyhow!("tmdb api key is required"))?; + +// let details = tmdb.details(kind, tid).await?; + +// let mut node = Node::default(); + +// // TODO lazy assets +// if let Some(poster) = &details.poster_path { +// node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); +// } +// if let Some(backdrop) = &details.backdrop_path { +// node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); +// } + +// node.public.tagline = details.tagline.clone(); +// node.public.title = details.title.clone(); +// node.public.description = Some(details.overview.clone()); +// node.public +// .ratings +// .insert(Rating::Tmdb, details.vote_average); +// if let Some(date) = details.release_date.clone() { +// node.public.release_date = +// parse_release_date(&date).context("parsing release date")?; +// } + +// db.add_partial_node(&id, index_path, node)?; +// } +// ImportSource::Media { +// path: mpath, +// ignore_attachments, +// ignore_chapters, +// ignore_metadata, +// } => { +// info!("media import {mpath:?}"); +// let abspath = CONF.media_path.join(&mpath); +// if !abspath.exists() { +// bail!("media missing at {abspath:?}"); +// } +// if abspath.is_dir() { +// let mut node = Node::default(); +// for f in abspath.read_dir()? { +// let f = f?; +// let child_path = f.path(); +// if child_path.is_dir() +// || matches!( +// child_path.extension().map(|o| o.to_str().unwrap()), +// Some("mks" | "mka" | "mkv" | "webm") +// ) +// { +// let inf_id = +// infer_id_from_path(&child_path).context("inferring child id")?; + +// match process_source( +// inf_id.clone(), +// ImportSource::Media { +// path: mpath.join(f.file_name()), +// ignore_attachments, +// ignore_chapters, +// ignore_metadata, +// }, +// path, +// index_path, +// db, +// fed, +// ap, +// ) +// .await +// .context(anyhow!("recursive media import: {:?}", f.path())) +// { +// Result::Ok(o) => errors.extend(o), +// Result::Err(e) => errors.push(e), +// }; +// node.public.children.push(inf_id); +// } +// } +// db.add_partial_node(&id, index_path, node)?; +// } else if abspath.is_file() { +// let _permit = SEM_IMPORT.acquire().await.unwrap(); +// let metadata = { +// let abspath = abspath.clone(); +// let mpath = mpath.to_owned(); +// spawn_blocking(move || { +// cache_memory(&["probe", mpath.to_str().unwrap()], || { +// let input = File::open(&abspath).context("opening media file")?; +// let mut input = EbmlReader::new(BufReader::new(input)); +// import_metadata(&mut input) +// }) +// }) +// } +// .await? +// .context(anyhow!("probing {abspath:?}"))? +// .deref() +// .to_owned(); + +// let mut node = Node::default(); + +// if !ignore_metadata { +// if let Some(captures) = +// RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) +// { +// node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); +// if let Some(title) = captures.get(6) { +// node.public.title = Some(title.as_str().to_string()); +// } +// } +// node.public.title = metadata.title; +// node.public.description = metadata.description; +// node.public.tagline = metadata.tagline; +// } +// node.public.media = Some(MediaInfo { +// duration: metadata.duration, +// tracks: metadata.tracks, +// chapters: if ignore_chapters { +// vec![] +// } else { +// metadata.chapters +// }, +// }); +// node.private.source = Some( +// metadata +// .track_sources +// .into_iter() +// .map(|mut ts| { +// ts.path = mpath.to_owned(); +// TrackSource::Local(ts) +// }) +// .collect(), +// ); + +// if !ignore_attachments { +// if let Some((filename, data)) = metadata.cover { +// node.public.poster = Some( +// AssetInner::Cache( +// async_cache_file( +// &["att-cover", mpath.to_str().unwrap(), &filename], +// |mut f| async move { +// f.write_all(&data).await?; +// Ok(()) +// }, +// ) +// .await?, +// ) +// .ser(), +// ) +// }; + +// if let Some(infojson) = metadata.infojson { +// let infojson: infojson::YVideo = +// serde_json::from_str(&infojson).context("parsing infojson")?; + +// node.public.kind = Some( +// if infojson.duration.unwrap_or(0.) < 120. +// && infojson.aspect_ratio.unwrap_or(2.) < 1. +// { +// NodeKind::ShortFormVideo +// } else { +// NodeKind::Video +// }, +// ); +// node.public.title = Some(infojson.title); +// node.public.description = Some(infojson.description); +// node.public.tagline = Some(infojson.webpage_url); +// node.public +// .ratings +// .insert(Rating::YoutubeViews, infojson.view_count as f64); +// node.public.release_date = Some( +// infojson::parse_upload_date(&infojson.upload_date) +// .context("parsing upload date")?, +// ); +// node.public.ratings.extend( +// infojson +// .like_count +// .map(|l| (Rating::YoutubeLikes, l as f64)), +// ); +// } +// } +// drop(_permit); +// db.add_partial_node(&id, index_path, node)?; +// } else { +// warn!("non file/dir import ignored: {abspath:?}") +// } +// } +// ImportSource::Federated { host } => { +// info!("federated import of {id:?} from {host:?}"); +// let session = fed.get_session(&host).await.context("creating session")?; + +// import_remote(id.clone(), &host, db, &session, index_path) +// .await +// .context("federated import")? +// } +// ImportSource::AutoChildren { path: cpath } => { +// info!("auto children at {path:?}"); +// let paths = cpath +// .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) +// .read_dir()? +// .map(Result::unwrap) +// .map(|e| e.path()) +// .filter(|e| { +// e.extension() == Some(OsStr::new("yaml")) +// || e.extension() == Some(OsStr::new("jelly")) +// }); + +// let mut children = Vec::new(); +// for p in paths { +// let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { +// serde_json::from_reader(File::open(&p)?)? +// } else { +// serde_yaml::from_reader(File::open(&p)?)? +// }; +// if opts.id != id { +// children.push(opts.id); +// } +// } +// db.add_partial_node( +// &id, +// index_path, +// Node { +// private: NodePrivate::default(), +// public: NodePublic { +// children, +// ..Default::default() +// }, +// }, +// )?; +// } +// } +// Ok(errors) +// } + +// pub fn infer_id_from_path(path: &Path) -> anyhow::Result { +// let f = path +// .file_stem() +// .ok_or(anyhow!("no filename"))? +// .to_str() +// .ok_or(anyhow!("non utf8 filename"))?; + +// if let Some(mat) = RE_YOUTUBE_ID.captures(f) { +// let id = mat.get(1).unwrap().as_str(); +// return Ok(format!("youtube-{id}")); +// } + +// let mut fsan = String::with_capacity(f.len()); +// for c in f.chars() { +// fsan.extend(match c { +// 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), +// ' ' => Some('-'), +// _ => None, +// }); +// } +// Ok(fsan) +// } + +// fn merge_node(x: Node, y: Node) -> anyhow::Result { +// let (media, source) = match ( +// x.public.media, +// y.public.media, +// x.private.source, +// y.private.source, +// ) { +// (Some(x), Some(y), Some(sx), Some(sy)) => { +// let k = merge_media(x, y, sx, sy); +// (Some(k.0), Some(k.1)) +// } +// (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), +// (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), +// (None, None, None, None) => (None, None), +// _ => bail!("invalid node. source and media dont agree."), +// }; +// Ok(Node { +// public: NodePublic { +// kind: x.public.kind.or(y.public.kind), +// title: x.public.title.or(y.public.title), +// subtitle: x.public.subtitle.or(y.public.subtitle), +// id: x.public.id.or(y.public.id), +// path: vec![], +// children: merge_children(x.public.children, y.public.children), +// tagline: x.public.tagline.or(y.public.tagline), +// description: x.public.description.or(y.public.description), +// release_date: x.public.release_date.or(y.public.release_date), +// index: x.public.index.or(y.public.index), +// media, +// ratings: x +// .public +// .ratings +// .into_iter() +// .chain(y.public.ratings) +// .collect(), +// federated: None, +// poster: x.public.poster.or(y.public.poster), +// backdrop: x.public.backdrop.or(y.public.backdrop), +// }, +// private: NodePrivate { +// id: x.private.id.or(y.private.id), +// source, +// backdrop: None, +// poster: None, +// }, +// }) +// } + +// fn merge_children(mut a: Vec, b: Vec) -> Vec { +// let acont = HashSet::<_, RandomState>::from_iter(a.clone()); +// for el in b { +// if !acont.contains(&el) { +// a.push(el) +// } +// } +// a +// } +// fn merge_media( +// x: MediaInfo, +// y: MediaInfo, +// sx: Vec, +// sy: Vec, +// ) -> (MediaInfo, Vec) { +// let mut tracks: Vec = Vec::new(); +// let mut source: Vec = Vec::new(); +// for (t, s) in x +// .tracks +// .into_iter() +// .zip(sx.into_iter()) +// .chain(y.tracks.into_iter().zip(sy.into_iter())) +// { +// let mut remove = None; +// let mut skip = false; +// for (i, ot) in tracks.iter().enumerate() { +// if t.name == ot.name +// && t.kind == ot.kind +// && t.language == ot.language +// && t.codec == ot.codec +// { +// if t.federated.len() < ot.federated.len() { +// remove = Some(i); +// } else { +// skip = true; +// } +// } +// } +// if let Some(r) = remove { +// tracks.swap_remove(r); +// source.swap_remove(r); +// } +// if !skip { +// tracks.push(t); +// source.push(s); +// } +// } +// ( +// MediaInfo { +// duration: x.duration * 0.5 + y.duration * 0.5, +// tracks, +// chapters: if x.chapters.len() > y.chapters.len() { +// x.chapters +// } else { +// y.chapters +// }, +// }, +// source, +// ) +// } + +// static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); + +// #[async_recursion] +// async fn import_remote( +// id: String, +// host: &str, +// db: &impl ImportStorage, +// session: &Arc, +// index_path: &[usize], +// ) -> anyhow::Result<()> { +// let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); +// info!("loading federated node {id:?}"); + +// let mut node = session.node(&id).await.context("fetching remote node")?; +// let mut node_ext = session +// .node_extended(&id) +// .await +// .context("fetching extended remote node")?; + +// let track_sources = if let Some(media) = &mut node.media { +// let mut track_sources = Vec::new(); +// for (i, t) in media.tracks.iter_mut().enumerate() { +// t.federated.push(host.to_owned()); +// track_sources.push(TrackSource::Remote(i)) +// } +// Some(track_sources) +// } else { +// None +// }; + +// drop(_permit); + +// let mut node = Node { +// public: node.clone(), +// private: NodePrivate { +// backdrop: None, +// poster: None, +// id: None, +// source: track_sources, +// }, +// }; +// make_opt_asset_federated(host, &mut node.public.backdrop)?; +// make_opt_asset_federated(host, &mut node.public.poster)?; +// for g in node_ext.people.values_mut() { +// for a in g { +// make_opt_asset_federated(host, &mut a.person.headshot)?; +// } +// } + +// debug!("adding {id}"); +// db.add_partial_node(&id, index_path, node.clone())?; +// db.add_partial_node_ext(&id, index_path, node_ext)?; + +// let mut children: FuturesUnordered<_> = node +// .public +// .children +// .iter() +// .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) +// .collect(); + +// while let Some(r) = children.next().await { +// r?; +// } + +// Ok(()) +// } + +// pub fn make_opt_asset_federated(host: &str, p: &mut Option) -> anyhow::Result<()> { +// if let Some(a) = p { +// make_asset_federated(host, a)? +// } +// Ok(()) +// } +// pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { +// let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; +// *p = AssetInner::Federated { +// host: host.to_owned(), +// asset: data, +// } +// .ser(); +// Ok(()) +// } -- cgit v1.2.3-70-g09d2