aboutsummaryrefslogtreecommitdiff
path: root/import
diff options
context:
space:
mode:
Diffstat (limited to 'import')
-rw-r--r--import/Cargo.toml6
-rw-r--r--import/src/db.rs208
-rw-r--r--import/src/lib.rs1551
3 files changed, 778 insertions, 987 deletions
diff --git a/import/Cargo.toml b/import/Cargo.toml
index 27872b7..e218cb0 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -11,13 +11,13 @@ jellymatroska = { path = "../matroska" }
jellyremuxer = { path = "../remuxer" }
log = { workspace = true }
-anyhow = "1.0.92"
+anyhow = "1.0.95"
reqwest = { workspace = true }
urlencoding = "2.1.3"
bincode = { version = "2.0.0-rc.3", features = ["derive"] }
-serde = { version = "1.0.214", features = ["derive"] }
-serde_json = "1.0.132"
+serde = { version = "1.0.217", features = ["derive"] }
+serde_json = "1.0.138"
serde_yaml = "0.9.34"
async-recursion = "1.1.1"
diff --git a/import/src/db.rs b/import/src/db.rs
deleted file mode 100644
index 7a3636c..0000000
--- a/import/src/db.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use anyhow::{anyhow, Context};
-use jellybase::database::{
- redb::{ReadableTable, ReadableTableMetadata},
- tantivy::{doc, DateTime},
- DataAcid, Ser, T_NODE, T_NODE_EXTENDED, T_NODE_IMPORT,
-};
-use jellycommon::{ExtendedNode, Node};
-use log::info;
-use std::collections::HashMap;
-use std::sync::RwLock;
-
-pub(crate) trait ImportStorage: Sync {
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()>;
- fn add_partial_node_ext(
- &self,
- id: &str,
- index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()>;
-
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>>;
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()>;
-
- fn pre_clean(&self) -> anyhow::Result<()>;
- fn remove_prev_nodes(&self) -> anyhow::Result<()>;
- fn finish(&self) -> anyhow::Result<()>;
-}
-
-pub(crate) struct DatabaseStorage<'a> {
- pub db: &'a DataAcid,
-}
-impl<'a> DatabaseStorage<'a> {
- pub fn new(db: &'a DataAcid) -> Self {
- Self { db }
- }
-}
-impl ImportStorage for DatabaseStorage<'_> {
- fn pre_clean(&self) -> anyhow::Result<()> {
- let txn = self.db.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- if !table.is_empty()? {
- info!("clearing temporary node tree from an aborted last import...");
- table.retain(|_, _| false)?;
- }
- drop(table);
- txn.commit()?;
- Ok(())
- }
- fn remove_prev_nodes(&self) -> anyhow::Result<()> {
- info!("removing old nodes...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
- let txn = self.db.inner.begin_read()?;
- let table = txn.open_table(T_NODE_IMPORT)?;
- let value = table.get(id)?.ok_or(anyhow!("node parts not found"))?;
- Ok(value.value().0)
- }
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
- insert_complete_node(self.db, id, node)
- }
-
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
-
- let mut parts = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
- parts.push((index_path.to_vec(), node.clone()));
- table.insert(id, Ser(parts))?;
-
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn add_partial_node_ext(
- &self,
- id: &str,
- _index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()> {
- // TODO merge this
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(node))?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn finish(&self) -> anyhow::Result<()> {
- info!("clearing temporary node tree...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_IMPORT)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
-
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
-}
-
-pub type Parts = RwLock<HashMap<String, Vec<(Vec<usize>, Node)>>>;
-pub(crate) struct MemoryStorage<'a> {
- pub db: &'a DataAcid,
- pub parts: Parts,
-}
-impl<'a> MemoryStorage<'a> {
- pub fn new(db: &'a DataAcid) -> Self {
- Self {
- db,
- parts: Default::default(),
- }
- }
-}
-impl ImportStorage for MemoryStorage<'_> {
- fn pre_clean(&self) -> anyhow::Result<()> {
- Ok(())
- }
- fn remove_prev_nodes(&self) -> anyhow::Result<()> {
- info!("removing old nodes...");
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE)?;
- table.retain(|_, _| false)?;
- drop(table);
- txn.commit()?;
- self.db
- .node_index
- .writer
- .read()
- .unwrap()
- .delete_all_documents()?;
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
- fn get_partial_parts(&self, id: &str) -> anyhow::Result<Vec<(Vec<usize>, Node)>> {
- Ok(self
- .parts
- .read()
- .unwrap()
- .get(id)
- .ok_or(anyhow!("node parts not found"))?
- .to_owned())
- }
- fn insert_complete_node(&self, id: &str, node: Node) -> anyhow::Result<()> {
- insert_complete_node(self.db, id, node)
- }
-
- fn add_partial_node(&self, id: &str, index_path: &[usize], node: Node) -> anyhow::Result<()> {
- self.parts
- .write()
- .unwrap()
- .entry(id.to_owned())
- .or_default()
- .push((index_path.to_owned(), node));
- Ok(())
- }
-
- fn add_partial_node_ext(
- &self,
- id: &str,
- _index_path: &[usize],
- node: ExtendedNode,
- ) -> anyhow::Result<()> {
- // TODO merge this
- let txn = self.db.inner.begin_write()?;
- let mut table = txn.open_table(T_NODE_EXTENDED)?;
- table.insert(id, Ser(node))?;
- drop(table);
- txn.commit()?;
- Ok(())
- }
-
- fn finish(&self) -> anyhow::Result<()> {
- self.db.node_index.writer.write().unwrap().commit()?;
- Ok(())
- }
-}
-
-fn insert_complete_node(db: &DataAcid, id: &str, node: Node) -> anyhow::Result<()> {
- let txn_write = db.inner.begin_write()?;
- let mut t_node = txn_write.open_table(T_NODE)?;
- t_node.insert(id, Ser(node.clone()))?;
- drop(t_node);
- txn_write.commit()?;
-
- db
- .node_index
- .writer
- .read()
- .unwrap()
- .add_document(doc!(
- db.node_index.id => node.public.id.unwrap_or_default(),
- db.node_index.title => node.public.title.unwrap_or_default(),
- db.node_index.description => node.public.description.unwrap_or_default(),
- db.node_index.releasedate => DateTime::from_timestamp_millis(node.public.release_date.unwrap_or_default()),
- db.node_index.f_index => node.public.index.unwrap_or_default() as u64,
- db.node_index.parent => node.public.path.last().cloned().unwrap_or_default(),
- ))
- .context("inserting document")?;
- Ok(())
-}
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 89088e9..df23f7e 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -3,840 +3,839 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2024 metamuffin <metamuffin.org>
*/
-pub mod db;
pub mod infojson;
pub mod tmdb;
pub mod trakt;
-use anyhow::{anyhow, bail, Context, Error, Ok};
-use async_recursion::async_recursion;
-use base64::Engine;
-use db::{DatabaseStorage, ImportStorage, MemoryStorage};
-use futures::{stream::FuturesUnordered, StreamExt};
-use jellybase::{
- assetfed::AssetInner,
- cache::{async_cache_file, cache_memory},
- database::DataAcid,
- federation::Federation,
- CONF, SECRETS,
-};
-use jellyclient::Session;
-use jellycommon::{
- chrono::{DateTime, Datelike},
- Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate,
- NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind,
-};
-use jellymatroska::read::EbmlReader;
-use jellyremuxer::metadata::import_metadata;
-use log::{debug, info, warn};
-use regex::Regex;
-use std::{
- cmp::Ordering,
- collections::HashSet,
- ffi::OsStr,
- fs::File,
- hash::RandomState,
- io::BufReader,
- ops::Deref,
- path::{Path, PathBuf},
- sync::{Arc, LazyLock},
-};
-use tmdb::{parse_release_date, Tmdb};
-use tokio::{
- io::AsyncWriteExt,
- sync::{RwLock, Semaphore},
- task::spawn_blocking,
-};
-use trakt::Trakt;
+// use anyhow::{anyhow, bail, Context, Error, Ok};
+// use async_recursion::async_recursion;
+// use base64::Engine;
+// use db::{DatabaseStorage, ImportStorage, MemoryStorage};
+// use futures::{stream::FuturesUnordered, StreamExt};
+// use jellybase::{
+// assetfed::AssetInner,
+// cache::{async_cache_file, cache_memory},
+// database::DataAcid,
+// federation::Federation,
+// CONF, SECRETS,
+// };
+// use jellyclient::Session;
+// use jellycommon::{
+// chrono::{DateTime, Datelike},
+// Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate,
+// NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind,
+// };
+// use jellymatroska::read::EbmlReader;
+// use jellyremuxer::metadata::import_metadata;
+// use log::{debug, info, warn};
+// use regex::Regex;
+// use std::{
+// cmp::Ordering,
+// collections::HashSet,
+// ffi::OsStr,
+// fs::File,
+// hash::RandomState,
+// io::BufReader,
+// ops::Deref,
+// path::{Path, PathBuf},
+// sync::{Arc, LazyLock},
+// };
+// use tmdb::{parse_release_date, Tmdb};
+// use tokio::{
+// io::AsyncWriteExt,
+// sync::{RwLock, Semaphore},
+// task::spawn_blocking,
+// };
+// use trakt::Trakt;
-static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new());
+// static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
+// pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new());
-static RE_EPISODE_FILENAME: LazyLock<Regex> =
- LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap());
-static RE_YOUTUBE_ID: LazyLock<Regex> =
- LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap());
+// static RE_EPISODE_FILENAME: LazyLock<Regex> =
+// LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap());
+// static RE_YOUTUBE_ID: LazyLock<Regex> =
+// LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap());
-struct Apis {
- trakt: Option<Trakt>,
- tmdb: Option<Tmdb>,
-}
+// struct Apis {
+// trakt: Option<Trakt>,
+// tmdb: Option<Tmdb>,
+// }
-pub fn is_importing() -> bool {
- IMPORT_SEM.available_permits() == 0
-}
+// pub fn is_importing() -> bool {
+// IMPORT_SEM.available_permits() == 0
+// }
-pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
- let permit = IMPORT_SEM.try_acquire()?;
+// pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
+// let permit = IMPORT_SEM.try_acquire()?;
- let ap = Apis {
- trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
- tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
- };
+// let ap = Apis {
+// trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
+// tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
+// };
- let e = if CONF.use_in_memory_import_storage {
- import_inner(&MemoryStorage::new(db), fed, &ap).await
- } else {
- import_inner(&DatabaseStorage::new(db), fed, &ap).await
- };
- let e = match e {
- Result::Ok(e) => e,
- Result::Err(e) => vec![e],
- };
- *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect();
+// let e = if CONF.use_in_memory_import_storage {
+// import_inner(&MemoryStorage::new(db), fed, &ap).await
+// } else {
+// import_inner(&DatabaseStorage::new(db), fed, &ap).await
+// };
+// let e = match e {
+// Result::Ok(e) => e,
+// Result::Err(e) => vec![e],
+// };
+// *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect();
- drop(permit);
- Ok(())
-}
+// drop(permit);
+// Ok(())
+// }
-pub(crate) async fn import_inner(
- db: &impl ImportStorage,
- fed: &Federation,
- ap: &Apis,
-) -> anyhow::Result<Vec<anyhow::Error>> {
- db.pre_clean()?;
- info!("loading sources...");
- let mut errors = Vec::new();
- match import_path(CONF.library_path.clone(), vec![], db, fed, ap)
- .await
- .context("indexing")
- {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- };
- db.remove_prev_nodes()?;
- info!("merging nodes...");
- match generate_node_paths(db).context("merging nodes") {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- }
- db.finish()?;
- info!("import completed");
- Ok(errors)
-}
+// pub(crate) async fn import_inner(
+// db: &impl ImportStorage,
+// fed: &Federation,
+// ap: &Apis,
+// ) -> anyhow::Result<Vec<anyhow::Error>> {
+// db.pre_clean()?;
+// info!("loading sources...");
+// let mut errors = Vec::new();
+// match import_path(CONF.library_path.clone(), vec![], db, fed, ap)
+// .await
+// .context("indexing")
+// {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// };
+// db.remove_prev_nodes()?;
+// info!("merging nodes...");
+// match generate_node_paths(db).context("merging nodes") {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// }
+// db.finish()?;
+// info!("import completed");
+// Ok(errors)
+// }
-fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> {
- // TODO mark nodes done to allow recursion
- fn traverse(
- db: &impl ImportStorage,
- id: String,
- mut path: Vec<String>,
- parent_title: &str,
- ) -> anyhow::Result<Vec<Error>> {
- let mut errors = Vec::new();
- let node = {
- let mut parts = db
- .get_partial_parts(&id)
- .context(anyhow!("path = {path:?}"))?;
+// fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> {
+// // TODO mark nodes done to allow recursion
+// fn traverse(
+// db: &impl ImportStorage,
+// id: String,
+// mut path: Vec<String>,
+// parent_title: &str,
+// ) -> anyhow::Result<Vec<Error>> {
+// let mut errors = Vec::new();
+// let node = {
+// let mut parts = db
+// .get_partial_parts(&id)
+// .context(anyhow!("path = {path:?}"))?;
- parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
+// parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
- let mut node = parts
- .into_iter()
- .map(|(_, x)| x)
- .reduce(|x, y| merge_node(x, y).unwrap())
- .unwrap();
+// let mut node = parts
+// .into_iter()
+// .map(|(_, x)| x)
+// .reduce(|x, y| merge_node(x, y).unwrap())
+// .unwrap();
- node.public.id = Some(id.to_owned());
- node.public.path = vec![]; // will be reconstructed in the next pass
- node.public.federated = None;
+// node.public.id = Some(id.to_owned());
+// node.public.path = vec![]; // will be reconstructed in the next pass
+// node.public.federated = None;
- // TODO this discardes a lot of information. maybe change this.
- if let Some(media) = &node.public.media {
- for t in &media.tracks {
- if let Some(host) = t.federated.first() {
- if host != &CONF.hostname {
- node.public.federated = Some(host.to_string())
- }
- }
- }
- }
+// // TODO this discardes a lot of information. maybe change this.
+// if let Some(media) = &node.public.media {
+// for t in &media.tracks {
+// if let Some(host) = t.federated.first() {
+// if host != &CONF.hostname {
+// node.public.federated = Some(host.to_string())
+// }
+// }
+// }
+// }
- if node.public.path.is_empty() {
- node.public.path = path.clone();
- }
- node.public.subtitle = match node.public.kind.unwrap_or_default() {
- NodeKind::Movie => node.public.release_date.map(|date| {
- format!(
- "{}",
- DateTime::from_timestamp_millis(date)
- .unwrap()
- .date_naive()
- .year()
- )
- }),
- NodeKind::Season
- | NodeKind::Episode
- | NodeKind::ShortFormVideo
- | NodeKind::Video => Some(parent_title.to_string()),
- _ => None,
- };
+// if node.public.path.is_empty() {
+// node.public.path = path.clone();
+// }
+// node.public.subtitle = match node.public.kind.unwrap_or_default() {
+// NodeKind::Movie => node.public.release_date.map(|date| {
+// format!(
+// "{}",
+// DateTime::from_timestamp_millis(date)
+// .unwrap()
+// .date_naive()
+// .year()
+// )
+// }),
+// NodeKind::Season
+// | NodeKind::Episode
+// | NodeKind::ShortFormVideo
+// | NodeKind::Video => Some(parent_title.to_string()),
+// _ => None,
+// };
- db.insert_complete_node(&id, node.clone())?;
+// db.insert_complete_node(&id, node.clone())?;
- node
- };
+// node
+// };
- path.push(id);
- let ps = node.public.title.unwrap_or_default();
- for c in node.public.children {
- match traverse(db, c, path.clone(), &ps) {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- }
- }
- Ok(errors)
- }
- traverse(db, "library".to_string(), vec![], "Root")
-}
+// path.push(id);
+// let ps = node.public.title.unwrap_or_default();
+// for c in node.public.children {
+// match traverse(db, c, path.clone(), &ps) {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// }
+// }
+// Ok(errors)
+// }
+// traverse(db, "library".to_string(), vec![], "Root")
+// }
-fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
- if x.is_empty() {
- Ordering::Greater
- } else if y.is_empty() {
- Ordering::Less
- } else {
- match x[0].cmp(&y[0]) {
- o @ (Ordering::Less | Ordering::Greater) => o,
- Ordering::Equal => compare_index_path(&x[1..], &y[1..]),
- }
- }
-}
+// fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
+// if x.is_empty() {
+// Ordering::Greater
+// } else if y.is_empty() {
+// Ordering::Less
+// } else {
+// match x[0].cmp(&y[0]) {
+// o @ (Ordering::Less | Ordering::Greater) => o,
+// Ordering::Equal => compare_index_path(&x[1..], &y[1..]),
+// }
+// }
+// }
-#[async_recursion]
-async fn import_path(
- path: PathBuf,
- mut index_path: Vec<usize>,
- db: &impl ImportStorage,
- fed: &Federation,
- ap: &Apis,
-) -> anyhow::Result<Vec<anyhow::Error>> {
- let mut errors = Vec::new();
- if path.is_dir() {
- let mut children_paths = path
- .read_dir()?
- .map(Result::unwrap)
- .filter_map(|e| {
- if e.path().extension() == Some(OsStr::new("yaml"))
- || e.path().extension() == Some(OsStr::new("jelly"))
- || e.metadata().unwrap().is_dir()
- {
- Some(e.path())
- } else {
- None
- }
- })
- .collect::<Vec<_>>();
+// #[async_recursion]
+// async fn import_path(
+// path: PathBuf,
+// mut index_path: Vec<usize>,
+// db: &impl ImportStorage,
+// fed: &Federation,
+// ap: &Apis,
+// ) -> anyhow::Result<Vec<anyhow::Error>> {
+// let mut errors = Vec::new();
+// if path.is_dir() {
+// let mut children_paths = path
+// .read_dir()?
+// .map(Result::unwrap)
+// .filter_map(|e| {
+// if e.path().extension() == Some(OsStr::new("yaml"))
+// || e.path().extension() == Some(OsStr::new("jelly"))
+// || e.metadata().unwrap().is_dir()
+// {
+// Some(e.path())
+// } else {
+// None
+// }
+// })
+// .collect::<Vec<_>>();
- children_paths.sort();
+// children_paths.sort();
- let mut children: FuturesUnordered<_> = children_paths
- .into_iter()
- .enumerate()
- .map(|(i, p)| {
- import_path(
- p.clone(),
- {
- let mut path = index_path.clone();
- path.push(i);
- path
- },
- db,
- fed,
- ap,
- )
- })
- .collect();
+// let mut children: FuturesUnordered<_> = children_paths
+// .into_iter()
+// .enumerate()
+// .map(|(i, p)| {
+// import_path(
+// p.clone(),
+// {
+// let mut path = index_path.clone();
+// path.push(i);
+// path
+// },
+// db,
+// fed,
+// ap,
+// )
+// })
+// .collect();
- while let Some(k) = children.next().await {
- match k {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- }
- }
- } else {
- info!("reading {path:?}");
- let opts = File::open(&path).context(anyhow!("opening {path:?}"))?;
- let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) {
- serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))?
- } else {
- serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))?
- };
+// while let Some(k) = children.next().await {
+// match k {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// }
+// }
+// } else {
+// info!("reading {path:?}");
+// let opts = File::open(&path).context(anyhow!("opening {path:?}"))?;
+// let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) {
+// serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))?
+// } else {
+// serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))?
+// };
- for (i, s) in opts.sources.into_iter().enumerate() {
- index_path.push(i);
- if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap)
- .await
- .context(anyhow!("processing source in {path:?}"))
- {
- errors.push(e)
- }
- index_path.pop();
- }
- }
- Ok(errors)
-}
+// for (i, s) in opts.sources.into_iter().enumerate() {
+// index_path.push(i);
+// if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap)
+// .await
+// .context(anyhow!("processing source in {path:?}"))
+// {
+// errors.push(e)
+// }
+// index_path.pop();
+// }
+// }
+// Ok(errors)
+// }
-static SEM_IMPORT: Semaphore = Semaphore::const_new(2);
+// static SEM_IMPORT: Semaphore = Semaphore::const_new(2);
-#[async_recursion]
-async fn process_source(
- id: String,
- s: ImportSource,
- path: &Path,
- index_path: &[usize],
- db: &impl ImportStorage,
- fed: &Federation,
- ap: &Apis,
-) -> anyhow::Result<Vec<anyhow::Error>> {
- let mut errors = vec![];
- match s {
- ImportSource::Override(mut n) => {
- if let Some(backdrop) = n.private.backdrop.clone() {
- n.public.backdrop = Some(AssetInner::Library(backdrop).ser());
- }
- if let Some(poster) = n.private.poster.clone() {
- n.public.poster = Some(AssetInner::Library(poster).ser());
- }
- db.add_partial_node(&id, index_path, n)?
- }
- ImportSource::Trakt { id: tid, kind } => {
- info!("trakt {id}");
- let trakt = ap
- .trakt
- .as_ref()
- .ok_or(anyhow!("trakt api key is required"))?;
- let trakt_object = trakt
- .lookup(kind, tid, true)
- .await
- .context("looking up metadata")?;
- let trakt_people = trakt
- .people(kind, tid, true)
- .await
- .context("looking up people")?;
+// #[async_recursion]
+// async fn process_source(
+// id: String,
+// s: ImportSource,
+// path: &Path,
+// index_path: &[usize],
+// db: &impl ImportStorage,
+// fed: &Federation,
+// ap: &Apis,
+// ) -> anyhow::Result<Vec<anyhow::Error>> {
+// let mut errors = vec![];
+// match s {
+// ImportSource::Override(mut n) => {
+// if let Some(backdrop) = n.private.backdrop.clone() {
+// n.public.backdrop = Some(AssetInner::Library(backdrop).ser());
+// }
+// if let Some(poster) = n.private.poster.clone() {
+// n.public.poster = Some(AssetInner::Library(poster).ser());
+// }
+// db.add_partial_node(&id, index_path, n)?
+// }
+// ImportSource::Trakt { id: tid, kind } => {
+// info!("trakt {id}");
+// let trakt = ap
+// .trakt
+// .as_ref()
+// .ok_or(anyhow!("trakt api key is required"))?;
+// let trakt_object = trakt
+// .lookup(kind, tid, true)
+// .await
+// .context("looking up metadata")?;
+// let trakt_people = trakt
+// .people(kind, tid, true)
+// .await
+// .context("looking up people")?;
- let mut node = Node::default();
- let mut node_ext = ExtendedNode::default();
- {
- node.public.kind = Some(match kind {
- TraktKind::Movie => NodeKind::Movie,
- TraktKind::Show => NodeKind::Show,
- TraktKind::Season => NodeKind::Season,
- TraktKind::Episode => NodeKind::Episode,
- _ => bail!("unexpected kind for trakt import"),
- });
- node.public.title = Some(trakt_object.title.to_owned());
- if let Some(overview) = &trakt_object.overview {
- node.public.description = Some(overview.to_owned())
- }
- if let Some(tagline) = &trakt_object.tagline {
- node.public.tagline = Some(tagline.to_owned())
- }
- if let Some(rating) = &trakt_object.rating {
- node.public.ratings.insert(Rating::Trakt, *rating);
- }
- for p in trakt_people.cast.iter() {
- node_ext
- .people
- .entry(PeopleGroup::Cast)
- .or_default()
- .push(p.a())
- }
- for (group, people) in trakt_people.crew.iter() {
- for p in people {
- node_ext.people.entry(group.a()).or_default().push(p.a())
- }
- }
- // TODO lazy assets
- for ps in node_ext.people.values_mut() {
- for p in ps {
- if let Some(id) = p.person.ids.tmdb {
- if let Some(tmdb) = &ap.tmdb {
- let k = tmdb.person_image(id).await?;
- if let Some(prof) = k.profiles.first() {
- p.person.headshot = Some(
- AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(),
- );
- }
- }
- }
- }
- }
- }
- db.add_partial_node(&id, index_path, node)?;
- db.add_partial_node_ext(&id, index_path, node_ext)?;
+// let mut node = Node::default();
+// let mut node_ext = ExtendedNode::default();
+// {
+// node.public.kind = Some(match kind {
+// TraktKind::Movie => NodeKind::Movie,
+// TraktKind::Show => NodeKind::Show,
+// TraktKind::Season => NodeKind::Season,
+// TraktKind::Episode => NodeKind::Episode,
+// _ => bail!("unexpected kind for trakt import"),
+// });
+// node.public.title = Some(trakt_object.title.to_owned());
+// if let Some(overview) = &trakt_object.overview {
+// node.public.description = Some(overview.to_owned())
+// }
+// if let Some(tagline) = &trakt_object.tagline {
+// node.public.tagline = Some(tagline.to_owned())
+// }
+// if let Some(rating) = &trakt_object.rating {
+// node.public.ratings.insert(Rating::Trakt, *rating);
+// }
+// for p in trakt_people.cast.iter() {
+// node_ext
+// .people
+// .entry(PeopleGroup::Cast)
+// .or_default()
+// .push(p.a())
+// }
+// for (group, people) in trakt_people.crew.iter() {
+// for p in people {
+// node_ext.people.entry(group.a()).or_default().push(p.a())
+// }
+// }
+// // TODO lazy assets
+// for ps in node_ext.people.values_mut() {
+// for p in ps {
+// if let Some(id) = p.person.ids.tmdb {
+// if let Some(tmdb) = &ap.tmdb {
+// let k = tmdb.person_image(id).await?;
+// if let Some(prof) = k.profiles.first() {
+// p.person.headshot = Some(
+// AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(),
+// );
+// }
+// }
+// }
+// }
+// }
+// }
+// db.add_partial_node(&id, index_path, node)?;
+// db.add_partial_node_ext(&id, index_path, node_ext)?;
- if let Some(tid) = trakt_object.ids.tmdb {
- if let Some(kind) = match kind {
- TraktKind::Movie => Some(TmdbKind::Movie),
- TraktKind::Show => Some(TmdbKind::Tv),
- TraktKind::Season => Some(TmdbKind::Tv), // TODO
- TraktKind::Episode | TraktKind::Person | TraktKind::User => None,
- } {
- let mut index_path = index_path.to_vec();
- index_path.push(1);
- match process_source(
- id,
- ImportSource::Tmdb { id: tid, kind },
- path,
- &index_path,
- db,
- fed,
- ap,
- )
- .await
- {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- }
- }
- }
- }
- ImportSource::Tmdb { id: tid, kind } => {
- info!("tmdb {id}");
- let tmdb = ap
- .tmdb
- .as_ref()
- .ok_or(anyhow!("tmdb api key is required"))?;
+// if let Some(tid) = trakt_object.ids.tmdb {
+// if let Some(kind) = match kind {
+// TraktKind::Movie => Some(TmdbKind::Movie),
+// TraktKind::Show => Some(TmdbKind::Tv),
+// TraktKind::Season => Some(TmdbKind::Tv), // TODO
+// TraktKind::Episode | TraktKind::Person | TraktKind::User => None,
+// } {
+// let mut index_path = index_path.to_vec();
+// index_path.push(1);
+// match process_source(
+// id,
+// ImportSource::Tmdb { id: tid, kind },
+// path,
+// &index_path,
+// db,
+// fed,
+// ap,
+// )
+// .await
+// {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// }
+// }
+// }
+// }
+// ImportSource::Tmdb { id: tid, kind } => {
+// info!("tmdb {id}");
+// let tmdb = ap
+// .tmdb
+// .as_ref()
+// .ok_or(anyhow!("tmdb api key is required"))?;
- let details = tmdb.details(kind, tid).await?;
+// let details = tmdb.details(kind, tid).await?;
- let mut node = Node::default();
+// let mut node = Node::default();
- // TODO lazy assets
- if let Some(poster) = &details.poster_path {
- node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser());
- }
- if let Some(backdrop) = &details.backdrop_path {
- node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser());
- }
+// // TODO lazy assets
+// if let Some(poster) = &details.poster_path {
+// node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser());
+// }
+// if let Some(backdrop) = &details.backdrop_path {
+// node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser());
+// }
- node.public.tagline = details.tagline.clone();
- node.public.title = details.title.clone();
- node.public.description = Some(details.overview.clone());
- node.public
- .ratings
- .insert(Rating::Tmdb, details.vote_average);
- if let Some(date) = details.release_date.clone() {
- node.public.release_date =
- parse_release_date(&date).context("parsing release date")?;
- }
+// node.public.tagline = details.tagline.clone();
+// node.public.title = details.title.clone();
+// node.public.description = Some(details.overview.clone());
+// node.public
+// .ratings
+// .insert(Rating::Tmdb, details.vote_average);
+// if let Some(date) = details.release_date.clone() {
+// node.public.release_date =
+// parse_release_date(&date).context("parsing release date")?;
+// }
- db.add_partial_node(&id, index_path, node)?;
- }
- ImportSource::Media {
- path: mpath,
- ignore_attachments,
- ignore_chapters,
- ignore_metadata,
- } => {
- info!("media import {mpath:?}");
- let abspath = CONF.media_path.join(&mpath);
- if !abspath.exists() {
- bail!("media missing at {abspath:?}");
- }
- if abspath.is_dir() {
- let mut node = Node::default();
- for f in abspath.read_dir()? {
- let f = f?;
- let child_path = f.path();
- if child_path.is_dir()
- || matches!(
- child_path.extension().map(|o| o.to_str().unwrap()),
- Some("mks" | "mka" | "mkv" | "webm")
- )
- {
- let inf_id =
- infer_id_from_path(&child_path).context("inferring child id")?;
+// db.add_partial_node(&id, index_path, node)?;
+// }
+// ImportSource::Media {
+// path: mpath,
+// ignore_attachments,
+// ignore_chapters,
+// ignore_metadata,
+// } => {
+// info!("media import {mpath:?}");
+// let abspath = CONF.media_path.join(&mpath);
+// if !abspath.exists() {
+// bail!("media missing at {abspath:?}");
+// }
+// if abspath.is_dir() {
+// let mut node = Node::default();
+// for f in abspath.read_dir()? {
+// let f = f?;
+// let child_path = f.path();
+// if child_path.is_dir()
+// || matches!(
+// child_path.extension().map(|o| o.to_str().unwrap()),
+// Some("mks" | "mka" | "mkv" | "webm")
+// )
+// {
+// let inf_id =
+// infer_id_from_path(&child_path).context("inferring child id")?;
- match process_source(
- inf_id.clone(),
- ImportSource::Media {
- path: mpath.join(f.file_name()),
- ignore_attachments,
- ignore_chapters,
- ignore_metadata,
- },
- path,
- index_path,
- db,
- fed,
- ap,
- )
- .await
- .context(anyhow!("recursive media import: {:?}", f.path()))
- {
- Result::Ok(o) => errors.extend(o),
- Result::Err(e) => errors.push(e),
- };
- node.public.children.push(inf_id);
- }
- }
- db.add_partial_node(&id, index_path, node)?;
- } else if abspath.is_file() {
- let _permit = SEM_IMPORT.acquire().await.unwrap();
- let metadata = {
- let abspath = abspath.clone();
- let mpath = mpath.to_owned();
- spawn_blocking(move || {
- cache_memory(&["probe", mpath.to_str().unwrap()], || {
- let input = File::open(&abspath).context("opening media file")?;
- let mut input = EbmlReader::new(BufReader::new(input));
- import_metadata(&mut input)
- })
- })
- }
- .await?
- .context(anyhow!("probing {abspath:?}"))?
- .deref()
- .to_owned();
+// match process_source(
+// inf_id.clone(),
+// ImportSource::Media {
+// path: mpath.join(f.file_name()),
+// ignore_attachments,
+// ignore_chapters,
+// ignore_metadata,
+// },
+// path,
+// index_path,
+// db,
+// fed,
+// ap,
+// )
+// .await
+// .context(anyhow!("recursive media import: {:?}", f.path()))
+// {
+// Result::Ok(o) => errors.extend(o),
+// Result::Err(e) => errors.push(e),
+// };
+// node.public.children.push(inf_id);
+// }
+// }
+// db.add_partial_node(&id, index_path, node)?;
+// } else if abspath.is_file() {
+// let _permit = SEM_IMPORT.acquire().await.unwrap();
+// let metadata = {
+// let abspath = abspath.clone();
+// let mpath = mpath.to_owned();
+// spawn_blocking(move || {
+// cache_memory(&["probe", mpath.to_str().unwrap()], || {
+// let input = File::open(&abspath).context("opening media file")?;
+// let mut input = EbmlReader::new(BufReader::new(input));
+// import_metadata(&mut input)
+// })
+// })
+// }
+// .await?
+// .context(anyhow!("probing {abspath:?}"))?
+// .deref()
+// .to_owned();
- let mut node = Node::default();
+// let mut node = Node::default();
- if !ignore_metadata {
- if let Some(captures) =
- RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap())
- {
- node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok());
- if let Some(title) = captures.get(6) {
- node.public.title = Some(title.as_str().to_string());
- }
- }
- node.public.title = metadata.title;
- node.public.description = metadata.description;
- node.public.tagline = metadata.tagline;
- }
- node.public.media = Some(MediaInfo {
- duration: metadata.duration,
- tracks: metadata.tracks,
- chapters: if ignore_chapters {
- vec![]
- } else {
- metadata.chapters
- },
- });
- node.private.source = Some(
- metadata
- .track_sources
- .into_iter()
- .map(|mut ts| {
- ts.path = mpath.to_owned();
- TrackSource::Local(ts)
- })
- .collect(),
- );
+// if !ignore_metadata {
+// if let Some(captures) =
+// RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap())
+// {
+// node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok());
+// if let Some(title) = captures.get(6) {
+// node.public.title = Some(title.as_str().to_string());
+// }
+// }
+// node.public.title = metadata.title;
+// node.public.description = metadata.description;
+// node.public.tagline = metadata.tagline;
+// }
+// node.public.media = Some(MediaInfo {
+// duration: metadata.duration,
+// tracks: metadata.tracks,
+// chapters: if ignore_chapters {
+// vec![]
+// } else {
+// metadata.chapters
+// },
+// });
+// node.private.source = Some(
+// metadata
+// .track_sources
+// .into_iter()
+// .map(|mut ts| {
+// ts.path = mpath.to_owned();
+// TrackSource::Local(ts)
+// })
+// .collect(),
+// );
- if !ignore_attachments {
- if let Some((filename, data)) = metadata.cover {
- node.public.poster = Some(
- AssetInner::Cache(
- async_cache_file(
- &["att-cover", mpath.to_str().unwrap(), &filename],
- |mut f| async move {
- f.write_all(&data).await?;
- Ok(())
- },
- )
- .await?,
- )
- .ser(),
- )
- };
+// if !ignore_attachments {
+// if let Some((filename, data)) = metadata.cover {
+// node.public.poster = Some(
+// AssetInner::Cache(
+// async_cache_file(
+// &["att-cover", mpath.to_str().unwrap(), &filename],
+// |mut f| async move {
+// f.write_all(&data).await?;
+// Ok(())
+// },
+// )
+// .await?,
+// )
+// .ser(),
+// )
+// };
- if let Some(infojson) = metadata.infojson {
- let infojson: infojson::YVideo =
- serde_json::from_str(&infojson).context("parsing infojson")?;
+// if let Some(infojson) = metadata.infojson {
+// let infojson: infojson::YVideo =
+// serde_json::from_str(&infojson).context("parsing infojson")?;
- node.public.kind = Some(
- if infojson.duration.unwrap_or(0.) < 120.
- && infojson.aspect_ratio.unwrap_or(2.) < 1.
- {
- NodeKind::ShortFormVideo
- } else {
- NodeKind::Video
- },
- );
- node.public.title = Some(infojson.title);
- node.public.description = Some(infojson.description);
- node.public.tagline = Some(infojson.webpage_url);
- node.public
- .ratings
- .insert(Rating::YoutubeViews, infojson.view_count as f64);
- node.public.release_date = Some(
- infojson::parse_upload_date(&infojson.upload_date)
- .context("parsing upload date")?,
- );
- node.public.ratings.extend(
- infojson
- .like_count
- .map(|l| (Rating::YoutubeLikes, l as f64)),
- );
- }
- }
- drop(_permit);
- db.add_partial_node(&id, index_path, node)?;
- } else {
- warn!("non file/dir import ignored: {abspath:?}")
- }
- }
- ImportSource::Federated { host } => {
- info!("federated import of {id:?} from {host:?}");
- let session = fed.get_session(&host).await.context("creating session")?;
+// node.public.kind = Some(
+// if infojson.duration.unwrap_or(0.) < 120.
+// && infojson.aspect_ratio.unwrap_or(2.) < 1.
+// {
+// NodeKind::ShortFormVideo
+// } else {
+// NodeKind::Video
+// },
+// );
+// node.public.title = Some(infojson.title);
+// node.public.description = Some(infojson.description);
+// node.public.tagline = Some(infojson.webpage_url);
+// node.public
+// .ratings
+// .insert(Rating::YoutubeViews, infojson.view_count as f64);
+// node.public.release_date = Some(
+// infojson::parse_upload_date(&infojson.upload_date)
+// .context("parsing upload date")?,
+// );
+// node.public.ratings.extend(
+// infojson
+// .like_count
+// .map(|l| (Rating::YoutubeLikes, l as f64)),
+// );
+// }
+// }
+// drop(_permit);
+// db.add_partial_node(&id, index_path, node)?;
+// } else {
+// warn!("non file/dir import ignored: {abspath:?}")
+// }
+// }
+// ImportSource::Federated { host } => {
+// info!("federated import of {id:?} from {host:?}");
+// let session = fed.get_session(&host).await.context("creating session")?;
- import_remote(id.clone(), &host, db, &session, index_path)
- .await
- .context("federated import")?
- }
- ImportSource::AutoChildren { path: cpath } => {
- info!("auto children at {path:?}");
- let paths = cpath
- .unwrap_or_else(|| path.parent().unwrap().to_path_buf())
- .read_dir()?
- .map(Result::unwrap)
- .map(|e| e.path())
- .filter(|e| {
- e.extension() == Some(OsStr::new("yaml"))
- || e.extension() == Some(OsStr::new("jelly"))
- });
+// import_remote(id.clone(), &host, db, &session, index_path)
+// .await
+// .context("federated import")?
+// }
+// ImportSource::AutoChildren { path: cpath } => {
+// info!("auto children at {path:?}");
+// let paths = cpath
+// .unwrap_or_else(|| path.parent().unwrap().to_path_buf())
+// .read_dir()?
+// .map(Result::unwrap)
+// .map(|e| e.path())
+// .filter(|e| {
+// e.extension() == Some(OsStr::new("yaml"))
+// || e.extension() == Some(OsStr::new("jelly"))
+// });
- let mut children = Vec::new();
- for p in paths {
- let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) {
- serde_json::from_reader(File::open(&p)?)?
- } else {
- serde_yaml::from_reader(File::open(&p)?)?
- };
- if opts.id != id {
- children.push(opts.id);
- }
- }
- db.add_partial_node(
- &id,
- index_path,
- Node {
- private: NodePrivate::default(),
- public: NodePublic {
- children,
- ..Default::default()
- },
- },
- )?;
- }
- }
- Ok(errors)
-}
+// let mut children = Vec::new();
+// for p in paths {
+// let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) {
+// serde_json::from_reader(File::open(&p)?)?
+// } else {
+// serde_yaml::from_reader(File::open(&p)?)?
+// };
+// if opts.id != id {
+// children.push(opts.id);
+// }
+// }
+// db.add_partial_node(
+// &id,
+// index_path,
+// Node {
+// private: NodePrivate::default(),
+// public: NodePublic {
+// children,
+// ..Default::default()
+// },
+// },
+// )?;
+// }
+// }
+// Ok(errors)
+// }
-pub fn infer_id_from_path(path: &Path) -> anyhow::Result<String> {
- let f = path
- .file_stem()
- .ok_or(anyhow!("no filename"))?
- .to_str()
- .ok_or(anyhow!("non utf8 filename"))?;
+// pub fn infer_id_from_path(path: &Path) -> anyhow::Result<String> {
+// let f = path
+// .file_stem()
+// .ok_or(anyhow!("no filename"))?
+// .to_str()
+// .ok_or(anyhow!("non utf8 filename"))?;
- if let Some(mat) = RE_YOUTUBE_ID.captures(f) {
- let id = mat.get(1).unwrap().as_str();
- return Ok(format!("youtube-{id}"));
- }
+// if let Some(mat) = RE_YOUTUBE_ID.captures(f) {
+// let id = mat.get(1).unwrap().as_str();
+// return Ok(format!("youtube-{id}"));
+// }
- let mut fsan = String::with_capacity(f.len());
- for c in f.chars() {
- fsan.extend(match c {
- 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c),
- ' ' => Some('-'),
- _ => None,
- });
- }
- Ok(fsan)
-}
+// let mut fsan = String::with_capacity(f.len());
+// for c in f.chars() {
+// fsan.extend(match c {
+// 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c),
+// ' ' => Some('-'),
+// _ => None,
+// });
+// }
+// Ok(fsan)
+// }
-fn merge_node(x: Node, y: Node) -> anyhow::Result<Node> {
- let (media, source) = match (
- x.public.media,
- y.public.media,
- x.private.source,
- y.private.source,
- ) {
- (Some(x), Some(y), Some(sx), Some(sy)) => {
- let k = merge_media(x, y, sx, sy);
- (Some(k.0), Some(k.1))
- }
- (Some(x), None, Some(sx), None) => (Some(x), Some(sx)),
- (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)),
- (None, None, None, None) => (None, None),
- _ => bail!("invalid node. source and media dont agree."),
- };
- Ok(Node {
- public: NodePublic {
- kind: x.public.kind.or(y.public.kind),
- title: x.public.title.or(y.public.title),
- subtitle: x.public.subtitle.or(y.public.subtitle),
- id: x.public.id.or(y.public.id),
- path: vec![],
- children: merge_children(x.public.children, y.public.children),
- tagline: x.public.tagline.or(y.public.tagline),
- description: x.public.description.or(y.public.description),
- release_date: x.public.release_date.or(y.public.release_date),
- index: x.public.index.or(y.public.index),
- media,
- ratings: x
- .public
- .ratings
- .into_iter()
- .chain(y.public.ratings)
- .collect(),
- federated: None,
- poster: x.public.poster.or(y.public.poster),
- backdrop: x.public.backdrop.or(y.public.backdrop),
- },
- private: NodePrivate {
- id: x.private.id.or(y.private.id),
- source,
- backdrop: None,
- poster: None,
- },
- })
-}
+// fn merge_node(x: Node, y: Node) -> anyhow::Result<Node> {
+// let (media, source) = match (
+// x.public.media,
+// y.public.media,
+// x.private.source,
+// y.private.source,
+// ) {
+// (Some(x), Some(y), Some(sx), Some(sy)) => {
+// let k = merge_media(x, y, sx, sy);
+// (Some(k.0), Some(k.1))
+// }
+// (Some(x), None, Some(sx), None) => (Some(x), Some(sx)),
+// (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)),
+// (None, None, None, None) => (None, None),
+// _ => bail!("invalid node. source and media dont agree."),
+// };
+// Ok(Node {
+// public: NodePublic {
+// kind: x.public.kind.or(y.public.kind),
+// title: x.public.title.or(y.public.title),
+// subtitle: x.public.subtitle.or(y.public.subtitle),
+// id: x.public.id.or(y.public.id),
+// path: vec![],
+// children: merge_children(x.public.children, y.public.children),
+// tagline: x.public.tagline.or(y.public.tagline),
+// description: x.public.description.or(y.public.description),
+// release_date: x.public.release_date.or(y.public.release_date),
+// index: x.public.index.or(y.public.index),
+// media,
+// ratings: x
+// .public
+// .ratings
+// .into_iter()
+// .chain(y.public.ratings)
+// .collect(),
+// federated: None,
+// poster: x.public.poster.or(y.public.poster),
+// backdrop: x.public.backdrop.or(y.public.backdrop),
+// },
+// private: NodePrivate {
+// id: x.private.id.or(y.private.id),
+// source,
+// backdrop: None,
+// poster: None,
+// },
+// })
+// }
-fn merge_children(mut a: Vec<String>, b: Vec<String>) -> Vec<String> {
- let acont = HashSet::<_, RandomState>::from_iter(a.clone());
- for el in b {
- if !acont.contains(&el) {
- a.push(el)
- }
- }
- a
-}
-fn merge_media(
- x: MediaInfo,
- y: MediaInfo,
- sx: Vec<TrackSource>,
- sy: Vec<TrackSource>,
-) -> (MediaInfo, Vec<TrackSource>) {
- let mut tracks: Vec<SourceTrack> = Vec::new();
- let mut source: Vec<TrackSource> = Vec::new();
- for (t, s) in x
- .tracks
- .into_iter()
- .zip(sx.into_iter())
- .chain(y.tracks.into_iter().zip(sy.into_iter()))
- {
- let mut remove = None;
- let mut skip = false;
- for (i, ot) in tracks.iter().enumerate() {
- if t.name == ot.name
- && t.kind == ot.kind
- && t.language == ot.language
- && t.codec == ot.codec
- {
- if t.federated.len() < ot.federated.len() {
- remove = Some(i);
- } else {
- skip = true;
- }
- }
- }
- if let Some(r) = remove {
- tracks.swap_remove(r);
- source.swap_remove(r);
- }
- if !skip {
- tracks.push(t);
- source.push(s);
- }
- }
- (
- MediaInfo {
- duration: x.duration * 0.5 + y.duration * 0.5,
- tracks,
- chapters: if x.chapters.len() > y.chapters.len() {
- x.chapters
- } else {
- y.chapters
- },
- },
- source,
- )
-}
+// fn merge_children(mut a: Vec<String>, b: Vec<String>) -> Vec<String> {
+// let acont = HashSet::<_, RandomState>::from_iter(a.clone());
+// for el in b {
+// if !acont.contains(&el) {
+// a.push(el)
+// }
+// }
+// a
+// }
+// fn merge_media(
+// x: MediaInfo,
+// y: MediaInfo,
+// sx: Vec<TrackSource>,
+// sy: Vec<TrackSource>,
+// ) -> (MediaInfo, Vec<TrackSource>) {
+// let mut tracks: Vec<SourceTrack> = Vec::new();
+// let mut source: Vec<TrackSource> = Vec::new();
+// for (t, s) in x
+// .tracks
+// .into_iter()
+// .zip(sx.into_iter())
+// .chain(y.tracks.into_iter().zip(sy.into_iter()))
+// {
+// let mut remove = None;
+// let mut skip = false;
+// for (i, ot) in tracks.iter().enumerate() {
+// if t.name == ot.name
+// && t.kind == ot.kind
+// && t.language == ot.language
+// && t.codec == ot.codec
+// {
+// if t.federated.len() < ot.federated.len() {
+// remove = Some(i);
+// } else {
+// skip = true;
+// }
+// }
+// }
+// if let Some(r) = remove {
+// tracks.swap_remove(r);
+// source.swap_remove(r);
+// }
+// if !skip {
+// tracks.push(t);
+// source.push(s);
+// }
+// }
+// (
+// MediaInfo {
+// duration: x.duration * 0.5 + y.duration * 0.5,
+// tracks,
+// chapters: if x.chapters.len() > y.chapters.len() {
+// x.chapters
+// } else {
+// y.chapters
+// },
+// },
+// source,
+// )
+// }
-static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
+// static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
-#[async_recursion]
-async fn import_remote(
- id: String,
- host: &str,
- db: &impl ImportStorage,
- session: &Arc<Session>,
- index_path: &[usize],
-) -> anyhow::Result<()> {
- let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
- info!("loading federated node {id:?}");
+// #[async_recursion]
+// async fn import_remote(
+// id: String,
+// host: &str,
+// db: &impl ImportStorage,
+// session: &Arc<Session>,
+// index_path: &[usize],
+// ) -> anyhow::Result<()> {
+// let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
+// info!("loading federated node {id:?}");
- let mut node = session.node(&id).await.context("fetching remote node")?;
- let mut node_ext = session
- .node_extended(&id)
- .await
- .context("fetching extended remote node")?;
+// let mut node = session.node(&id).await.context("fetching remote node")?;
+// let mut node_ext = session
+// .node_extended(&id)
+// .await
+// .context("fetching extended remote node")?;
- let track_sources = if let Some(media) = &mut node.media {
- let mut track_sources = Vec::new();
- for (i, t) in media.tracks.iter_mut().enumerate() {
- t.federated.push(host.to_owned());
- track_sources.push(TrackSource::Remote(i))
- }
- Some(track_sources)
- } else {
- None
- };
+// let track_sources = if let Some(media) = &mut node.media {
+// let mut track_sources = Vec::new();
+// for (i, t) in media.tracks.iter_mut().enumerate() {
+// t.federated.push(host.to_owned());
+// track_sources.push(TrackSource::Remote(i))
+// }
+// Some(track_sources)
+// } else {
+// None
+// };
- drop(_permit);
+// drop(_permit);
- let mut node = Node {
- public: node.clone(),
- private: NodePrivate {
- backdrop: None,
- poster: None,
- id: None,
- source: track_sources,
- },
- };
- make_opt_asset_federated(host, &mut node.public.backdrop)?;
- make_opt_asset_federated(host, &mut node.public.poster)?;
- for g in node_ext.people.values_mut() {
- for a in g {
- make_opt_asset_federated(host, &mut a.person.headshot)?;
- }
- }
+// let mut node = Node {
+// public: node.clone(),
+// private: NodePrivate {
+// backdrop: None,
+// poster: None,
+// id: None,
+// source: track_sources,
+// },
+// };
+// make_opt_asset_federated(host, &mut node.public.backdrop)?;
+// make_opt_asset_federated(host, &mut node.public.poster)?;
+// for g in node_ext.people.values_mut() {
+// for a in g {
+// make_opt_asset_federated(host, &mut a.person.headshot)?;
+// }
+// }
- debug!("adding {id}");
- db.add_partial_node(&id, index_path, node.clone())?;
- db.add_partial_node_ext(&id, index_path, node_ext)?;
+// debug!("adding {id}");
+// db.add_partial_node(&id, index_path, node.clone())?;
+// db.add_partial_node_ext(&id, index_path, node_ext)?;
- let mut children: FuturesUnordered<_> = node
- .public
- .children
- .iter()
- .map(|c| import_remote(c.to_owned(), host, db, session, index_path))
- .collect();
+// let mut children: FuturesUnordered<_> = node
+// .public
+// .children
+// .iter()
+// .map(|c| import_remote(c.to_owned(), host, db, session, index_path))
+// .collect();
- while let Some(r) = children.next().await {
- r?;
- }
+// while let Some(r) = children.next().await {
+// r?;
+// }
- Ok(())
-}
+// Ok(())
+// }
-pub fn make_opt_asset_federated(host: &str, p: &mut Option<Asset>) -> anyhow::Result<()> {
- if let Some(a) = p {
- make_asset_federated(host, a)?
- }
- Ok(())
-}
-pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> {
- let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?;
- *p = AssetInner::Federated {
- host: host.to_owned(),
- asset: data,
- }
- .ser();
- Ok(())
-}
+// pub fn make_opt_asset_federated(host: &str, p: &mut Option<Asset>) -> anyhow::Result<()> {
+// if let Some(a) = p {
+// make_asset_federated(host, a)?
+// }
+// Ok(())
+// }
+// pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> {
+// let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?;
+// *p = AssetInner::Federated {
+// host: host.to_owned(),
+// asset: data,
+// }
+// .ser();
+// Ok(())
+// }