From 80e545d06c4a0f0841d4b40e3aff479ef8d864f9 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 22 Dec 2023 15:16:58 +0100 Subject: rework import system pt. 5: local import and playback working again --- common/src/lib.rs | 8 +++--- import/src/lib.rs | 71 ++++++++++++++++++++++++++++++++------------------ remuxer/src/extract.rs | 2 +- remuxer/src/remux.rs | 2 +- remuxer/src/snippet.rs | 27 +++++++++---------- stream/src/hls.rs | 8 +++--- stream/src/jhls.rs | 13 +++++---- stream/src/lib.rs | 52 ++++++++++++++++++++++-------------- stream/src/segment.rs | 11 +++++--- stream/src/webvtt.rs | 7 ++--- 10 files changed, 119 insertions(+), 82 deletions(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index e348d21..73eb860 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -63,14 +63,16 @@ pub struct ImportOptions { #[serde(rename_all = "snake_case")] pub enum ImportSource { Override(Node), - Tmdb(u64), + Tmdb { + id: u64, + }, AutoChildren { path: Option, }, Media { location: AssetLocation, - ignore_attachments: bool, - ignore_metadata: bool, + #[serde(default)] ignore_attachments: bool, + #[serde(default)] ignore_metadata: bool, // TODO all ignore options }, Federated { diff --git a/import/src/lib.rs b/import/src/lib.rs index 6639789..e69e203 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -11,11 +11,15 @@ use anyhow::{anyhow, Context, Ok}; use async_recursion::async_recursion; use futures::{stream::FuturesUnordered, StreamExt}; use jellybase::{ - cache::async_cache_file, database::Database, federation::Federation, AssetLocationExt, CONF, + cache::{async_cache_file, cache_file}, + database::Database, + federation::Federation, + AssetLocationExt, CONF, }; use jellyclient::Session; use jellycommon::{ - AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic, + AssetLocation, AssetRole, ImportOptions, ImportSource, MediaInfo, Node, NodePrivate, + NodePublic, TrackSource, }; use jellymatroska::read::EbmlReader; use jellyremuxer::import::import_metadata; @@ -24,12 +28,12 @@ use std::{ cmp::Ordering, ffi::OsStr, fs::File, - io::BufReader, + io::{BufReader, Write}, os::unix::prelude::OsStrExt, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; -use tokio::{sync::Semaphore, task::spawn_blocking}; +use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); @@ -179,45 +183,62 @@ async fn process_source( }; match s { ImportSource::Override(n) => insert_node(&id, n)?, - ImportSource::Tmdb(_) => todo!(), + ImportSource::Tmdb { id } => { + todo!() + } ImportSource::Media { location, .. } => { // TODO use ignore options let media_path = location.path(); let metadata = spawn_blocking(move || { - let input = BufReader::new(File::open(&media_path).unwrap()); + let input = + BufReader::new(File::open(&location.path()).context("opening media file")?); let mut input = EbmlReader::new(input); import_metadata(&mut input) }) .await??; - // if let Some(cover) = metadata.cover { - // let pu = path.join(format!( - // "cover.{}", - // match mime.as_str() { - // "image/webp" => "webp", - // "image/jpeg" => "jpeg", - // "image/png" => "png", - // _ => { - // warn!("unknown mime, just using webp"); - // "webp" - // } - // } - // )); - // if !pu.exists() { - // let mut f = tokio::fs::File::create(&pu).await?; - // f.write_all(&data).await?; - // } - // } + let poster = if let Some((filename, data)) = metadata.cover { + Some( + async_cache_file( + &[media_path.to_str().unwrap(), &filename], + |mut f| async move { + f.write_all(&data).await?; + Ok(()) + }, + ) + .await?, + ) + } else { + None + }; let node = Node { public: NodePublic { title: metadata.title, description: metadata.description, tagline: metadata.tagline, + media: Some(MediaInfo { + chapters: metadata.chapters, + duration: metadata.duration, + tracks: metadata.tracks, + }), + ..Default::default() + }, + private: NodePrivate { + poster, + source: Some( + metadata + .track_sources + .into_iter() + .map(|mut ts| { + ts.path = media_path.to_owned(); + TrackSource::Local(ts) + }) + .collect(), + ), ..Default::default() }, - private: NodePrivate::default(), }; insert_node(&id, node)?; } diff --git a/remuxer/src/extract.rs b/remuxer/src/extract.rs index a948b49..5ec43bb 100644 --- a/remuxer/src/extract.rs +++ b/remuxer/src/extract.rs @@ -20,7 +20,7 @@ pub fn extract_track( let index = get_seek_index(&source_path)?; let index = index .get(&(track_info.track as u64)) - .ok_or(anyhow!("track missing"))?; + .ok_or(anyhow!("track missing 4"))?; let mut out = Vec::new(); for b in &index.blocks { diff --git a/remuxer/src/remux.rs b/remuxer/src/remux.rs index b705572..637d445 100644 --- a/remuxer/src/remux.rs +++ b/remuxer/src/remux.rs @@ -81,7 +81,7 @@ pub fn remux_stream_into( let index = get_seek_index(&source_path)?; let index = index .get(&(private.track as u64)) - .ok_or(anyhow!("track missing"))? + .ok_or(anyhow!("track missing 3"))? .to_owned(); debug!("\t seek index: {} blocks loaded", index.blocks.len()); let reader = EbmlReader::new(BufReader::new(file)); diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs index cd45f8b..0d5c715 100644 --- a/remuxer/src/snippet.rs +++ b/remuxer/src/snippet.rs @@ -24,16 +24,14 @@ const SNIPPET_LENGTH: f64 = 2.; pub fn snippet_index( path_base: &Path, item: &NodePublic, - track_sources: &Vec, - track: usize, + local_track: &LocalTrack, ) -> Result>> { let media_info = item.media.as_ref().unwrap(); - let private = &track_sources[track]; - let source_path = path_base.join(&private.path); + let source_path = path_base.join(&local_track.path); let index = get_seek_index(&source_path)?; let index = index - .get(&(private.track as u64)) - .ok_or(anyhow!("track missing"))?; + .get(&(local_track.track as u64)) + .ok_or(anyhow!("seek index track missing"))?; let average_kf_interval = media_info.duration / index.keyframes.len() as f64; let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize; debug!("average keyframe interval: {average_kf_interval}"); @@ -57,12 +55,12 @@ pub fn write_snippet_into( writer: impl Write, path_base: &Path, item: &NodePublic, - track_sources: Vec, + local_track: &LocalTrack, track: usize, webm: bool, n: usize, ) -> anyhow::Result<()> { - info!("writing snippet {n} of {:?} (track #{track})", item.title); + info!("writing snippet {n} of {:?} (track {track})", item.title); let mut output = EbmlWriter::new(BufWriter::new(writer), 0); let media_info = item.media.as_ref().unwrap(); @@ -71,19 +69,18 @@ pub fn write_snippet_into( .get(track) .ok_or(anyhow!("track not available"))? .to_owned(); - let private = &track_sources[track]; - let source_path = path_base.join(&private.path); + let source_path = path_base.join(&local_track.path); let mapped = 1; info!( "\t- {track} {source_path:?} ({} => {mapped})", - private.track + local_track.track ); info!("\t {}", info); let file = File::open(&source_path).context("opening source file")?; let index = get_seek_index(&source_path)?; let index = index - .get(&(private.track as u64)) - .ok_or(anyhow!("track missing"))? + .get(&(local_track.track as u64)) + .ok_or(anyhow!("track missing 2"))? .to_owned(); debug!("\t seek index: {} blocks loaded", index.blocks.len()); let mut reader = EbmlReader::new(file); @@ -118,10 +115,10 @@ pub fn write_snippet_into( ))?; output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![ - ebml_track_entry(mapped, &info, private.codec_private.clone()), + ebml_track_entry(mapped, &info, local_track.codec_private.clone()), ])))?; - let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64); + let mut reader = SegmentExtractIter::new(&mut reader, local_track.track as u64); { // TODO this one caused snippets to get dropped MSE for no reason diff --git a/stream/src/hls.rs b/stream/src/hls.rs index ea82fed..74f18b4 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -18,7 +18,7 @@ use tokio::{ pub async fn hls_master_stream( _node: Node, - _track_sources: Vec, + _local_tracks: Vec, spec: StreamSpec, mut b: DuplexStream, ) -> Result<()> { @@ -45,17 +45,15 @@ pub async fn hls_master_stream( pub async fn hls_variant_stream( node: Node, - track_sources: Vec, + local_tracks: Vec, mut spec: StreamSpec, mut b: DuplexStream, ) -> Result<()> { - let track = *spec.tracks.get(0).ok_or(anyhow!("no track"))?; let snips = spawn_blocking(move || { jellyremuxer::snippet::snippet_index( &CONF.library_path, &node.public, - &track_sources, - track, + local_tracks.get(0).ok_or(anyhow!("no track"))?, ) }) .await??; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs index 5fe2cab..600d945 100644 --- a/stream/src/jhls.rs +++ b/stream/src/jhls.rs @@ -3,23 +3,24 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ -use anyhow::Result; +use anyhow::{anyhow, Result}; use jellybase::{permission::PermissionSetExt, CONF}; use jellycommon::{ jhls::{JhlsMetadata, JhlsTrack}, stream::StreamSpec, user::{PermissionSet, UserPermission}, - LocalTrack, Node, + Node, TrackSource, }; use tokio::io::{AsyncWriteExt, DuplexStream}; pub async fn jhls_stream( node: Node, - track_sources: Vec, + track_sources: &[TrackSource], _spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, ) -> Result<()> { + let track_sources = track_sources.to_vec(); let media = node.public.media.clone().unwrap(); let tracks = tokio::task::spawn_blocking(move || { media @@ -38,8 +39,10 @@ pub async fn jhls_stream( match jellyremuxer::snippet::snippet_index( &CONF.library_path, &node.public, - &track_sources, - i, + match &track_sources[i] { + TrackSource::Local(x) => x, + TrackSource::Remote => return Some(Err(anyhow!("das geht nicht"))), + }, ) { Ok(segments) => Some(Ok::<_, anyhow::Error>(JhlsTrack { info: t.to_owned(), diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 1ee0690..ee5c78a 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -15,7 +15,7 @@ use jellybase::{permission::PermissionSetExt, CONF}; use jellycommon::{ stream::{StreamFormat, StreamSpec}, user::{PermissionSet, UserPermission}, - LocalTrack, Node, + LocalTrack, Node, TrackSource, }; use jhls::jhls_stream; use segment::segment_stream; @@ -56,24 +56,38 @@ pub async fn stream( let (a, b) = duplex(4096); // TODO remux of mixed remote and local tracks?! - let track_sources = match node + let track_sources = node .private .source - .as_ref() - .ok_or(anyhow!("node has no media"))? - { - // MediaSource::Local { tracks } => tracks.to_owned(), - _ => bail!("node tracks are not local"), - }; + .to_owned() + .ok_or(anyhow!("node has no media"))?; + + let local_tracks = spec + .tracks + .iter() + .map(|i| { + anyhow::Ok( + match track_sources + .get(*i) + .ok_or(anyhow!("track does not exist"))? + { + TrackSource::Local(t) => t.to_owned(), + TrackSource::Remote => bail!("track is not local"), + }, + ) + }) + .collect::>>()? + .into_iter() + .collect::>(); match spec.format { - StreamFormat::Original => original_stream(track_sources, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, track_sources, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?, - StreamFormat::Jhls => jhls_stream(node, track_sources, spec, b, perms).await?, - StreamFormat::Segment => segment_stream(node, track_sources, spec, b, perms).await?, - StreamFormat::Webvtt => webvtt_stream(node, track_sources, spec, b).await?, + StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, + StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, + StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, + StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, + StreamFormat::Jhls => jhls_stream(node, &track_sources, spec, b, perms).await?, + StreamFormat::Segment => segment_stream(node, local_tracks, spec, b, perms).await?, + StreamFormat::Webvtt => webvtt_stream(node, local_tracks, spec, b).await?, } Ok(a) @@ -81,7 +95,7 @@ pub async fn stream( async fn remux_stream( node: Node, - track_sources: Vec, + local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, @@ -94,7 +108,7 @@ async fn remux_stream( range, CONF.library_path.to_owned(), node.public, - track_sources, + local_tracks, spec.tracks, spec.webm.unwrap_or(false), ) @@ -104,7 +118,7 @@ async fn remux_stream( } async fn original_stream( - track_sources: Vec, + local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, @@ -113,7 +127,7 @@ async fn original_stream( bail!("invalid amout of source \"tracks\". original only allows for exactly one.") } - let source = track_sources[spec.tracks[0]].clone(); + let source = local_tracks[spec.tracks[0]].clone(); let mut file = File::open(CONF.library_path.join(source.path)) .await .context("opening source")?; diff --git a/stream/src/segment.rs b/stream/src/segment.rs index 309da1d..a2553bc 100644 --- a/stream/src/segment.rs +++ b/stream/src/segment.rs @@ -17,7 +17,7 @@ use tokio_util::io::SyncIoBridge; pub async fn segment_stream( node: Node, - track_sources: Vec, + local_tracks: Vec, spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, @@ -28,6 +28,11 @@ pub async fn segment_stream( let track = spec.tracks[0]; let n = spec.index.ok_or(anyhow!("segment index missing"))?; + let local_track = local_tracks + .get(0) + .ok_or(anyhow!("track missing"))? + .to_owned(); + if let Some(profile) = spec.profile { perms.assert(&UserPermission::Transcode)?; let location = transcode( @@ -41,7 +46,7 @@ pub async fn segment_stream( SyncIoBridge::new(b), &CONF.library_path, &node.public, - track_sources, + &local_track, track, false, n, @@ -65,7 +70,7 @@ pub async fn segment_stream( b, &CONF.library_path, &node.public, - track_sources, + &local_track, track, spec.webm.unwrap_or(false), n, diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index faf0cd3..b1eed2f 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncWriteExt, DuplexStream}; pub async fn webvtt_stream( node: Node, - track_sources: Vec, + local_tracks: Vec, spec: StreamSpec, mut b: DuplexStream, ) -> Result<()> { @@ -21,10 +21,7 @@ pub async fn webvtt_stream( // TODO should use snippets too? big films take too long... let tracki = *spec.tracks.get(0).ok_or(anyhow!("no track selected"))?; - let local_track = track_sources - .get(tracki) - .ok_or(anyhow!("track does not exist"))? - .clone(); + let local_track = local_tracks.get(0).ok_or(anyhow!("no tracks"))?.clone(); let track = &node.public.media.unwrap().tracks[tracki]; match track.codec.as_str() { -- cgit v1.2.3-70-g09d2