diff options
author | metamuffin <metamuffin@disroot.org> | 2025-01-29 22:34:57 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-01-29 22:34:57 +0100 |
commit | 8099c51e56b6d253c05cac9c235f52027ad736fa (patch) | |
tree | d1b8d02ef332caa407ec8937fed56b6e5d5a5d3d | |
parent | db511d3fe50f05329615f718515fab1b80d9e06a (diff) | |
download | jellything-8099c51e56b6d253c05cac9c235f52027ad736fa.tar jellything-8099c51e56b6d253c05cac9c235f52027ad736fa.tar.bz2 jellything-8099c51e56b6d253c05cac9c235f52027ad736fa.tar.zst |
individual immediate file import
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | base/src/database.rs | 83 | ||||
-rw-r--r-- | import/Cargo.toml | 4 | ||||
-rw-r--r-- | import/src/lib.rs | 1078 | ||||
-rw-r--r-- | server/src/routes/mod.rs | 6 | ||||
-rw-r--r-- | server/src/routes/ui/admin/mod.rs | 69 | ||||
-rw-r--r-- | server/src/routes/ui/node.rs | 4 | ||||
-rw-r--r-- | web/script/player/mod.ts | 7 |
8 files changed, 338 insertions, 921 deletions
@@ -802,6 +802,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" [[package]] +name = "ebml-struct" +version = "0.1.0" +source = "git+https://codeberg.org/metamuffin/ebml-struct#5574bebaa4e2104a5fd8ce7eba410e8453b99cf7" + +[[package]] name = "ebml_derive" version = "0.1.0" dependencies = [ @@ -1743,12 +1748,11 @@ dependencies = [ "async-recursion", "base64", "bincode", + "ebml-struct", "futures", "jellybase", "jellyclient", "jellycommon", - "jellymatroska", - "jellyremuxer", "log", "regex", "reqwest", diff --git a/base/src/database.rs b/base/src/database.rs index 0f18097..1f3efbf 100644 --- a/base/src/database.rs +++ b/base/src/database.rs @@ -29,6 +29,7 @@ const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser<NodeUserData>> = TableDefinition::new("user_node"); const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite"); const T_NODE: TableDefinition<[u8; 32], Ser<Node>> = TableDefinition::new("node"); +const T_IMPORT_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import-mtime"); #[derive(Clone)] pub struct Database { @@ -268,6 +269,21 @@ impl Database { txn.commit().unwrap(); Ok(()) } + pub fn update_node_init( + &self, + id: NodeID, + update: impl FnOnce(&mut Node) -> Result<()>, + ) -> Result<()> { + let txn = self.inner.begin_write()?; + let mut t_nodes = txn.open_table(T_NODE)?; + let mut node = t_nodes.get(id.0)?.map(|v| v.value().0).unwrap_or_default(); + update(&mut node)?; + t_nodes.insert(&id.0, Ser(node))?; + drop(t_nodes); + txn.commit()?; + + Ok(()) + } } pub struct NodeIndex { @@ -322,73 +338,6 @@ impl NodeIndex { } } -// pub trait TableExt<Key, KeyRef, Value> { -// fn get(self, db: &Database, key: KeyRef) -> anyhow::Result<Option<Value>>; -// fn insert(self, db: &Database, key: KeyRef, value: Value) -> anyhow::Result<()>; -// fn remove(self, db: &Database, key: KeyRef) -> anyhow::Result<Option<Value>>; -// } -// impl<'a, 'b, 'c, Key, Value, KeyRef> TableExt<Key, KeyRef, Value> -// for redb::TableDefinition<'a, Key, Ser<Value>> -// where -// Key: Borrow<<Key as redb::Value>::SelfType<'b>> + redb::Key, -// Value: Encode + Decode + std::fmt::Debug + Serialize + for<'x> Deserialize<'x>, -// KeyRef: Borrow<<Key as redb::Value>::SelfType<'c>>, -// { -// fn get(self, db: &Database, key: KeyRef) -> anyhow::Result<Option<Value>> { -// let txn = db.inner.begin_read()?; -// let table = txn.open_table(self)?; -// let user = table.get(key)?.map(|v| v.value().0); -// drop(table); -// Ok(user) -// } -// fn insert(self, db: &Database, key: KeyRef, value: Value) -> anyhow::Result<()> { -// let txn = db.inner.begin_write()?; -// let mut table = txn.open_table(self)?; -// table.insert(key, Ser(value))?; -// drop(table); -// txn.commit()?; -// Ok(()) -// } -// fn remove(self, db: &Database, key: KeyRef) -> anyhow::Result<Option<Value>> { -// let txn = db.inner.begin_write()?; -// let mut table = txn.open_table(self)?; -// let prev = table.remove(key)?.map(|v| v.value().0); -// drop(table); -// txn.commit()?; -// Ok(prev) -// } -// } - -// pub trait TableIterExt< -// 'a, -// Key: redb::redb::Key + 'static, -// Value: redb::redb::Value + 'static, -// F: FnOnce(&redb::Range<'a, Key, Value>) -> anyhow::Result<T>, -// T: 'static, -// > -// { -// fn iter(self, db: &'a DataAcid, f: F) -> anyhow::Result<T>; -// } -// impl<'a, Key, Value, F, T> TableIterExt<'a, Key, Value, F, T> -// for TableDefinition<'static, Key, Value> -// where -// Key: redb::redb::Key, -// Value: redb::redb::Value, -// F: FnOnce(&redb::Range<'a, Key, Value>) -> anyhow::Result<T>, -// T: 'static, -// { -// fn iter(self, db: &DataAcid, f: F) -> anyhow::Result<T> { -// let txn = db.begin_read()?; -// let table = txn.open_table(self)?; -// let iter = table.iter()?; -// let ret = f(&iter)?; -// drop(iter); -// drop(table); -// drop(txn); -// Ok(ret) -// } -// } - #[derive(Debug)] #[cfg(not(feature = "db_json"))] pub struct Ser<T>(pub T); diff --git a/import/Cargo.toml b/import/Cargo.toml index e218cb0..b9bd6db 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -7,8 +7,8 @@ edition = "2021" jellycommon = { path = "../common" } jellybase = { path = "../base" } jellyclient = { path = "../client" } -jellymatroska = { path = "../matroska" } -jellyremuxer = { path = "../remuxer" } + +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" } log = { workspace = true } anyhow = "1.0.95" diff --git a/import/src/lib.rs b/import/src/lib.rs index 787c0cf..ab410eb 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -3,840 +3,302 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ +use anyhow::{anyhow, Context, Result}; +use ebml_struct::{ + ids::*, + matroska::*, + read::{EbmlReadExt, TagRead}, +}; +use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS}; +use jellycommon::{ + Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind, TrackSource, +}; +use log::info; +use regex::Regex; +use std::{ + collections::{HashMap, VecDeque}, + fs::File, + io::{BufReader, ErrorKind, Read, Write}, + path::Path, + sync::LazyLock, +}; +use tmdb::Tmdb; +use tokio::{ + sync::{RwLock, Semaphore}, + task::spawn_blocking, +}; +use trakt::Trakt; + pub mod infojson; pub mod tmdb; pub mod trakt; +static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); +pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new()); -// use anyhow::{anyhow, bail, Context, Error, Ok}; -// use async_recursion::async_recursion; -// use base64::Engine; -// use db::{DatabaseStorage, ImportStorage, MemoryStorage}; -// use futures::{stream::FuturesUnordered, StreamExt}; -// use jellybase::{ -// assetfed::AssetInner, -// cache::{async_cache_file, cache_memory}, -// database::DataAcid, -// federation::Federation, -// CONF, SECRETS, -// }; -// use jellyclient::Session; -// use jellycommon::{ -// chrono::{DateTime, Datelike}, -// Asset, ExtendedNode, ImportOptions, ImportSource, MediaInfo, Node, NodeKind, NodePrivate, -// NodePublic, PeopleGroup, Rating, SourceTrack, TmdbKind, TrackSource, TraktKind, -// }; -// use jellymatroska::read::EbmlReader; -// use jellyremuxer::metadata::import_metadata; -// use log::{debug, info, warn}; -// use regex::Regex; -// use std::{ -// cmp::Ordering, -// collections::HashSet, -// ffi::OsStr, -// fs::File, -// hash::RandomState, -// io::BufReader, -// ops::Deref, -// path::{Path, PathBuf}, -// sync::{Arc, LazyLock}, -// }; -// use tmdb::{parse_release_date, Tmdb}; -// use tokio::{ -// io::AsyncWriteExt, -// sync::{RwLock, Semaphore}, -// task::spawn_blocking, -// }; -// use trakt::Trakt; - -// static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -// pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new()); - -// static RE_EPISODE_FILENAME: LazyLock<Regex> = -// LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); -// static RE_YOUTUBE_ID: LazyLock<Regex> = -// LazyLock::new(|| Regex::new(r#"\[([A-Za-z0-9_-]{11})\]"#).unwrap()); - -// struct Apis { -// trakt: Option<Trakt>, -// tmdb: Option<Tmdb>, -// } - -// pub fn is_importing() -> bool { -// IMPORT_SEM.available_permits() == 0 -// } - -// pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { -// let permit = IMPORT_SEM.try_acquire()?; - -// let ap = Apis { -// trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), -// tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), -// }; - -// let e = if CONF.use_in_memory_import_storage { -// import_inner(&MemoryStorage::new(db), fed, &ap).await -// } else { -// import_inner(&DatabaseStorage::new(db), fed, &ap).await -// }; -// let e = match e { -// Result::Ok(e) => e, -// Result::Err(e) => vec![e], -// }; -// *IMPORT_ERRORS.write().await = e.into_iter().map(|e| format!("{e:?}")).collect(); - -// drop(permit); -// Ok(()) -// } - -// pub(crate) async fn import_inner( -// db: &impl ImportStorage, -// fed: &Federation, -// ap: &Apis, -// ) -> anyhow::Result<Vec<anyhow::Error>> { -// db.pre_clean()?; -// info!("loading sources..."); -// let mut errors = Vec::new(); -// match import_path(CONF.library_path.clone(), vec![], db, fed, ap) -// .await -// .context("indexing") -// { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// }; -// db.remove_prev_nodes()?; -// info!("merging nodes..."); -// match generate_node_paths(db).context("merging nodes") { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// } -// db.finish()?; -// info!("import completed"); -// Ok(errors) -// } - -// fn generate_node_paths(db: &impl ImportStorage) -> anyhow::Result<Vec<Error>> { -// // TODO mark nodes done to allow recursion -// fn traverse( -// db: &impl ImportStorage, -// id: String, -// mut path: Vec<String>, -// parent_title: &str, -// ) -> anyhow::Result<Vec<Error>> { -// let mut errors = Vec::new(); -// let node = { -// let mut parts = db -// .get_partial_parts(&id) -// .context(anyhow!("path = {path:?}"))?; - -// parts.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); - -// let mut node = parts -// .into_iter() -// .map(|(_, x)| x) -// .reduce(|x, y| merge_node(x, y).unwrap()) -// .unwrap(); - -// node.public.id = Some(id.to_owned()); -// node.public.path = vec![]; // will be reconstructed in the next pass -// node.public.federated = None; - -// // TODO this discardes a lot of information. maybe change this. -// if let Some(media) = &node.public.media { -// for t in &media.tracks { -// if let Some(host) = t.federated.first() { -// if host != &CONF.hostname { -// node.public.federated = Some(host.to_string()) -// } -// } -// } -// } - -// if node.public.path.is_empty() { -// node.public.path = path.clone(); -// } -// node.public.subtitle = match node.public.kind.unwrap_or_default() { -// NodeKind::Movie => node.public.release_date.map(|date| { -// format!( -// "{}", -// DateTime::from_timestamp_millis(date) -// .unwrap() -// .date_naive() -// .year() -// ) -// }), -// NodeKind::Season -// | NodeKind::Episode -// | NodeKind::ShortFormVideo -// | NodeKind::Video => Some(parent_title.to_string()), -// _ => None, -// }; - -// db.insert_complete_node(&id, node.clone())?; - -// node -// }; - -// path.push(id); -// let ps = node.public.title.unwrap_or_default(); -// for c in node.public.children { -// match traverse(db, c, path.clone(), &ps) { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// } -// } -// Ok(errors) -// } -// traverse(db, "library".to_string(), vec![], "Root") -// } - -// fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { -// if x.is_empty() { -// Ordering::Greater -// } else if y.is_empty() { -// Ordering::Less -// } else { -// match x[0].cmp(&y[0]) { -// o @ (Ordering::Less | Ordering::Greater) => o, -// Ordering::Equal => compare_index_path(&x[1..], &y[1..]), -// } -// } -// } - -// #[async_recursion] -// async fn import_path( -// path: PathBuf, -// mut index_path: Vec<usize>, -// db: &impl ImportStorage, -// fed: &Federation, -// ap: &Apis, -// ) -> anyhow::Result<Vec<anyhow::Error>> { -// let mut errors = Vec::new(); -// if path.is_dir() { -// let mut children_paths = path -// .read_dir()? -// .map(Result::unwrap) -// .filter_map(|e| { -// if e.path().extension() == Some(OsStr::new("yaml")) -// || e.path().extension() == Some(OsStr::new("jelly")) -// || e.metadata().unwrap().is_dir() -// { -// Some(e.path()) -// } else { -// None -// } -// }) -// .collect::<Vec<_>>(); - -// children_paths.sort(); - -// let mut children: FuturesUnordered<_> = children_paths -// .into_iter() -// .enumerate() -// .map(|(i, p)| { -// import_path( -// p.clone(), -// { -// let mut path = index_path.clone(); -// path.push(i); -// path -// }, -// db, -// fed, -// ap, -// ) -// }) -// .collect(); - -// while let Some(k) = children.next().await { -// match k { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// } -// } -// } else { -// info!("reading {path:?}"); -// let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; -// let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { -// serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? -// } else { -// serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? -// }; - -// for (i, s) in opts.sources.into_iter().enumerate() { -// index_path.push(i); -// if let Err(e) = process_source(opts.id.clone(), s, &path, &index_path, db, fed, ap) -// .await -// .context(anyhow!("processing source in {path:?}")) -// { -// errors.push(e) -// } -// index_path.pop(); -// } -// } -// Ok(errors) -// } - -// static SEM_IMPORT: Semaphore = Semaphore::const_new(2); - -// #[async_recursion] -// async fn process_source( -// id: String, -// s: ImportSource, -// path: &Path, -// index_path: &[usize], -// db: &impl ImportStorage, -// fed: &Federation, -// ap: &Apis, -// ) -> anyhow::Result<Vec<anyhow::Error>> { -// let mut errors = vec![]; -// match s { -// ImportSource::Override(mut n) => { -// if let Some(backdrop) = n.private.backdrop.clone() { -// n.public.backdrop = Some(AssetInner::Library(backdrop).ser()); -// } -// if let Some(poster) = n.private.poster.clone() { -// n.public.poster = Some(AssetInner::Library(poster).ser()); -// } -// db.add_partial_node(&id, index_path, n)? -// } -// ImportSource::Trakt { id: tid, kind } => { -// info!("trakt {id}"); -// let trakt = ap -// .trakt -// .as_ref() -// .ok_or(anyhow!("trakt api key is required"))?; -// let trakt_object = trakt -// .lookup(kind, tid, true) -// .await -// .context("looking up metadata")?; -// let trakt_people = trakt -// .people(kind, tid, true) -// .await -// .context("looking up people")?; - -// let mut node = Node::default(); -// let mut node_ext = ExtendedNode::default(); -// { -// node.public.kind = Some(match kind { -// TraktKind::Movie => NodeKind::Movie, -// TraktKind::Show => NodeKind::Show, -// TraktKind::Season => NodeKind::Season, -// TraktKind::Episode => NodeKind::Episode, -// _ => bail!("unexpected kind for trakt import"), -// }); -// node.public.title = Some(trakt_object.title.to_owned()); -// if let Some(overview) = &trakt_object.overview { -// node.public.description = Some(overview.to_owned()) -// } -// if let Some(tagline) = &trakt_object.tagline { -// node.public.tagline = Some(tagline.to_owned()) -// } -// if let Some(rating) = &trakt_object.rating { -// node.public.ratings.insert(Rating::Trakt, *rating); -// } -// for p in trakt_people.cast.iter() { -// node_ext -// .people -// .entry(PeopleGroup::Cast) -// .or_default() -// .push(p.a()) -// } -// for (group, people) in trakt_people.crew.iter() { -// for p in people { -// node_ext.people.entry(group.a()).or_default().push(p.a()) -// } -// } -// // TODO lazy assets -// for ps in node_ext.people.values_mut() { -// for p in ps { -// if let Some(id) = p.person.ids.tmdb { -// if let Some(tmdb) = &ap.tmdb { -// let k = tmdb.person_image(id).await?; -// if let Some(prof) = k.profiles.first() { -// p.person.headshot = Some( -// AssetInner::Cache(tmdb.image(&prof.file_path).await?).ser(), -// ); -// } -// } -// } -// } -// } -// } -// db.add_partial_node(&id, index_path, node)?; -// db.add_partial_node_ext(&id, index_path, node_ext)?; - -// if let Some(tid) = trakt_object.ids.tmdb { -// if let Some(kind) = match kind { -// TraktKind::Movie => Some(TmdbKind::Movie), -// TraktKind::Show => Some(TmdbKind::Tv), -// TraktKind::Season => Some(TmdbKind::Tv), // TODO -// TraktKind::Episode | TraktKind::Person | TraktKind::User => None, -// } { -// let mut index_path = index_path.to_vec(); -// index_path.push(1); -// match process_source( -// id, -// ImportSource::Tmdb { id: tid, kind }, -// path, -// &index_path, -// db, -// fed, -// ap, -// ) -// .await -// { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// } -// } -// } -// } -// ImportSource::Tmdb { id: tid, kind } => { -// info!("tmdb {id}"); -// let tmdb = ap -// .tmdb -// .as_ref() -// .ok_or(anyhow!("tmdb api key is required"))?; - -// let details = tmdb.details(kind, tid).await?; - -// let mut node = Node::default(); - -// // TODO lazy assets -// if let Some(poster) = &details.poster_path { -// node.public.poster = Some(AssetInner::Cache(tmdb.image(poster).await?).ser()); -// } -// if let Some(backdrop) = &details.backdrop_path { -// node.public.backdrop = Some(AssetInner::Cache(tmdb.image(backdrop).await?).ser()); -// } - -// node.public.tagline = details.tagline.clone(); -// node.public.title = details.title.clone(); -// node.public.description = Some(details.overview.clone()); -// node.public -// .ratings -// .insert(Rating::Tmdb, details.vote_average); -// if let Some(date) = details.release_date.clone() { -// node.public.release_date = -// parse_release_date(&date).context("parsing release date")?; -// } - -// db.add_partial_node(&id, index_path, node)?; -// } -// ImportSource::Media { -// path: mpath, -// ignore_attachments, -// ignore_chapters, -// ignore_metadata, -// } => { -// info!("media import {mpath:?}"); -// let abspath = CONF.media_path.join(&mpath); -// if !abspath.exists() { -// bail!("media missing at {abspath:?}"); -// } -// if abspath.is_dir() { -// let mut node = Node::default(); -// for f in abspath.read_dir()? { -// let f = f?; -// let child_path = f.path(); -// if child_path.is_dir() -// || matches!( -// child_path.extension().map(|o| o.to_str().unwrap()), -// Some("mks" | "mka" | "mkv" | "webm") -// ) -// { -// let inf_id = -// infer_id_from_path(&child_path).context("inferring child id")?; +static RE_EPISODE_FILENAME: LazyLock<Regex> = + LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap()); -// match process_source( -// inf_id.clone(), -// ImportSource::Media { -// path: mpath.join(f.file_name()), -// ignore_attachments, -// ignore_chapters, -// ignore_metadata, -// }, -// path, -// index_path, -// db, -// fed, -// ap, -// ) -// .await -// .context(anyhow!("recursive media import: {:?}", f.path())) -// { -// Result::Ok(o) => errors.extend(o), -// Result::Err(e) => errors.push(e), -// }; -// node.public.children.push(inf_id); -// } -// } -// db.add_partial_node(&id, index_path, node)?; -// } else if abspath.is_file() { -// let _permit = SEM_IMPORT.acquire().await.unwrap(); -// let metadata = { -// let abspath = abspath.clone(); -// let mpath = mpath.to_owned(); -// spawn_blocking(move || { -// cache_memory(&["probe", mpath.to_str().unwrap()], || { -// let input = File::open(&abspath).context("opening media file")?; -// let mut input = EbmlReader::new(BufReader::new(input)); -// import_metadata(&mut input) -// }) -// }) -// } -// .await? -// .context(anyhow!("probing {abspath:?}"))? -// .deref() -// .to_owned(); +struct Apis { + trakt: Option<Trakt>, + tmdb: Option<Tmdb>, +} -// let mut node = Node::default(); +pub fn is_importing() -> bool { + IMPORT_SEM.available_permits() == 0 +} -// if !ignore_metadata { -// if let Some(captures) = -// RE_EPISODE_FILENAME.captures(abspath.file_stem().unwrap().to_str().unwrap()) -// { -// node.public.index = captures.get(4).and_then(|a| a.as_str().parse().ok()); -// if let Some(title) = captures.get(6) { -// node.public.title = Some(title.as_str().to_string()); -// } -// } -// node.public.title = metadata.title; -// node.public.description = metadata.description; -// node.public.tagline = metadata.tagline; -// } -// node.public.media = Some(MediaInfo { -// duration: metadata.duration, -// tracks: metadata.tracks, -// chapters: if ignore_chapters { -// vec![] -// } else { -// metadata.chapters -// }, -// }); -// node.private.source = Some( -// metadata -// .track_sources -// .into_iter() -// .map(|mut ts| { -// ts.path = mpath.to_owned(); -// TrackSource::Local(ts) -// }) -// .collect(), -// ); +pub async fn import_wrap(db: Database) -> Result<()> { + let _sem = IMPORT_SEM.try_acquire()?; -// if !ignore_attachments { -// if let Some((filename, data)) = metadata.cover { -// node.public.poster = Some( -// AssetInner::Cache( -// async_cache_file( -// &["att-cover", mpath.to_str().unwrap(), &filename], -// |mut f| async move { -// f.write_all(&data).await?; -// Ok(()) -// }, -// ) -// .await?, -// ) -// .ser(), -// ) -// }; + let jh = spawn_blocking(move || { + let errs = match import(&db) { + Err(e) => vec![format!("{e:#}")], + Ok(e) => e, + }; + *IMPORT_ERRORS.blocking_write() = errs; + }); -// if let Some(infojson) = metadata.infojson { -// let infojson: infojson::YVideo = -// serde_json::from_str(&infojson).context("parsing infojson")?; + let _ = jh.await; -// node.public.kind = Some( -// if infojson.duration.unwrap_or(0.) < 120. -// && infojson.aspect_ratio.unwrap_or(2.) < 1. -// { -// NodeKind::ShortFormVideo -// } else { -// NodeKind::Video -// }, -// ); -// node.public.title = Some(infojson.title); -// node.public.description = Some(infojson.description); -// node.public.tagline = Some(infojson.webpage_url); -// node.public -// .ratings -// .insert(Rating::YoutubeViews, infojson.view_count as f64); -// node.public.release_date = Some( -// infojson::parse_upload_date(&infojson.upload_date) -// .context("parsing upload date")?, -// ); -// node.public.ratings.extend( -// infojson -// .like_count -// .map(|l| (Rating::YoutubeLikes, l as f64)), -// ); -// } -// } -// drop(_permit); -// db.add_partial_node(&id, index_path, node)?; -// } else { -// warn!("non file/dir import ignored: {abspath:?}") -// } -// } -// ImportSource::Federated { host } => { -// info!("federated import of {id:?} from {host:?}"); -// let session = fed.get_session(&host).await.context("creating session")?; + Ok(()) +} -// import_remote(id.clone(), &host, db, &session, index_path) -// .await -// .context("federated import")? -// } -// ImportSource::AutoChildren { path: cpath } => { -// info!("auto children at {path:?}"); -// let paths = cpath -// .unwrap_or_else(|| path.parent().unwrap().to_path_buf()) -// .read_dir()? -// .map(Result::unwrap) -// .map(|e| e.path()) -// .filter(|e| { -// e.extension() == Some(OsStr::new("yaml")) -// || e.extension() == Some(OsStr::new("jelly")) -// }); +fn import(db: &Database) -> Result<Vec<String>> { + let mut queue = VecDeque::from_iter(Some(CONF.media_path.clone())); + let mut errors = Vec::new(); -// let mut children = Vec::new(); -// for p in paths { -// let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { -// serde_json::from_reader(File::open(&p)?)? -// } else { -// serde_yaml::from_reader(File::open(&p)?)? -// }; -// if opts.id != id { -// children.push(opts.id); -// } -// } -// db.add_partial_node( -// &id, -// index_path, -// Node { -// private: NodePrivate::default(), -// public: NodePublic { -// children, -// ..Default::default() -// }, -// }, -// )?; -// } -// } -// Ok(errors) -// } + let apis = Apis { + trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)), + tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)), + }; -// pub fn infer_id_from_path(path: &Path) -> anyhow::Result<String> { -// let f = path -// .file_stem() -// .ok_or(anyhow!("no filename"))? -// .to_str() -// .ok_or(anyhow!("non utf8 filename"))?; + while let Some(path) = queue.pop_front() { + if path.is_dir() { + for e in path.read_dir()? { + queue.push_back(e?.path()); + } + } + if path.is_file() { + if let Err(e) = import_file(db, &path).context(anyhow!("{path:?}")) { + errors.push(format!("{e:#}")); + } + } + } + Ok(errors) +} -// if let Some(mat) = RE_YOUTUBE_ID.captures(f) { -// let id = mat.get(1).unwrap().as_str(); -// return Ok(format!("youtube-{id}")); -// } +fn import_file(db: &Database, path: &Path) -> Result<()> { + let filename = path.file_stem().unwrap().to_string_lossy(); + match filename.as_ref() { + "poster" => (), + _ => (), + } -// let mut fsan = String::with_capacity(f.len()); -// for c in f.chars() { -// fsan.extend(match c { -// 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), -// ' ' => Some('-'), -// _ => None, -// }); -// } -// Ok(fsan) -// } + let mut magic = [0; 4]; + File::open(path)?.read_exact(&mut magic).ok(); + if matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { + import_media_file(db, path).context("media file")?; + } -// fn merge_node(x: Node, y: Node) -> anyhow::Result<Node> { -// let (media, source) = match ( -// x.public.media, -// y.public.media, -// x.private.source, -// y.private.source, -// ) { -// (Some(x), Some(y), Some(sx), Some(sy)) => { -// let k = merge_media(x, y, sx, sy); -// (Some(k.0), Some(k.1)) -// } -// (Some(x), None, Some(sx), None) => (Some(x), Some(sx)), -// (None, Some(y), None, Some(sy)) => (Some(y), Some(sy)), -// (None, None, None, None) => (None, None), -// _ => bail!("invalid node. source and media dont agree."), -// }; -// Ok(Node { -// public: NodePublic { -// kind: x.public.kind.or(y.public.kind), -// title: x.public.title.or(y.public.title), -// subtitle: x.public.subtitle.or(y.public.subtitle), -// id: x.public.id.or(y.public.id), -// path: vec![], -// children: merge_children(x.public.children, y.public.children), -// tagline: x.public.tagline.or(y.public.tagline), -// description: x.public.description.or(y.public.description), -// release_date: x.public.release_date.or(y.public.release_date), -// index: x.public.index.or(y.public.index), -// media, -// ratings: x -// .public -// .ratings -// .into_iter() -// .chain(y.public.ratings) -// .collect(), -// federated: None, -// poster: x.public.poster.or(y.public.poster), -// backdrop: x.public.backdrop.or(y.public.backdrop), -// }, -// private: NodePrivate { -// id: x.private.id.or(y.private.id), -// source, -// backdrop: None, -// poster: None, -// }, -// }) -// } + Ok(()) +} -// fn merge_children(mut a: Vec<String>, b: Vec<String>) -> Vec<String> { -// let acont = HashSet::<_, RandomState>::from_iter(a.clone()); -// for el in b { -// if !acont.contains(&el) { -// a.push(el) -// } -// } -// a -// } -// fn merge_media( -// x: MediaInfo, -// y: MediaInfo, -// sx: Vec<TrackSource>, -// sy: Vec<TrackSource>, -// ) -> (MediaInfo, Vec<TrackSource>) { -// let mut tracks: Vec<SourceTrack> = Vec::new(); -// let mut source: Vec<TrackSource> = Vec::new(); -// for (t, s) in x -// .tracks -// .into_iter() -// .zip(sx.into_iter()) -// .chain(y.tracks.into_iter().zip(sy.into_iter())) -// { -// let mut remove = None; -// let mut skip = false; -// for (i, ot) in tracks.iter().enumerate() { -// if t.name == ot.name -// && t.kind == ot.kind -// && t.language == ot.language -// && t.codec == ot.codec -// { -// if t.federated.len() < ot.federated.len() { -// remove = Some(i); -// } else { -// skip = true; -// } -// } -// } -// if let Some(r) = remove { -// tracks.swap_remove(r); -// source.swap_remove(r); -// } -// if !skip { -// tracks.push(t); -// source.push(s); -// } -// } -// ( -// MediaInfo { -// duration: x.duration * 0.5 + y.duration * 0.5, -// tracks, -// chapters: if x.chapters.len() > y.chapters.len() { -// x.chapters -// } else { -// y.chapters -// }, -// }, -// source, -// ) -// } +fn import_media_file(db: &Database, path: &Path) -> Result<()> { + info!("reading media file {path:?}"); + let mut file = BufReader::new(File::open(path)?); + let mut file = file.by_ref().take(u64::MAX); -// static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); + let (x, mut ebml) = file.read_tag()?; + assert_eq!(x, EL_EBML); + let ebml = Ebml::read(&mut ebml).unwrap(); + assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); + let (x, mut segment) = file.read_tag()?; + assert_eq!(x, EL_SEGMENT); -// #[async_recursion] -// async fn import_remote( -// id: String, -// host: &str, -// db: &impl ImportStorage, -// session: &Arc<Session>, -// index_path: &[usize], -// ) -> anyhow::Result<()> { -// let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); -// info!("loading federated node {id:?}"); + let mut info = None; + let mut infojson = None; + let mut tracks = None; + let mut cover = None; + let mut chapters = None; + let mut tags = None; + loop { + let (x, mut seg) = match segment.read_tag() { + Ok(o) => o, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + }; + match x { + EL_INFO => info = Some(Info::read(&mut seg).context("info")?), + EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?), + EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?), + EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?), + EL_ATTACHMENTS => { + let attachments = Attachments::read(&mut seg).context("attachments")?; + for f in attachments.files { + match f.name.as_str() { + "info.json" => { + infojson = Some( + serde_json::from_slice::<infojson::YVideo>(&f.data) + .context("infojson")?, + ); + } + "cover.webp" => { + cover = Some( + AssetInner::Cache(cache_file( + &["att-cover", path.to_string_lossy().as_ref()], + move |mut file| { + file.write_all(&f.data)?; + Ok(()) + }, + )?) + .ser(), + ) + } + a => println!("{a:?}"), + } + } + } -// let mut node = session.node(&id).await.context("fetching remote node")?; -// let mut node_ext = session -// .node_extended(&id) -// .await -// .context("fetching extended remote node")?; + EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => { + seg.consume()?; + } + EL_CLUSTER => { + break; + } + id => { + eprintln!("unknown top-level element {id:x}"); + seg.consume()?; + } + } + } -// let track_sources = if let Some(media) = &mut node.media { -// let mut track_sources = Vec::new(); -// for (i, t) in media.tracks.iter_mut().enumerate() { -// t.federated.push(host.to_owned()); -// track_sources.push(TrackSource::Remote(i)) -// } -// Some(track_sources) -// } else { -// None -// }; + let info = info.ok_or(anyhow!("no info"))?; + let tracks = tracks.ok_or(anyhow!("no tracks"))?; -// drop(_permit); + let mut tags = tags + .map(|tags| { + tags.tags + .into_iter() + .flat_map(|t| t.simple_tags) + .map(|st| (st.name, st.string.unwrap_or_default())) + .collect::<HashMap<_, _>>() + }) + .unwrap_or_default(); -// let mut node = Node { -// public: node.clone(), -// private: NodePrivate { -// backdrop: None, -// poster: None, -// id: None, -// source: track_sources, -// }, -// }; -// make_opt_asset_federated(host, &mut node.public.backdrop)?; -// make_opt_asset_federated(host, &mut node.public.poster)?; -// for g in node_ext.people.values_mut() { -// for a in g { -// make_opt_asset_federated(host, &mut a.person.headshot)?; -// } -// } + let filepath_stem = path + .file_stem() + .ok_or(anyhow!("no file stem"))? + .to_string_lossy() + .to_string(); -// debug!("adding {id}"); -// db.add_partial_node(&id, index_path, node.clone())?; -// db.add_partial_node_ext(&id, index_path, node_ext)?; + let slug = infojson + .as_ref() + .map(|ij| ij.id.to_owned()) + .unwrap_or(make_kebab(&filepath_stem)); -// let mut children: FuturesUnordered<_> = node -// .public -// .children -// .iter() -// .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) -// .collect(); + db.update_node_init(NodeID::from_slug(&slug), |node| { + node.slug = slug; + node.title = info.title; + node.poster = cover; + node.description = tags.remove("DESCRIPTION"); + node.tagline = tags.remove("COMMENT"); + if let Some(infojson) = infojson { + node.kind = Some( + if infojson.duration.unwrap_or(0.) < 600. + && infojson.aspect_ratio.unwrap_or(2.) < 1. + { + NodeKind::ShortFormVideo + } else { + NodeKind::Video + }, + ); + node.title = Some(infojson.title); + node.description = Some(infojson.description); + node.tagline = Some(infojson.webpage_url); + node.release_date = Some( + infojson::parse_upload_date(&infojson.upload_date) + .context("parsing upload date")?, + ); + node.ratings + .insert(Rating::YoutubeViews, infojson.view_count as f64); + if let Some(lc) = infojson.like_count { + node.ratings.insert(Rating::YoutubeLikes, lc as f64); + } + } + node.media = Some(MediaInfo { + chapters: chapters + .map(|c| { + let mut chaps = Vec::new(); + if let Some(ee) = c.edition_entries.first() { + for ca in &ee.chapter_atoms { + let mut labels = Vec::new(); + for cd in &ca.displays { + for lang in &cd.languages { + labels.push((lang.to_owned(), cd.string.clone())) + } + } + chaps.push(Chapter { + labels, + time_start: Some(ca.time_start as f64 * 1e-9), + time_end: ca.time_end.map(|ts| ts as f64 * 1e-9), + }) + } + } + chaps + }) + .unwrap_or_default(), + duration: (info.duration.unwrap_or_default() * info.timestamp_scale as f64) * 1e-9, + tracks: tracks + .entries + .into_iter() + .map(|track| SourceTrack { + codec: track.codec_id, + language: track.language, + name: track.name.unwrap_or_default(), + default_duration: track.default_duration, + federated: Vec::new(), + kind: if let Some(video) = track.video { + SourceTrackKind::Video { + width: video.pixel_width, + height: video.pixel_height, + display_width: video.display_width, + display_height: video.display_height, + display_unit: Some(video.display_unit), + fps: video.frame_rate, + } + } else if let Some(audio) = track.audio { + SourceTrackKind::Audio { + channels: audio.channels as usize, + sample_rate: audio.sampling_frequency, + bit_depth: audio.bit_depth.map(|r| r as usize), + } + } else { + SourceTrackKind::Subtitles + }, + source: TrackSource::Local(LocalTrack { + codec_private: track.codec_private, + path: path.to_owned(), + track: track.track_number as usize, + }), + }) + .collect(), + }); -// while let Some(r) = children.next().await { -// r?; -// } + Ok(()) + })?; -// Ok(()) -// } + Ok(()) +} -// pub fn make_opt_asset_federated(host: &str, p: &mut Option<Asset>) -> anyhow::Result<()> { -// if let Some(a) = p { -// make_asset_federated(host, a)? -// } -// Ok(()) -// } -// pub fn make_asset_federated(host: &str, p: &mut Asset) -> anyhow::Result<()> { -// let data = base64::engine::general_purpose::URL_SAFE.decode(&p.0)?; -// *p = AssetInner::Federated { -// host: host.to_owned(), -// asset: data, -// } -// .ser(); -// Ok(()) -// } +fn make_kebab(i: &str) -> String { + let mut o = String::with_capacity(i.len()); + for c in i.chars() { + o.extend(match c { + 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), + ' ' => Some('-'), + _ => None, + }); + } + o +} diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 98bde38..93a5c88 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -29,10 +29,7 @@ use ui::{ settings::{r_account_settings, r_account_settings_post}, }, admin::{ - log::r_admin_log, - r_admin_dashboard, r_admin_delete_cache, r_admin_invite, r_admin_remove_invite, - r_admin_transcode_posters, - user::{r_admin_remove_user, r_admin_user, r_admin_user_permission, r_admin_users}, + log::r_admin_log, r_admin_dashboard, r_admin_delete_cache, r_admin_import, r_admin_invite, r_admin_remove_invite, r_admin_transcode_posters, user::{r_admin_remove_user, r_admin_user, r_admin_user_permission, r_admin_users} }, assets::{r_asset, r_item_backdrop, r_item_poster, r_node_thumbnail, r_person_asset}, browser::r_all_items_filter, @@ -133,6 +130,7 @@ pub fn build_rocket(database: Database, federation: Federation) -> Rocket<Build> r_admin_delete_cache, r_admin_transcode_posters, r_admin_log, + r_admin_import, r_account_settings, r_account_settings_post, r_api_version, diff --git a/server/src/routes/ui/admin/mod.rs b/server/src/routes/ui/admin/mod.rs index 160999b..2993be0 100644 --- a/server/src/routes/ui/admin/mod.rs +++ b/server/src/routes/ui/admin/mod.rs @@ -17,7 +17,8 @@ use crate::{ uri, }; use anyhow::{anyhow, Context}; -use jellybase::CONF; +use jellybase::{federation::Federation, CONF}; +use jellyimport::{import_wrap, is_importing, IMPORT_ERRORS}; use markup::DynRender; use rand::Rng; use rocket::{form::Form, get, post, FromForm, State}; @@ -40,7 +41,7 @@ pub async fn admin_dashboard<'a>( let invites = database.list_invites()?; let flash = flash.map(|f| f.map_err(|e| format!("{e:?}"))); - // let last_import_err = IMPORT_ERRORS.read().await.to_owned(); + let last_import_err = IMPORT_ERRORS.read().await.to_owned(); let database = database.to_owned(); Ok(LayoutPage { @@ -48,28 +49,28 @@ pub async fn admin_dashboard<'a>( content: markup::new! { h1 { "Admin Panel" } @FlashDisplay { flash: flash.clone() } - // @if !last_import_err.is_empty() { - // section.message.error { - // p.error {"The last import resulted in at least one error:"} - // ol { @for e in &last_import_err { - // li.error { pre.error { @e } } - // }} - // } - // } + @if !last_import_err.is_empty() { + section.message.error { + p.error {"The last import resulted in at least one error:"} + ol { @for e in &last_import_err { + li.error { pre.error { @e } } + }} + } + } ul { li{a[href=uri!(r_admin_log(true))] { "Server Log (Warnings only)" }} li{a[href=uri!(r_admin_log(false))] { "Server Log (Full) " }} } h2 { "Library" } - // @if is_importing() { - // section.message { p.warn { "An import is currently running." } } - // } + @if is_importing() { + section.message { p.warn { "An import is currently running." } } + } @if is_transcoding() { section.message { p.warn { "Currently transcoding posters." } } } - // form[method="POST", action=uri!(r_admin_import())] { - // input[type="submit", disabled=is_importing(), value="(Re-)Import Library"]; - // } + form[method="POST", action=uri!(r_admin_import())] { + input[type="submit", disabled=is_importing(), value="(Re-)Import Library"]; + } form[method="POST", action=uri!(r_admin_transcode_posters())] { input[type="submit", disabled=is_transcoding(), value="Transcode all posters with low resolution"]; } @@ -130,24 +131,24 @@ pub async fn r_admin_remove_invite( admin_dashboard(database, Some(Ok("Invite invalidated".into()))).await } -// #[post("/admin/import")] -// pub async fn r_admin_import( -// session: AdminSession, -// database: &State<DataAcid>, -// federation: &State<Federation>, -// ) -> MyResult<DynLayoutPage<'static>> { -// drop(session); -// let t = Instant::now(); -// let r = import(database, federation).await; -// admin_dashboard( -// database, -// Some( -// r.map_err(|e| e.into()) -// .map(|_| format!("Import successful; took {:?}", t.elapsed())), -// ), -// ) -// .await -// } +#[post("/admin/import")] +pub async fn r_admin_import( + session: AdminSession, + database: &State<Database>, + _federation: &State<Federation>, +) -> MyResult<DynLayoutPage<'static>> { + drop(session); + let t = Instant::now(); + let r = import_wrap((*database).clone()).await; + admin_dashboard( + database, + Some( + r.map_err(|e| e.into()) + .map(|_| format!("Import successful; took {:?}", t.elapsed())), + ), + ) + .await +} #[post("/admin/delete_cache")] pub async fn r_admin_delete_cache( diff --git a/server/src/routes/ui/node.rs b/server/src/routes/ui/node.rs index 5cc8a2f..3332483 100644 --- a/server/src/routes/ui/node.rs +++ b/server/src/routes/ui/node.rs @@ -298,6 +298,8 @@ pub fn aspect_class(kind: NodeKind) -> &'static str { pub fn format_duration(mut d: f64) -> String { let mut s = String::new(); + let sign = if d > 0. { "" } else { "-" }; + d = d.abs(); for (unit, k) in [("h", 60. * 60.), ("m", 60.), ("s", 1.)] { let mut h = 0; // TODO dont iterate like that. can be a simple rem and div @@ -309,7 +311,7 @@ pub fn format_duration(mut d: f64) -> String { s += &format!("{h}{unit}") } } - s + format!("{sign}{s}") } pub trait DatabaseNodeUserDataExt { diff --git a/web/script/player/mod.ts b/web/script/player/mod.ts index 093f806..97fd605 100644 --- a/web/script/player/mod.ts +++ b/web/script/player/mod.ts @@ -15,6 +15,7 @@ import { Playersync, playersync_controls } from "./sync.ts" globalThis.addEventListener("DOMContentLoaded", () => { if (document.body.classList.contains("player")) { + if (globalThis.location.search.search("nojsp") != -1) return if (!globalThis.MediaSource) return alert("Media Source Extension API required") const node_id = globalThis.location.pathname.split("/")[2]; const main = document.getElementById("main")!; @@ -160,7 +161,7 @@ function initialize_player(el: HTMLElement, node_id: string) { playersync_controls(sync_state, player), e("button", "Launch Native Player", { onclick: () => { - window.location.href = `?kind=nativefullscreen&t=${player.position.value}` + globalThis.location.href = `?kind=nativefullscreen&t=${player.position.value}` } }) )) @@ -255,7 +256,7 @@ function initialize_player(el: HTMLElement, node_id: string) { else if (k.code == "Space") toggle_playing() else if (k.code == "KeyP") toggle_playing() else if (k.code == "KeyF") toggle_fullscreen() - else if (k.code == "KeyQ") window.history.back() + else if (k.code == "KeyQ") globalThis.history.back() else if (k.code == "KeyS") screenshot_video(player.video) else if (k.code == "KeyJ") step_track_kind("subtitles") else if (k.code == "KeyM") toggle_mute() @@ -289,7 +290,7 @@ function screenshot_video(video: HTMLVideoElement) { if (!blob) throw new Error("failed to create blob"); const a = document.createElement("a"); a.download = "screenshot.webp"; - a.href = window.URL.createObjectURL(blob) + a.href = globalThis.URL.createObjectURL(blob) a.click() setTimeout(() => URL.revokeObjectURL(a.href), 0) }, "image/webp", 0.95) |