From f0dbf139d8708194d1ff7e887b1dff48ccc166fa Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 28 Feb 2025 13:52:41 +0100 Subject: spec + break things --- Cargo.lock | 2 + client/src/lib.rs | 3 +- common/src/stream.rs | 120 ++++++++++++++++----------------------- common/src/user.rs | 46 ++++----------- doc/api.md | 13 +++-- stream/Cargo.toml | 2 + stream/src/lib.rs | 157 ++++++++++++++++++++++----------------------------- 7 files changed, 143 insertions(+), 200 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed04edd..9c9b347 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1803,7 +1803,9 @@ name = "jellystream" version = "0.1.0" dependencies = [ "anyhow", + "ebml-struct", "jellybase", + "jellymatroska", "jellyremuxer", "jellytranscoder", "log", diff --git a/client/src/lib.rs b/client/src/lib.rs index 96c39b6..1497e45 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -168,7 +168,8 @@ impl Session { "{}/n/{}/stream?{}&{}", self.instance.base(), id, - stream_spec.to_query(), + todo!(), + // stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 3e227e1..46f6abc 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -1,82 +1,60 @@ -use bincode::{Decode, Encode}; /* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -#[cfg(feature = "rocket")] -use rocket::{FromForm, FromFormField, UriDisplayQuery}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "rocket", derive(FromForm, UriDisplayQuery))] -pub struct StreamSpec { - pub track: Vec, - pub format: StreamFormat, - pub webm: Option, - pub profile: Option, - pub index: Option, +pub enum StreamSpec { + Whep { + track: u64, + seek: u64, + }, + WhepControl { + token: String, + }, + Remux { + track: Vec, + container: StreamContainer, + }, + Original { + track: u64, + }, + HlsSuperMultiVariant { + container: StreamContainer, + }, + HlsMultiVariant { + segment: u64, + container: StreamContainer, + }, + HlsVariant { + segment: u64, + track: u64, + container: StreamContainer, + format: usize, + }, + Info { + segment: Option, + }, + FragmentIndex { + segment: u64, + track: u64, + }, + Fragment { + segment: u64, + track: u64, + index: u64, + container: StreamContainer, + format: usize, + }, } -#[rustfmt::skip] -#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Hash, Encode, Decode)] -#[serde(rename_all = "snake_case")] -#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))] -pub enum StreamFormat { - #[cfg_attr(feature = "rocket", field(value = "original"))] Original, - #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska, - #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster, - #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant, - #[cfg_attr(feature = "rocket", field(value = "jhlsi"))] JhlsIndex, - #[cfg_attr(feature = "rocket", field(value = "frag"))] Fragment, - #[cfg_attr(feature = "rocket", field(value = "webvtt"))] Webvtt, - #[cfg_attr(feature = "rocket", field(value = "jvtt"))] Jvtt, -} - -impl Default for StreamSpec { - fn default() -> Self { - Self { - track: Vec::new(), - format: StreamFormat::Matroska, - webm: Some(true), - profile: None, - index: None, - } - } -} - -impl StreamSpec { - pub fn to_query(&self) -> String { - use std::fmt::Write; - let mut u = String::new(); - write!(u, "format={}", self.format.ident()).unwrap(); - for t in &self.track { - write!(u, "&track={}", t).unwrap(); - } - if let Some(profile) = self.profile { - write!(u, "&profile={profile}").unwrap(); - } - if let Some(index) = self.index { - write!(u, "&index={index}").unwrap(); - } - if let Some(webm) = self.webm { - write!(u, "&webm={webm}").unwrap(); - } - u - } -} - -impl StreamFormat { - pub fn ident(&self) -> &'static str { - match self { - StreamFormat::Jvtt => "jvtt", - StreamFormat::Original => "original", - StreamFormat::Matroska => "matroska", - StreamFormat::HlsMaster => "hlsmaster", - StreamFormat::HlsVariant => "hlsvariant", - StreamFormat::JhlsIndex => "jhlsi", - StreamFormat::Fragment => "frag", - StreamFormat::Webvtt => "webvtt", - } - } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum StreamContainer { + WebM, + Matroska, + WebVTT, + JVTT, } diff --git a/common/src/user.rs b/common/src/user.rs index ef78eca..e0e7a0d 100644 --- a/common/src/user.rs +++ b/common/src/user.rs @@ -3,7 +3,6 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use crate::{stream::StreamFormat, user}; use bincode::{Decode, Encode}; #[cfg(feature = "rocket")] use rocket::{FromFormField, UriDisplayQuery}; @@ -99,7 +98,6 @@ pub enum UserPermission { ManageSelf, AccessNode(String), - StreamFormat(StreamFormat), Transcode, FederatedContent, } @@ -107,33 +105,11 @@ pub enum UserPermission { impl UserPermission { pub const ALL_ENUMERABLE: &'static [UserPermission] = { use UserPermission::*; - &[ - Admin, - Transcode, - ManageSelf, - FederatedContent, - StreamFormat(user::StreamFormat::Original), - ] + &[Admin, Transcode, ManageSelf, FederatedContent] }; pub fn default_value(&self) -> bool { - use user::StreamFormat::*; use UserPermission::*; - matches!( - self, - Transcode - | ManageSelf - | FederatedContent - | StreamFormat( - JhlsIndex - | Jvtt - | HlsMaster - | HlsVariant - | Matroska - | Fragment - | Webvtt - | Original // TODO remove later - ) - ) + matches!(self, Transcode | ManageSelf | FederatedContent) } } @@ -143,15 +119,15 @@ impl Display for UserPermission { UserPermission::ManageSelf => "manage self (password, display name, etc.)".to_string(), UserPermission::FederatedContent => "access to federated content".to_string(), UserPermission::Admin => "administrative rights".to_string(), - UserPermission::StreamFormat(StreamFormat::Original) => { - "downloading the original media".to_string() - } - UserPermission::StreamFormat(StreamFormat::Matroska) => { - "downloading a remuxed WebM/Matroska version of the media ".to_string() - } - UserPermission::StreamFormat(x) => { - format!("downloading media via {x:?}") - } + // UserPermission::StreamFormat(StreamFormat::Original) => { + // "downloading the original media".to_string() + // } + // UserPermission::StreamFormat(StreamFormat::Matroska) => { + // "downloading a remuxed WebM/Matroska version of the media ".to_string() + // } + // UserPermission::StreamFormat(x) => { + // format!("downloading media via {x:?}") + // } UserPermission::Transcode => "transcoding".to_string(), // UserPermission::ManageUsers => format!("management of all users"), // UserPermission::GenerateInvite => format!("inviting new users"), diff --git a/doc/api.md b/doc/api.md index 5713df7..b24fd46 100644 --- a/doc/api.md +++ b/doc/api.md @@ -111,19 +111,19 @@ federation. by the server. - `?whepcontrol&` - WebSocket endpoint for controlling WHEP playback. TODO schema -- `?remux&&` -- `?hlssupermultivariant&` +- `?remux&&` +- `?hlssupermultivariant&` - Returns m3u8/HLS playlist of all known multi-variant playlists, one for each segment. The plylist is updated for live media. -- `?hlsmultivariant&&` +- `?hlsmultivariant&&` - Returns m3u8/HLS playlist of all track formats' variant playlists. -- `?hlsvariant&&&&` +- `?hlsvariant&&&&` - Returns m3u8/HLS playlist of all known fragments of this track format. The playlist is updated for live media. - `?info&` - Returns JSON `SegmentInfo` if a segment index is provided, else `MediaInfo` - `?fragmentindex&&` -- `?fragment&&&&&` +- `?fragment&&&&&` ```ts interface MediaInfo { @@ -147,10 +147,13 @@ interface TrackFormat { bandwidth: number; remux: bool; title?: string; + containers: StreamContainer[]; a_sampling_frequency?: number; a_channels?: number; v_resolution_width?: number; av_bit_depth?: number; } +type FragmentIndex = number[]; +type StreamContainer = "webm" | "matroska" | "webvtt" | "jvtt"; ``` diff --git a/stream/Cargo.toml b/stream/Cargo.toml index 36979c9..21d1650 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" jellybase = { path = "../base", features = ["rocket"] } jellytranscoder = { path = "../transcoder" } jellyremuxer = { path = "../remuxer" } +jellymatroska = { path = "../matroska" } +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" } log = { workspace = true } anyhow = { workspace = true } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 00338c1..6f31e6b 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -9,93 +9,57 @@ pub mod hls; pub mod jhls; pub mod webvtt; -use anyhow::{anyhow, bail, Context, Result}; -use fragment::fragment_stream; -use hls::{hls_master_stream, hls_variant_stream}; -use jellybase::{ - assetfed::AssetInner, - common::{ - stream::{StreamFormat, StreamSpec}, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, TrackSource, - }, - permission::PermissionSetExt, - CONF, +use anyhow::Result; +use ebml_struct::matroska::{Info, Tracks}; +use jellybase::common::{ + stream::{StreamContainer, StreamSpec}, + LocalTrack, MediaInfo, Node, }; -use jhls::jhls_index; -use std::{io::SeekFrom, ops::Range, sync::Arc}; +use jellymatroska::block::LacingType; +use std::{ops::Range, sync::Arc}; use tokio::{ fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, + io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; -use webvtt::vtt_stream; pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } -#[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; - match spec.format { - StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, - StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, - StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, - StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, - StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, + let cons = |ct: &'static str, rs: bool| StreamHead { + content_type: ct, + range_supported: rs, + }; + let container_ct = |x: StreamContainer| match x { + StreamContainer::WebM => "video/webm", + StreamContainer::Matroska => "video/x-matroska", + StreamContainer::WebVTT => "text/vtt", + StreamContainer::JVTT => "application/jellything-vtt+json", + }; + match spec { + StreamSpec::Whep { .. } => cons("application/x-todo", false), + StreamSpec::WhepControl { .. } => cons("application/x-todo", false), + StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), + StreamSpec::Original { .. } => cons("video/x-matroska", true), + StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), + StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), + StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( - node: Arc, + info: Arc, spec: StreamSpec, range: Range, - perms: &PermissionSet, ) -> Result { - perms.assert(&UserPermission::StreamFormat(spec.format))?; - let (a, b) = duplex(4096); - // TODO remux of mixed remote and local tracks?! - let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?; - - let local_tracks = spec - .track - .iter() - .map(|i| { - anyhow::Ok( - match &track_sources - .tracks - .get(*i) - .ok_or(anyhow!("track does not exist"))? - .source - { - TrackSource::Local(t) => AssetInner::deser(&t.0)? - .as_local_track() - .ok_or(anyhow!("asset not a track"))?, - TrackSource::Remote(_) => bail!("track is not local"), - }, - ) - }) - .collect::>>()? - .into_iter() - .collect::>(); - - match spec.format { - StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, - StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, - StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, - StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, - } - Ok(a) } @@ -108,17 +72,17 @@ async fn remux_stream( ) -> Result<()> { let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - jellyremuxer::remux_stream_into( - b, - range, - CONF.media_path.to_owned(), - &node, - local_tracks, - spec.track, - spec.webm.unwrap_or(false), - ) - }); + // tokio::task::spawn_blocking(move || { + // jellyremuxer::remux_stream_into( + // b, + // range, + // CONF.media_path.to_owned(), + // &node, + // local_tracks, + // spec.track, + // spec.webm.unwrap_or(false), + // ) + // }); Ok(()) } @@ -129,19 +93,19 @@ async fn original_stream( range: Range, b: DuplexStream, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - } + // if spec.track.len() != 1 { + // bail!("invalid amout of source \"tracks\". original only allows for exactly one.") + // } - let source = local_tracks[spec.track[0]].clone(); - let mut file = File::open(CONF.media_path.join(source.path)) - .await - .context("opening source")?; - file.seek(SeekFrom::Start(range.start as u64)) - .await - .context("seek source")?; + // let source = local_tracks[spec.track[0]].clone(); + // let mut file = File::open(CONF.media_path.join(source.path)) + // .await + // .context("opening source")?; + // file.seek(SeekFrom::Start(range.start as u64)) + // .await + // .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + // tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } @@ -157,3 +121,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> amount -= size; } } + +// TODO functions to test seekability, get live status and enumate segments +trait MediaSource { + /// Seeks to some position close to, but before, `time` ticks. + fn seek(&mut self, time: u64) -> Result<()>; + /// Retrieve headers (info and tracks) for some segment. + fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; + /// Returns the next block and the current segment index + fn next(&mut self) -> Result>; +} +pub struct AbsBlock { + track: u64, + pts: u64, + keyframe: bool, + lacing: Option, + data: Vec, +} -- cgit v1.2.3-70-g09d2 From b9539ee3afbf1440b8628bf0609dc0e24aed116c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 2 Mar 2025 13:43:59 +0100 Subject: change things --- common/src/stream.rs | 72 +++++++++++++++++++-- server/src/routes/compat/jellyfin/mod.rs | 27 ++++---- stream/src/fragment.rs | 104 +++++++++++++++---------------- stream/src/hls.rs | 42 +++++++++---- stream/src/jhls.rs | 34 +++++----- stream/src/lib.rs | 24 +++++++ stream/src/webvtt.rs | 97 ++++++++++++++++++---------- 7 files changed, 265 insertions(+), 135 deletions(-) diff --git a/common/src/stream.rs b/common/src/stream.rs index 46f6abc..0e8f810 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -4,22 +4,23 @@ Copyright (C) 2025 metamuffin */ use serde::{Deserialize, Serialize}; +use std::fmt::Display; #[derive(Debug, Clone, Deserialize, Serialize)] pub enum StreamSpec { Whep { - track: u64, + track: usize, seek: u64, }, WhepControl { token: String, }, Remux { - track: Vec, + tracks: Vec, container: StreamContainer, }, Original { - track: u64, + track: usize, }, HlsSuperMultiVariant { container: StreamContainer, @@ -30,7 +31,7 @@ pub enum StreamSpec { }, HlsVariant { segment: u64, - track: u64, + track: usize, container: StreamContainer, format: usize, }, @@ -39,11 +40,11 @@ pub enum StreamSpec { }, FragmentIndex { segment: u64, - track: u64, + track: usize, }, Fragment { segment: u64, - track: u64, + track: usize, index: u64, container: StreamContainer, format: usize, @@ -58,3 +59,62 @@ pub enum StreamContainer { WebVTT, JVTT, } + +impl StreamSpec { + pub fn to_query(&self) -> String { + match self { + StreamSpec::Whep { track, seek } => format!("?whep&track={track}&seek={seek}"), + StreamSpec::WhepControl { token } => format!("?whepcontrol&token={token}"), + StreamSpec::Remux { tracks, container } => { + format!( + "?remux&tracks={}&container={container}", + tracks + .iter() + .map(|t| t.to_string()) + .collect::>() + .join(",") + ) + } + StreamSpec::Original { track } => format!("?original&track={track}"), + StreamSpec::HlsSuperMultiVariant { container } => { + format!("?hlssupermultivariant&container={container}") + } + StreamSpec::HlsMultiVariant { segment, container } => { + format!("?hlsmultivariant&segment={segment}&container={container}") + } + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => format!( + "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}" + ), + StreamSpec::Info { + segment: Some(segment), + } => format!("?info&segment={segment}"), + StreamSpec::Info { segment: None } => format!("?info"), + StreamSpec::FragmentIndex { segment, track } => { + format!("?fragmentindex&segment={segment}&track={track}") + } + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => format!("?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"), + } + } +} + +impl Display for StreamContainer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + StreamContainer::WebM => "webm", + StreamContainer::Matroska => "matroska", + StreamContainer::WebVTT => "webvtt", + StreamContainer::JVTT => "jvtt", + }) + } +} diff --git a/server/src/routes/compat/jellyfin/mod.rs b/server/src/routes/compat/jellyfin/mod.rs index ab36a8c..6066760 100644 --- a/server/src/routes/compat/jellyfin/mod.rs +++ b/server/src/routes/compat/jellyfin/mod.rs @@ -21,7 +21,7 @@ use crate::routes::{ use anyhow::{anyhow, Context}; use jellybase::{database::Database, CONF}; use jellycommon::{ - stream::{StreamFormat, StreamSpec}, + stream::{StreamContainer, StreamSpec}, user::{NodeUserData, WatchedState}, MediaInfo, Node, NodeID, NodeKind, SourceTrack, SourceTrackKind, Visibility, }; @@ -446,16 +446,12 @@ pub fn r_jellyfin_video_stream( .get_node_slug(id)? .ok_or(anyhow!("node does not exist"))?; let media = node.media.as_ref().ok_or(anyhow!("node has no media"))?; - Ok(Redirect::temporary(rocket::uri!(r_stream( - id, - StreamSpec { - format: StreamFormat::Matroska, - webm: Some(true), - track: (0..media.tracks.len()).collect(), - index: None, - profile: None, - } - )))) + let params = StreamSpec::Remux { + tracks: (0..media.tracks.len()).collect(), + container: StreamContainer::WebM, + } + .to_query(); + Ok(Redirect::temporary(format!("/n/{id}/stream{params}"))) } #[derive(Deserialize)] @@ -498,9 +494,7 @@ pub fn r_jellyfin_playback_bitratetest(_session: Session, Size: usize) -> Vec JellyfinItem { location_type: node.media.as_ref().map(|_| "FileSystem".to_owned()), play_access: node.media.as_ref().map(|_| "Full".to_owned()), container: node.media.as_ref().map(|_| "webm".to_owned()), - run_time_ticks: node.media.as_ref().map(|m| (m.duration * 10_000_000.) as i64), + run_time_ticks: node + .media + .as_ref() + .map(|m| (m.duration * 10_000_000.) as i64), media_sources: media_source.as_ref().map(|s| vec![s.clone()]), media_streams: media_source.as_ref().map(|s| s.media_streams.clone()), path: node diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e276d29..a34bb8d 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use jellybase::{ common::{ stream::StreamSpec, @@ -25,64 +25,62 @@ pub async fn fragment_stream( spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, + webm: bool, + track: u64, + segment: u64, + index: usize, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.track[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - let local_track = local_tracks .first() .ok_or(anyhow!("track missing"))? .to_owned(); - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; - let location = transcode( - &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &CONF.media_path, - &node, - &local_track, - track, - false, - n, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &CONF.media_path, - &node, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); - } - }); - } + // if let Some(profile) = None { + // perms.assert(&UserPermission::Transcode)?; + // let location = transcode( + // &format!("{track} {index} {:?}", node), // TODO maybe not use the entire source + // CONF.transcoding_profiles + // .get(profile) + // .ok_or(anyhow!("profile out of range"))?, + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &CONF.media_path, + // &node, + // &local_track, + // track as usize, + // false, + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + // let mut output = File::open(location.abs()).await?; + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + b, + &CONF.media_path, + &node, + &local_track, + track as usize, + webm, + index, + ) { + warn!("segment stream error: {err}"); + } + }); + // } Ok(()) } diff --git a/stream/src/hls.rs b/stream/src/hls.rs index dca1036..56edd2d 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Result}; use jellybase::{ common::{ - stream::{StreamFormat, StreamSpec}, + stream::{StreamContainer, StreamSpec}, LocalTrack, Node, SourceTrackKind, }, CONF, @@ -21,7 +21,8 @@ use tokio::{ pub async fn hls_master_stream( node: Arc, _local_tracks: Vec, - _spec: StreamSpec, + segment: u64, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let media = node.media.as_ref().ok_or(anyhow!("no media"))?; @@ -32,10 +33,11 @@ pub async fn hls_master_stream( for (i, t) in media.tracks.iter().enumerate() { let uri = format!( "stream?{}", - StreamSpec { - track: vec![i], - format: StreamFormat::HlsVariant, - ..Default::default() + StreamSpec::HlsVariant { + track: i, + segment, + container, + format: 0 } .to_query() ); @@ -54,14 +56,21 @@ pub async fn hls_master_stream( pub async fn hls_variant_stream( node: Arc, local_tracks: Vec, - mut spec: StreamSpec, + segment: u64, + track: usize, + format: usize, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let track_index = spec.track[0]; let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index) + jellyremuxer::fragment::fragment_index( + &CONF.media_path, + &node, + &local_track, + track as usize, + ) }) .await??; @@ -72,11 +81,20 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - spec.format = StreamFormat::Fragment; for (i, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; - spec.index = Some(i); - writeln!(out, "stream?{}", spec.to_query())?; + writeln!( + out, + "stream?{}", + StreamSpec::Fragment { + segment, + track, + index: i as u64, + container, + format, + } + .to_query() + )?; } writeln!(out, "#EXT-X-ENDLIST")?; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs index b222e39..2a2faec 100644 --- a/stream/src/jhls.rs +++ b/stream/src/jhls.rs @@ -24,24 +24,24 @@ pub async fn jhls_index( mut b: DuplexStream, perms: &PermissionSet, ) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + // let local_track = local_tracks + // .first() + // .ok_or(anyhow!("track missing"))? + // .to_owned(); - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - }) - .await??; + // let fragments = tokio::task::spawn_blocking(move || { + // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) + // }) + // .await??; - let out = serde_json::to_string(&JhlsTrackIndex { - extra_profiles: if perms.check(&UserPermission::Transcode) { - CONF.transcoding_profiles.clone() - } else { - vec![] - }, - fragments, - })?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + // let out = serde_json::to_string(&JhlsTrackIndex { + // extra_profiles: if perms.check(&UserPermission::Transcode) { + // CONF.transcoding_profiles.clone() + // } else { + // vec![] + // }, + // fragments, + // })?; + // tokio::spawn(async move { b.write_all(out.as_bytes()).await }); Ok(()) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 6f31e6b..68b7e44 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -60,6 +60,30 @@ pub async fn stream( ) -> Result { let (a, b) = duplex(4096); + match spec { + StreamSpec::Whep { track, seek } => todo!(), + StreamSpec::WhepControl { token } => todo!(), + StreamSpec::Remux { tracks, container } => todo!(), + StreamSpec::Original { track } => todo!(), + StreamSpec::HlsSuperMultiVariant { container } => todo!(), + StreamSpec::HlsMultiVariant { segment, container } => todo!(), + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => todo!(), + StreamSpec::Info { segment } => todo!(), + StreamSpec::FragmentIndex { segment, track } => todo!(), + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => todo!(), + } + Ok(a) } diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index f78ac2f..fbd6382 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -25,39 +25,72 @@ pub async fn vtt_stream( // TODO should use fragments too? big films take too long... - let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; - let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); - let track = &node.media.as_ref().unwrap().tracks[tracki]; - let cp = local_track.codec_private.clone(); + // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let subtitles = async_cache_memory( - &[ - "vtt", - &format!( - "{} {}", - local_track.path.to_str().unwrap(), - local_track.track - ), - ], - move || async move { - let blocks = tokio::task::spawn_blocking(move || { - extract_track(CONF.media_path.clone(), local_track) - }) - .await??; - let subtitles = parse_subtitles(&track.codec, cp, blocks)?; - Ok(subtitles) - }, - ) - .await?; + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // )spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let output = if json { - serde_json::to_string(subtitles.as_ref())? - } else { - write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) - .context("writing webvtt")? - }; - tokio::task::spawn(async move { - let _ = b.write_all(output.as_bytes()).await; - }); + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // ) + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); Ok(()) } -- cgit v1.2.3-70-g09d2 From 8ee25c9ddd5ba5b6f74f7ec3b212020886e366c1 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 2 Mar 2025 21:01:28 +0100 Subject: a --- common/src/stream.rs | 39 ++++++++++++++- server/src/routes/stream.rs | 111 +++++++++++++++++++++-------------------- server/src/routes/ui/player.rs | 60 ++++++++++------------ 3 files changed, 123 insertions(+), 87 deletions(-) diff --git a/common/src/stream.rs b/common/src/stream.rs index 0e8f810..9a00ce0 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -4,7 +4,7 @@ Copyright (C) 2025 metamuffin */ use serde::{Deserialize, Serialize}; -use std::fmt::Display; +use std::{collections::BTreeMap, fmt::Display, str::FromStr}; #[derive(Debug, Clone, Deserialize, Serialize)] pub enum StreamSpec { @@ -106,6 +106,31 @@ impl StreamSpec { } => format!("?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"), } } + pub fn from_query_kv(query: &BTreeMap) -> Result { + let get_num = |k: &'static str| { + query + .get(k) + .ok_or(k) + .and_then(|a| a.parse().map_err(|_| "invalid number")) + }; + let get_container = || { + query + .get("container") + .ok_or("container") + .and_then(|s| s.parse().map_err(|()| "unknown container")) + }; + if query.contains_key("fragment") { + Ok(Self::Fragment { + segment: get_num("segment")?, + track: get_num("track")? as usize, + index: get_num("index")?, + container: get_container()?, + format: get_num("format")? as usize, + }) + } else { + Err("invalid stream spec") + } + } } impl Display for StreamContainer { @@ -118,3 +143,15 @@ impl Display for StreamContainer { }) } } +impl FromStr for StreamContainer { + type Err = (); + fn from_str(s: &str) -> Result { + Ok(match s { + "webm" => StreamContainer::WebM, + "matroska" => StreamContainer::Matroska, + "webvtt" => StreamContainer::WebVTT, + "jvtt" => StreamContainer::JVTT, + _ => return Err(()), + }) + } +} diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 1fb136c..d65b346 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -21,7 +21,11 @@ use rocket::{ response::{self, Redirect, Responder}, Either, Request, Response, State, }; -use std::{collections::HashSet, ops::Range}; +use std::{ + collections::{BTreeMap, HashSet}, + ops::Range, + sync::Arc, +}; use tokio::io::{duplex, DuplexStream}; #[head("/n/<_id>/stream?")] @@ -46,65 +50,66 @@ pub async fn r_stream( db: &State, id: &str, range: Option, - spec: StreamSpec, + spec: BTreeMap, ) -> Result, MyError> { // TODO perm let node = db .get_node_slug(id)? .ok_or(anyhow!("node does not exist"))?; - let media = node - .media - .as_ref() - .ok_or(anyhow!("item does not contain media"))?; + let media = Arc::new( + node.media + .clone() + .ok_or(anyhow!("item does not contain media"))?, + ); // TODO its unclear how requests with multiple tracks should be handled. - if spec.track.len() == 1 { - let ti = spec.track[0]; - if let TrackSource::Remote(remote_index) = media.tracks[ti].source { - session - .user - .permissions - .assert(&UserPermission::FederatedContent)?; - - let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; - let host = track - .federated - .last() - .ok_or(anyhow!("federation inconsistent"))?; - - let FederationAccount { - password, username, .. - } = SECRETS - .federation - .get(host) - .ok_or(anyhow!("no credentials on the server-side"))?; - - info!("creating session on {host}"); - let instance = federation.get_instance(host)?.to_owned(); - let session = instance - .login(CreateSessionParams { - username: username.to_owned(), - password: password.to_owned(), - expire: Some(60), - drop_permissions: Some(HashSet::from_iter([ - UserPermission::ManageSelf, - UserPermission::Admin, // in case somebody federated the admin :))) - ])), - }) - .await?; - - let uri = session.stream_url( - node.slug.clone().into(), - &StreamSpec { - track: vec![remote_index], - ..spec - }, - ); - info!("federation redirect"); - return Ok(Either::Right(RedirectResponse(uri))); - } - } + // if spec.track.len() == 1 { + // let ti = spec.track[0]; + // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { + // session + // .user + // .permissions + // .assert(&UserPermission::FederatedContent)?; + + // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; + // let host = track + // .federated + // .last() + // .ok_or(anyhow!("federation inconsistent"))?; + + // let FederationAccount { + // password, username, .. + // } = SECRETS + // .federation + // .get(host) + // .ok_or(anyhow!("no credentials on the server-side"))?; + + // info!("creating session on {host}"); + // let instance = federation.get_instance(host)?.to_owned(); + // let session = instance + // .login(CreateSessionParams { + // username: username.to_owned(), + // password: password.to_owned(), + // expire: Some(60), + // drop_permissions: Some(HashSet::from_iter([ + // UserPermission::ManageSelf, + // UserPermission::Admin, // in case somebody federated the admin :))) + // ])), + // }) + // .await?; + + // let uri = session.stream_url( + // node.slug.clone().into(), + // &StreamSpec { + // track: vec![remote_index], + // ..spec + // }, + // ); + // info!("federation redirect"); + // return Ok(Either::Right(RedirectResponse(uri))); + // } + // } info!( "stream request (range={})", @@ -124,7 +129,7 @@ pub async fn r_stream( let head = jellystream::stream_head(&spec); - match jellystream::stream(node, spec, urange, &session.user.permissions).await { + match jellystream::stream(media, spec, urange, &session.user.permissions).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, range, diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs index 2f28f74..aa567ab 100644 --- a/server/src/routes/ui/player.rs +++ b/server/src/routes/ui/player.rs @@ -9,16 +9,14 @@ use super::{ }; use crate::{ database::Database, - routes::{ - stream::rocket_uri_macro_r_stream, - ui::{assets::rocket_uri_macro_r_item_backdrop, error::MyResult, layout::DynLayoutPage}, + routes::ui::{ + assets::rocket_uri_macro_r_item_backdrop, error::MyResult, layout::DynLayoutPage, }, uri, }; use anyhow::anyhow; use jellybase::{permission::PermissionSetExt, CONF}; use jellycommon::{ - stream::{StreamFormat, StreamSpec}, user::{PermissionSet, PlayerKind, UserPermission}, Node, NodeID, SourceTrackKind, TrackID, }; @@ -47,13 +45,15 @@ impl PlayerConfig { fn jellynative_url(action: &str, seek: f64, secret: &str, node: &str, session: &str) -> String { let protocol = if CONF.tls { "https" } else { "http" }; let host = &CONF.hostname; - let stream_url = uri!(r_stream( - node, - StreamSpec { - format: StreamFormat::HlsMaster, - ..Default::default() - } - )); + let stream_url = ""; + // TODO + // uri!(r_stream( + // node, + // StreamSpec { + // format: StreamFormat::HlsMaster, + // ..Default::default() + // } + // )); format!("jellynative://{action}/{secret}/{session}/{seek}/{protocol}://{host}{stream_url}",) } @@ -67,14 +67,6 @@ pub fn r_player( let node = db.get_node(id)?.ok_or(anyhow!("node does not exist"))?; let native_session = |action: &str| { - let perm = [ - UserPermission::StreamFormat(StreamFormat::HlsMaster), - UserPermission::StreamFormat(StreamFormat::HlsVariant), - UserPermission::StreamFormat(StreamFormat::Fragment), - ]; - for perm in &perm { - sess.user.permissions.assert(perm)?; - } Ok(Either::Right(Redirect::temporary(jellynative_url( action, conf.t.unwrap_or(0.), @@ -82,7 +74,7 @@ pub fn r_player( &id.to_string(), &token::create( sess.user.name, - PermissionSet(perm.map(|e| (e, true)).into()), + PermissionSet::default(), // TODO chrono::Duration::hours(24), ), )))) @@ -98,19 +90,20 @@ pub fn r_player( } } - let spec = StreamSpec { - track: None - .into_iter() - .chain(conf.v) - .chain(conf.a) - .chain(conf.s) - .collect::>(), - format: StreamFormat::Matroska, - webm: Some(true), - ..Default::default() - }; + // TODO + // let spec = StreamSpec { + // track: None + // .into_iter() + // .chain(conf.v) + // .chain(conf.a) + // .chain(conf.s) + // .collect::>(), + // format: StreamFormat::Matroska, + // webm: Some(true), + // ..Default::default() + // }; - let playing = !spec.track.is_empty(); + let playing = false; // !spec.track.is_empty(); let conf = player_conf(node.clone(), playing)?; Ok(Either::Left(LayoutPage { @@ -118,7 +111,8 @@ pub fn r_player( class: Some("player"), content: markup::new! { @if playing { - video[src=uri!(r_stream(&node.slug, &spec)), controls, preload="auto"]{} + // TODO + // video[src=uri!(r_stream(&node.slug, &spec)), controls, preload="auto"]{} } else { img.backdrop[src=uri!(r_item_backdrop(&node.slug, Some(2048))).to_string()]; } -- cgit v1.2.3-70-g09d2 From 80343d02e9e29e4bc55d790b491ce0d0c7bff201 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 3 Mar 2025 16:58:59 +0100 Subject: a --- server/src/routes/stream.rs | 8 +++++--- stream/src/lib.rs | 3 ++- transcoder/src/fragment.rs | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index d65b346..4b3d02e 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -28,12 +28,13 @@ use std::{ }; use tokio::io::{duplex, DuplexStream}; -#[head("/n/<_id>/stream?")] +#[head("/n/<_id>/stream?")] pub async fn r_stream_head( _sess: Session, _id: &str, - spec: StreamSpec, + spec: BTreeMap, ) -> Result, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; let head = jellystream::stream_head(&spec); Ok(Either::Left(StreamResponse { stream: duplex(0).0, @@ -52,6 +53,7 @@ pub async fn r_stream( range: Option, spec: BTreeMap, ) -> Result, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; // TODO perm let node = db .get_node_slug(id)? @@ -129,7 +131,7 @@ pub async fn r_stream( let head = jellystream::stream_head(&spec); - match jellystream::stream(media, spec, urange, &session.user.permissions).await { + match jellystream::stream(media, spec, urange).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, range, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 68b7e44..1f32239 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -148,8 +148,9 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> // TODO functions to test seekability, get live status and enumate segments trait MediaSource { + fn loaded_ranges(&self) -> Result>>; /// Seeks to some position close to, but before, `time` ticks. - fn seek(&mut self, time: u64) -> Result<()>; + fn seek(&mut self, segment: u64, time: u64) -> Result<()>; /// Retrieve headers (info and tracks) for some segment. fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; /// Returns the next block and the current segment index diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 8822fa2..ff6a9db 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -17,7 +17,7 @@ use tokio::{ }; // TODO odd video resolutions can cause errors when transcoding to YUV42{0,2} -// TODO with an implementation that cant handle it (SVT-AV1 such an impl). +// TODO with an implementation that cant handle it (SVT-AV1 is such an impl). pub async fn transcode( key: &str, -- cgit v1.2.3-70-g09d2 From 7acb520f552bd1edde5c29fbf5baf6643ec4b14e Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 6 Apr 2025 15:40:58 +0200 Subject: a bit more progress on new streaming api --- Cargo.lock | 3 +- client/src/lib.rs | 5 +- common/src/stream.rs | 47 +++++++++++++++++- import/Cargo.toml | 5 +- import/src/lib.rs | 3 +- import/src/matroska.rs | 116 -------------------------------------------- remuxer/Cargo.toml | 4 ++ remuxer/src/lib.rs | 1 + remuxer/src/metadata.rs | 116 ++++++++++++++++++++++++++++++++++++++++++++ server/src/routes/stream.rs | 25 +++++++--- stream/src/hls.rs | 2 +- stream/src/lib.rs | 113 +++++++++++++++++++++++++++++++++--------- 12 files changed, 282 insertions(+), 158 deletions(-) delete mode 100644 import/src/matroska.rs create mode 100644 remuxer/src/metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 9c9b347..aabeff6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1760,10 +1760,10 @@ dependencies = [ "base64", "bincode", "crossbeam-channel", - "ebml-struct", "futures", "jellybase", "jellyclient", + "jellyremuxer", "log", "rayon", "regex", @@ -1791,6 +1791,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "ebml-struct", "jellybase", "jellymatroska", "log", diff --git a/client/src/lib.rs b/client/src/lib.rs index 1497e45..d3172fd 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -165,11 +165,10 @@ impl Session { pub fn stream_url(&self, id: NodeIDOrSlug, stream_spec: &StreamSpec) -> String { format!( - "{}/n/{}/stream?{}&{}", + "{}/n/{}/stream{}&{}", self.instance.base(), id, - todo!(), - // stream_spec.to_query(), + stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 9a00ce0..a06dad5 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -51,6 +51,47 @@ pub enum StreamSpec { }, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamInfo { + pub name: Option, + pub segments: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamSegmentInfo { + pub name: Option, + pub duration: u64, + pub tracks: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamTrackInfo { + pub name: Option, + pub kind: TrackKind, + pub formats: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TrackKind { + Video, + Audio, + Subtitle, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct StreamFormatInfo { + pub codec: String, + pub byterate: f64, + pub remux: bool, + pub containers: Vec, + + pub pixel_count: Option, + pub samplerate: Option, + pub channels: Option, + pub bit_depth: Option, +} + #[derive(Debug, Clone, Copy, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum StreamContainer { @@ -119,7 +160,11 @@ impl StreamSpec { .ok_or("container") .and_then(|s| s.parse().map_err(|()| "unknown container")) }; - if query.contains_key("fragment") { + if query.contains_key("info") { + Ok(Self::Info { + segment: get_num("segment").ok(), + }) + } else if query.contains_key("fragment") { Ok(Self::Fragment { segment: get_num("segment")?, track: get_num("track")? as usize, diff --git a/import/Cargo.toml b/import/Cargo.toml index 645326d..37b5a77 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -6,10 +6,7 @@ edition = "2021" [dependencies] jellybase = { path = "../base" } jellyclient = { path = "../client" } - -ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ - "bincode", -] } +jellyremuxer = { path = "../remuxer" } rayon = "1.10.0" crossbeam-channel = "0.5.14" diff --git a/import/src/lib.rs b/import/src/lib.rs index 3226a0a..d7f9dd7 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -15,8 +15,8 @@ use jellybase::{ CONF, SECRETS, }; use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility}; +use jellyremuxer::metadata::matroska_metadata; use log::info; -use matroska::matroska_metadata; use rayon::iter::{ParallelBridge, ParallelIterator}; use regex::Regex; use std::{ @@ -36,7 +36,6 @@ use tokio::{ use trakt::Trakt; pub mod infojson; -pub mod matroska; pub mod tmdb; pub mod trakt; diff --git a/import/src/matroska.rs b/import/src/matroska.rs deleted file mode 100644 index 1593463..0000000 --- a/import/src/matroska.rs +++ /dev/null @@ -1,116 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -use anyhow::{Context, Result}; -use bincode::{Decode, Encode}; -use ebml_struct::{ - ids::*, - matroska::*, - read::{EbmlReadExt, TagRead}, -}; -use jellybase::{ - assetfed::AssetInner, - cache::{cache_file, cache_memory}, - common::Asset, -}; -use log::{info, warn}; -use std::{ - fs::File, - io::{BufReader, ErrorKind, Read, Write}, - path::Path, - sync::Arc, -}; - -#[derive(Encode, Decode, Clone)] -pub(crate) struct MatroskaMetadata { - pub info: Option, - pub tracks: Option, - pub cover: Option, - pub chapters: Option, - pub tags: Option, - pub infojson: Option>, -} -pub(crate) fn matroska_metadata(path: &Path) -> Result>> { - cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { - let mut magic = [0; 4]; - File::open(path)?.read_exact(&mut magic).ok(); - if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { - return Ok(None); - } - - info!("reading {path:?}"); - let mut file = BufReader::new(File::open(path)?); - let mut file = file.by_ref().take(u64::MAX); - - let (x, mut ebml) = file.read_tag()?; - assert_eq!(x, EL_EBML); - let ebml = Ebml::read(&mut ebml).unwrap(); - assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); - let (x, mut segment) = file.read_tag()?; - assert_eq!(x, EL_SEGMENT); - - let mut info = None; - let mut infojson = None; - let mut tracks = None; - let mut cover = None; - let mut chapters = None; - let mut tags = None; - loop { - let (x, mut seg) = match segment.read_tag() { - Ok(o) => o, - Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, - Err(e) => return Err(e.into()), - }; - match x { - EL_INFO => info = Some(Info::read(&mut seg).context("info")?), - EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?), - EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?), - EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?), - EL_ATTACHMENTS => { - let attachments = Attachments::read(&mut seg).context("attachments")?; - for f in attachments.files { - match f.name.as_str() { - "info.json" => { - infojson = Some(f.data); - } - "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" - | "cover.avif" => { - cover = Some( - AssetInner::Cache(cache_file( - &["att-cover", path.to_string_lossy().as_ref()], - move |mut file| { - file.write_all(&f.data)?; - Ok(()) - }, - )?) - .ser(), - ) - } - _ => (), - } - } - } - EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => { - seg.consume()?; - } - EL_CLUSTER => { - break; - } - id => { - warn!("unknown top-level element {id:x}"); - seg.consume()?; - } - } - } - Ok(Some(MatroskaMetadata { - chapters, - cover, - info, - infojson, - tags, - tracks, - })) - }) -} diff --git a/remuxer/Cargo.toml b/remuxer/Cargo.toml index 2313dcc..16713df 100644 --- a/remuxer/Cargo.toml +++ b/remuxer/Cargo.toml @@ -13,3 +13,7 @@ log = { workspace = true } serde = { version = "1.0.217", features = ["derive"] } bincode = { version = "2.0.0-rc.3", features = ["serde"] } + +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ + "bincode", +] } diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index a98ffad..cc4b39b 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -9,6 +9,7 @@ pub mod remux; pub mod seek_index; pub mod segment_extractor; pub mod trim_writer; +pub mod metadata; pub use fragment::write_fragment_into; pub use remux::remux_stream_into; diff --git a/remuxer/src/metadata.rs b/remuxer/src/metadata.rs new file mode 100644 index 0000000..4ddad20 --- /dev/null +++ b/remuxer/src/metadata.rs @@ -0,0 +1,116 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +use anyhow::{Context, Result}; +use bincode::{Decode, Encode}; +use ebml_struct::{ + ids::*, + matroska::*, + read::{EbmlReadExt, TagRead}, +}; +use jellybase::{ + assetfed::AssetInner, + cache::{cache_file, cache_memory}, + common::Asset, +}; +use log::{info, warn}; +use std::{ + fs::File, + io::{BufReader, ErrorKind, Read, Write}, + path::Path, + sync::Arc, +}; + +#[derive(Debug, Encode, Decode, Clone)] +pub struct MatroskaMetadata { + pub info: Option, + pub tracks: Option, + pub cover: Option, + pub chapters: Option, + pub tags: Option, + pub infojson: Option>, +} +pub fn matroska_metadata(path: &Path) -> Result>> { + cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { + let mut magic = [0; 4]; + File::open(path)?.read_exact(&mut magic).ok(); + if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { + return Ok(None); + } + + info!("reading {path:?}"); + let mut file = BufReader::new(File::open(path)?); + let mut file = file.by_ref().take(u64::MAX); + + let (x, mut ebml) = file.read_tag()?; + assert_eq!(x, EL_EBML); + let ebml = Ebml::read(&mut ebml).unwrap(); + assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); + let (x, mut segment) = file.read_tag()?; + assert_eq!(x, EL_SEGMENT); + + let mut info = None; + let mut infojson = None; + let mut tracks = None; + let mut cover = None; + let mut chapters = None; + let mut tags = None; + loop { + let (x, mut seg) = match segment.read_tag() { + Ok(o) => o, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + }; + match x { + EL_INFO => info = Some(Info::read(&mut seg).context("info")?), + EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?), + EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?), + EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?), + EL_ATTACHMENTS => { + let attachments = Attachments::read(&mut seg).context("attachments")?; + for f in attachments.files { + match f.name.as_str() { + "info.json" => { + infojson = Some(f.data); + } + "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" + | "cover.avif" => { + cover = Some( + AssetInner::Cache(cache_file( + &["att-cover", path.to_string_lossy().as_ref()], + move |mut file| { + file.write_all(&f.data)?; + Ok(()) + }, + )?) + .ser(), + ) + } + _ => (), + } + } + } + EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => { + seg.consume()?; + } + EL_CLUSTER => { + break; + } + id => { + warn!("unknown top-level element {id:x}"); + seg.consume()?; + } + } + } + Ok(Some(MatroskaMetadata { + chapters, + cover, + info, + infojson, + tags, + tracks, + })) + }) +} diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 4b3d02e..8f97aec 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -6,13 +6,9 @@ use super::ui::{account::session::Session, error::MyError}; use crate::database::Database; use anyhow::{anyhow, Result}; -use jellybase::{federation::Federation, permission::PermissionSetExt, SECRETS}; -use jellycommon::{ - config::FederationAccount, - stream::StreamSpec, - user::{CreateSessionParams, UserPermission}, - TrackSource, -}; +use jellybase::{assetfed::AssetInner, federation::Federation}; +use jellycommon::{stream::StreamSpec, TrackSource}; +use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ get, head, @@ -22,7 +18,7 @@ use rocket::{ Either, Request, Response, State, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; @@ -131,6 +127,19 @@ pub async fn r_stream( let head = jellystream::stream_head(&spec); + let mut sources = BTreeSet::new(); + for t in &media.tracks { + if let TrackSource::Local(x) = &t.source { + if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { + sources.insert(m.path); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + info: node, + }); + match jellystream::stream(media, spec, urange).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 56edd2d..27630b2 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -34,8 +34,8 @@ pub async fn hls_master_stream( let uri = format!( "stream?{}", StreamSpec::HlsVariant { - track: i, segment, + track: i, container, format: 0 } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 1f32239..751ecfa 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -12,17 +12,28 @@ pub mod webvtt; use anyhow::Result; use ebml_struct::matroska::{Info, Tracks}; use jellybase::common::{ - stream::{StreamContainer, StreamSpec}, - LocalTrack, MediaInfo, Node, + stream::{ + StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, + StreamTrackInfo, TrackKind, + }, + LocalTrack, Node, }; use jellymatroska::block::LacingType; -use std::{ops::Range, sync::Arc}; +use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; +use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, + task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; +#[derive(Debug)] +pub struct SMediaInfo { + pub info: Arc, + pub files: BTreeSet, +} + pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, @@ -54,7 +65,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { } pub async fn stream( - info: Arc, + info: Arc, spec: StreamSpec, range: Range, ) -> Result { @@ -73,7 +84,7 @@ pub async fn stream( container, format, } => todo!(), - StreamSpec::Info { segment } => todo!(), + StreamSpec::Info { segment } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => todo!(), StreamSpec::Fragment { segment, @@ -87,6 +98,64 @@ pub async fn stream( Ok(a) } +async fn async_matroska_metadata(path: PathBuf) -> Result>> { + Ok(spawn_blocking(move || matroska_metadata(&path)).await??) +} + +async fn stream_info(info: Arc) -> Result { + let mut metadata = Vec::new(); + for path in &info.files { + metadata.extend((*async_matroska_metadata(path.clone()).await?).clone()); + } + + let mut tracks = Vec::new(); + + for m in &metadata { + if let Some(t) = &m.tracks { + for t in &t.entries { + let mut formats = Vec::new(); + formats.push(StreamFormatInfo { + codec: t.codec_id.to_string(), + remux: true, + byterate: 10., // TODO + containers: [StreamContainer::Matroska].to_vec(), + bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), + samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), + channels: t.audio.as_ref().map(|a| a.channels as usize), + pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height), + ..Default::default() + }); + tracks.push(StreamTrackInfo { + name: None, + kind: match t.track_type { + 1 => TrackKind::Video, + 2 => TrackKind::Audio, + 17 => TrackKind::Subtitle, + _ => todo!(), + }, + formats, + }) + } + } + } + + let segment = StreamSegmentInfo { + name: None, + duration: 0, + tracks, + }; + Ok(StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }) +} + +async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { + let info = stream_info(info).await?; + b.write_all(&serde_json::to_vec(&info)?).await?; + Ok(()) +} + async fn remux_stream( node: Arc, local_tracks: Vec, @@ -146,20 +215,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> } } -// TODO functions to test seekability, get live status and enumate segments -trait MediaSource { - fn loaded_ranges(&self) -> Result>>; - /// Seeks to some position close to, but before, `time` ticks. - fn seek(&mut self, segment: u64, time: u64) -> Result<()>; - /// Retrieve headers (info and tracks) for some segment. - fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; - /// Returns the next block and the current segment index - fn next(&mut self) -> Result>; -} -pub struct AbsBlock { - track: u64, - pts: u64, - keyframe: bool, - lacing: Option, - data: Vec, -} +// // TODO functions to test seekability, get live status and enumate segments +// trait MediaSource { +// fn loaded_range(&self) -> Result>; +// /// Seeks to some position close to, but before, `time` ticks. +// fn seek(&mut self, segment: u64, time: u64) -> Result<()>; +// /// Retrieve headers (info and tracks) for some segment. +// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; +// /// Returns the next block and the current segment index +// fn next(&mut self) -> Result>; +// } +// pub struct AbsBlock { +// track: u64, +// pts: u64, +// keyframe: bool, +// lacing: Option, +// data: Vec, +// } -- cgit v1.2.3-70-g09d2 From 68c579bc4ecbbf65cf4588adf88957de4c3444f9 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 13 Apr 2025 15:57:09 +0200 Subject: original stream --- stream/src/lib.rs | 78 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 751ecfa..59b4960 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -9,8 +9,7 @@ pub mod hls; pub mod jhls; pub mod webvtt; -use anyhow::Result; -use ebml_struct::matroska::{Info, Tracks}; +use anyhow::{anyhow, Context, Result}; use jellybase::common::{ stream::{ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, @@ -18,12 +17,11 @@ use jellybase::common::{ }, LocalTrack, Node, }; -use jellymatroska::block::LacingType; use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; -use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc}; +use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, - io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, + io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; @@ -75,7 +73,7 @@ pub async fn stream( StreamSpec::Whep { track, seek } => todo!(), StreamSpec::WhepControl { token } => todo!(), StreamSpec::Remux { tracks, container } => todo!(), - StreamSpec::Original { track } => todo!(), + StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), StreamSpec::HlsMultiVariant { segment, container } => todo!(), StreamSpec::HlsVariant { @@ -102,15 +100,26 @@ async fn async_matroska_metadata(path: PathBuf) -> Result) -> Result { +struct InternalStreamInfo { + paths: Vec, + metadata: Vec, + track_to_file: Vec, +} + +async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { let mut metadata = Vec::new(); + let mut paths = Vec::new(); for path in &info.files { - metadata.extend((*async_matroska_metadata(path.clone()).await?).clone()); + if let Some(meta) = (*async_matroska_metadata(path.clone()).await?).clone() { + metadata.push(meta); + paths.push(path.clone()); + } } let mut tracks = Vec::new(); + let mut track_to_file = Vec::new(); - for m in &metadata { + for (i, m) in metadata.iter().enumerate() { if let Some(t) = &m.tracks { for t in &t.entries { let mut formats = Vec::new(); @@ -134,7 +143,8 @@ async fn stream_info(info: Arc) -> Result { _ => todo!(), }, formats, - }) + }); + track_to_file.push(i); } } } @@ -144,14 +154,21 @@ async fn stream_info(info: Arc) -> Result { duration: 0, tracks, }; - Ok(StreamInfo { - name: info.info.title.clone(), - segments: vec![segment], - }) + Ok(( + InternalStreamInfo { + metadata, + paths, + track_to_file, + }, + StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }, + )) } async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { - let info = stream_info(info).await?; + let (_, info) = stream_info(info).await?; b.write_all(&serde_json::to_vec(&info)?).await?; Ok(()) } @@ -181,24 +198,25 @@ async fn remux_stream( } async fn original_stream( - local_tracks: Vec, - spec: StreamSpec, + info: Arc, + track: usize, range: Range, b: DuplexStream, ) -> Result<()> { - // if spec.track.len() != 1 { - // bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - // } - - // let source = local_tracks[spec.track[0]].clone(); - // let mut file = File::open(CONF.media_path.join(source.path)) - // .await - // .context("opening source")?; - // file.seek(SeekFrom::Start(range.start as u64)) - // .await - // .context("seek source")?; - - // tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + let (iinfo, _info) = stream_info(info).await?; + + let file_index = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("unknown track"))?; + let mut file = File::open(&iinfo.paths[file_index]) + .await + .context("opening source")?; + file.seek(SeekFrom::Start(range.start as u64)) + .await + .context("seek source")?; + + tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } -- cgit v1.2.3-70-g09d2 From 48a57a52d85d387efe122fb4d9fb113f577a0a98 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 13 Apr 2025 18:19:03 +0200 Subject: arc mkmeta --- import/src/lib.rs | 4 ++-- remuxer/src/metadata.rs | 27 +++++++++++++++++---------- stream/src/lib.rs | 34 +++++++--------------------------- 3 files changed, 26 insertions(+), 39 deletions(-) diff --git a/import/src/lib.rs b/import/src/lib.rs index d7f9dd7..5607450 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -15,7 +15,7 @@ use jellybase::{ CONF, SECRETS, }; use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility}; -use jellyremuxer::metadata::matroska_metadata; +use jellyremuxer::metadata::checked_matroska_metadata; use log::info; use rayon::iter::{ParallelBridge, ParallelIterator}; use regex::Regex; @@ -278,7 +278,7 @@ fn import_media_file( visibility: Visibility, ) -> Result<()> { info!("media file {path:?}"); - let Some(m) = (*matroska_metadata(path)?).to_owned() else { + let Some(m) = (*checked_matroska_metadata(path)?).to_owned() else { return Ok(()); }; let infojson = m diff --git a/remuxer/src/metadata.rs b/remuxer/src/metadata.rs index 4ddad20..c8a5f8f 100644 --- a/remuxer/src/metadata.rs +++ b/remuxer/src/metadata.rs @@ -32,14 +32,21 @@ pub struct MatroskaMetadata { pub tags: Option, pub infojson: Option>, } -pub fn matroska_metadata(path: &Path) -> Result>> { - cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { - let mut magic = [0; 4]; - File::open(path)?.read_exact(&mut magic).ok(); - if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { - return Ok(None); - } - +pub fn checked_matroska_metadata(path: &Path) -> Result>> { + cache_memory( + &["mkmeta-check-v1", path.to_string_lossy().as_ref()], + || { + let mut magic = [0; 4]; + File::open(path)?.read_exact(&mut magic).ok(); + if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { + return Ok(None); + } + Ok(Some((*matroska_metadata(path)?).clone())) + }, + ) +} +pub fn matroska_metadata(path: &Path) -> Result> { + cache_memory(&["mkmeta-v3", path.to_string_lossy().as_ref()], || { info!("reading {path:?}"); let mut file = BufReader::new(File::open(path)?); let mut file = file.by_ref().take(u64::MAX); @@ -104,13 +111,13 @@ pub fn matroska_metadata(path: &Path) -> Result>> { } } } - Ok(Some(MatroskaMetadata { + Ok(MatroskaMetadata { chapters, cover, info, infojson, tags, tracks, - })) + }) }) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 59b4960..d09759f 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -96,24 +96,22 @@ pub async fn stream( Ok(a) } -async fn async_matroska_metadata(path: PathBuf) -> Result>> { +async fn async_matroska_metadata(path: PathBuf) -> Result> { Ok(spawn_blocking(move || matroska_metadata(&path)).await??) } -struct InternalStreamInfo { - paths: Vec, - metadata: Vec, - track_to_file: Vec, +pub(crate) struct InternalStreamInfo { + pub paths: Vec, + pub metadata: Vec>, + pub track_to_file: Vec, } async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { let mut metadata = Vec::new(); let mut paths = Vec::new(); for path in &info.files { - if let Some(meta) = (*async_matroska_metadata(path.clone()).await?).clone() { - metadata.push(meta); - paths.push(path.clone()); - } + metadata.push(async_matroska_metadata(path.clone()).await?); + paths.push(path.clone()); } let mut tracks = Vec::new(); @@ -232,21 +230,3 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> amount -= size; } } - -// // TODO functions to test seekability, get live status and enumate segments -// trait MediaSource { -// fn loaded_range(&self) -> Result>; -// /// Seeks to some position close to, but before, `time` ticks. -// fn seek(&mut self, segment: u64, time: u64) -> Result<()>; -// /// Retrieve headers (info and tracks) for some segment. -// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; -// /// Returns the next block and the current segment index -// fn next(&mut self) -> Result>; -// } -// pub struct AbsBlock { -// track: u64, -// pts: u64, -// keyframe: bool, -// lacing: Option, -// data: Vec, -// } -- cgit v1.2.3-70-g09d2 From a3afc2756a52f7d6fedc928b97c8ff3eb1ade338 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 13:41:42 +0200 Subject: lots of rewriting and removing dumb code --- base/src/assetfed.rs | 7 - base/src/database.rs | 27 +- common/src/lib.rs | 1 - common/src/stream.rs | 53 ++-- import/src/lib.rs | 10 +- remuxer/src/extract.rs | 17 +- remuxer/src/fragment.rs | 101 ++++---- remuxer/src/lib.rs | 63 ++--- remuxer/src/remux.rs | 572 ++++++++++++++++++++--------------------- server/src/routes/ui/player.rs | 22 +- stream/src/fragment.rs | 45 ++-- stream/src/fragment_index.rs | 32 +++ stream/src/hls.rs | 72 +++--- stream/src/jhls.rs | 47 ---- stream/src/lib.rs | 36 ++- stream/src/webvtt.rs | 3 +- 16 files changed, 542 insertions(+), 566 deletions(-) create mode 100644 stream/src/fragment_index.rs delete mode 100644 stream/src/jhls.rs diff --git a/base/src/assetfed.rs b/base/src/assetfed.rs index 575188d..697cacb 100644 --- a/base/src/assetfed.rs +++ b/base/src/assetfed.rs @@ -78,11 +78,4 @@ impl AssetInner { pub fn is_federated(&self) -> bool { matches!(self, Self::Federated { .. }) } - - pub fn as_local_track(self) -> Option { - match self { - AssetInner::LocalTrack(x) => Some(x), - _ => None, - } - } } diff --git a/base/src/database.rs b/base/src/database.rs index 407db29..32f1464 100644 --- a/base/src/database.rs +++ b/base/src/database.rs @@ -14,7 +14,8 @@ use redb::{Durability, ReadableTable, StorageError, TableDefinition}; use std::{ fs::create_dir_all, hash::{DefaultHasher, Hasher}, - path::Path, + path::{Path, PathBuf}, + str::FromStr, sync::{Arc, RwLock}, time::SystemTime, }; @@ -38,6 +39,8 @@ const T_NODE_EXTERNAL_ID: TableDefinition<(&str, &str), [u8; 32]> = TableDefinition::new("node_external_id"); const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime"); const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime"); +const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> = + TableDefinition::new("node_media_paths"); #[derive(Clone)] pub struct Database { @@ -67,6 +70,7 @@ impl Database { txn.open_table(T_NODE_MTIME)?; txn.open_table(T_NODE_CHILDREN)?; txn.open_table(T_NODE_EXTERNAL_ID)?; + txn.open_table(T_NODE_MEDIA_PATHS)?; txn.open_table(T_IMPORT_FILE_MTIME)?; txn.commit()?; } @@ -123,17 +127,20 @@ impl Database { let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?; let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?; let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?; + let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?; t_node.retain(|_, _| false)?; t_node_mtime.retain(|_, _| false)?; t_node_children.retain(|_, _| false)?; t_node_external_id.retain(|_, _| false)?; t_import_file_mtime.retain(|_, _| false)?; + t_node_media_paths.retain(|_, _| false)?; drop(( t_node, t_node_mtime, t_node_children, t_node_external_id, t_import_file_mtime, + t_node_media_paths, )); txn.set_durability(Durability::Eventual); txn.commit()?; @@ -189,6 +196,24 @@ impl Database { txn.commit()?; Ok(()) } + pub fn get_node_media_paths(&self, id: NodeID) -> Result> { + let txn = self.inner.begin_read()?; + let table = txn.open_table(T_NODE_MEDIA_PATHS)?; + let mut paths = Vec::new(); + // TODO fix this + for p in table.range((id.0, "\0")..(id.0, "\x7f"))? { + paths.push(PathBuf::from_str(p?.0.value().1)?); + } + Ok(paths) + } + pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> { + let txn = self.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?; + table.insert((id.0, path.to_str().unwrap()), ())?; + drop(table); + txn.commit()?; + Ok(()) + } pub fn update_node_udata( &self, diff --git a/common/src/lib.rs b/common/src/lib.rs index ce333eb..00f07b6 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -171,7 +171,6 @@ pub type TrackID = usize; pub struct LocalTrack { pub path: PathBuf, pub track: TrackID, - pub codec_private: Option>, } #[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)] diff --git a/common/src/stream.rs b/common/src/stream.rs index a06dad5..75349cc 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -6,10 +6,15 @@ use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, fmt::Display, str::FromStr}; +pub type SegmentNum = usize; +pub type TrackNum = usize; +pub type FormatNum = usize; +pub type IndexNum = usize; + #[derive(Debug, Clone, Deserialize, Serialize)] pub enum StreamSpec { Whep { - track: usize, + track: TrackNum, seek: u64, }, WhepControl { @@ -20,34 +25,34 @@ pub enum StreamSpec { container: StreamContainer, }, Original { - track: usize, + track: TrackNum, }, HlsSuperMultiVariant { container: StreamContainer, }, HlsMultiVariant { - segment: u64, + segment: SegmentNum, container: StreamContainer, }, HlsVariant { - segment: u64, - track: usize, + segment: SegmentNum, + track: TrackNum, container: StreamContainer, - format: usize, + format: FormatNum, }, Info { segment: Option, }, FragmentIndex { - segment: u64, - track: usize, + segment: SegmentNum, + track: TrackNum, }, Fragment { - segment: u64, - track: usize, - index: u64, + segment: SegmentNum, + track: TrackNum, + index: IndexNum, container: StreamContainer, - format: usize, + format: FormatNum, }, } @@ -60,7 +65,7 @@ pub struct StreamInfo { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct StreamSegmentInfo { pub name: Option, - pub duration: u64, + pub duration: f64, pub tracks: Vec, } @@ -92,7 +97,7 @@ pub struct StreamFormatInfo { pub bit_depth: Option, } -#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum StreamContainer { WebM, @@ -164,13 +169,25 @@ impl StreamSpec { Ok(Self::Info { segment: get_num("segment").ok(), }) + } else if query.contains_key("hlsmultivariant") { + Ok(Self::HlsMultiVariant { + segment: get_num("segment")? as SegmentNum, + container: get_container()?, + }) + } else if query.contains_key("hlsvariant") { + Ok(Self::HlsVariant { + segment: get_num("segment")? as SegmentNum, + track: get_num("track")? as TrackNum, + format: get_num("format")? as FormatNum, + container: get_container()?, + }) } else if query.contains_key("fragment") { Ok(Self::Fragment { - segment: get_num("segment")?, - track: get_num("track")? as usize, - index: get_num("index")?, + segment: get_num("segment")? as SegmentNum, + track: get_num("track")? as TrackNum, + format: get_num("format")? as FormatNum, + index: get_num("index")? as IndexNum, container: get_container()?, - format: get_num("format")? as usize, }) } else { Err("invalid stream spec") diff --git a/import/src/lib.rs b/import/src/lib.rs index 5607450..3ea42f1 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -7,14 +7,13 @@ use anyhow::{anyhow, bail, Context, Result}; use infojson::YVideo; use jellybase::{ assetfed::AssetInner, - common::{ - Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, - SourceTrackKind, TrackSource, - }, + common::{Chapter, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind}, database::Database, CONF, SECRETS, }; -use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility}; +use jellyclient::{ + Appearance, LocalTrack, PeopleGroup, TmdbKind, TrackSource, TraktKind, Visibility, +}; use jellyremuxer::metadata::checked_matroska_metadata; use log::info; use rayon::iter::{ParallelBridge, ParallelIterator}; @@ -397,7 +396,6 @@ fn import_media_file( }, source: TrackSource::Local( AssetInner::LocalTrack(LocalTrack { - codec_private: track.codec_private, path: path.to_owned(), track: track.track_number as usize, }) diff --git a/remuxer/src/extract.rs b/remuxer/src/extract.rs index 12e4003..15c1e9d 100644 --- a/remuxer/src/extract.rs +++ b/remuxer/src/extract.rs @@ -5,29 +5,22 @@ */ use crate::seek_index::get_seek_index; use anyhow::{anyhow, bail}; -use jellybase::common::LocalTrack; use jellymatroska::{block::Block, read::EbmlReader, Master, MatroskaTag}; use log::debug; use std::{fs::File, io::BufReader, path::PathBuf}; pub type TrackExtract = Vec<(u64, Option, Vec)>; -pub fn extract_track(path_base: PathBuf, track_info: LocalTrack) -> anyhow::Result { - let source_path = path_base.join(track_info.path); - let file = File::open(&source_path)?; +pub fn extract_track(path: PathBuf, track: u64) -> anyhow::Result { + let file = File::open(&path)?; let mut reader = EbmlReader::new(BufReader::new(file)); - let index = get_seek_index(&source_path)?; - let index = index - .get(&(track_info.track as u64)) - .ok_or(anyhow!("track missing"))?; + let index = get_seek_index(&path)?; + let index = index.get(&track).ok_or(anyhow!("track missing"))?; let mut out = Vec::new(); for b in &index.blocks { reader.seek(b.source_off, MatroskaTag::BlockGroup(Master::Start))?; let (duration, block) = read_group(&mut reader)?; - assert_eq!( - track_info.track, block.track as usize, - "seek index is wrong" - ); + assert_eq!(track, block.track, "seek index is wrong"); out.push((b.pts, duration, block.data)) } Ok(out) diff --git a/remuxer/src/fragment.rs b/remuxer/src/fragment.rs index 9fa68f3..73fe046 100644 --- a/remuxer/src/fragment.rs +++ b/remuxer/src/fragment.rs @@ -5,11 +5,10 @@ */ use crate::{ - ebml_header, ebml_segment_info, ebml_track_entry, seek_index::get_seek_index, - segment_extractor::SegmentExtractIter, + ebml_header, ebml_segment_info, ebml_track_entry, metadata::matroska_metadata, + seek_index::get_seek_index, segment_extractor::SegmentExtractIter, }; use anyhow::{anyhow, Context, Result}; -use jellybase::common::{LocalTrack, Node, SourceTrackKind}; use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; use log::{debug, info}; use std::{ @@ -21,32 +20,33 @@ use std::{ const FRAGMENT_LENGTH: f64 = 2.; -pub fn fragment_index( - path_base: &Path, - item: &Node, - local_track: &LocalTrack, - track_index: usize, -) -> Result>> { - let media_info = item.media.as_ref().unwrap(); - let source_path = path_base.join(&local_track.path); - let index = get_seek_index(&source_path)?; +pub fn fragment_index(path: &Path, track: u64) -> Result>> { + let meta = matroska_metadata(path)?; + let duration = meta.info.as_ref().unwrap().duration.unwrap(); + let force_kf = meta + .as_ref() + .tracks + .as_ref() + .unwrap() + .entries + .iter() + .find(|t| t.track_number == track) + .unwrap() + .track_type + == 17; + + let index = get_seek_index(&path)?; let index = index - .get(&(local_track.track as u64)) + .get(&track) .ok_or(anyhow!("seek index track missing"))?; - // everything is a keyframe (even though nothing is...) - let force_kf = matches!( - media_info.tracks[track_index].kind, - SourceTrackKind::Subtitles { .. } - ); - let n_kf = if force_kf { index.blocks.len() } else { index.keyframes.len() }; - let average_kf_interval = media_info.duration / n_kf as f64; + let average_kf_interval = duration / n_kf as f64; let kf_per_frag = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize; debug!("average keyframe interval: {average_kf_interval}"); debug!(" => keyframes per frag {kf_per_frag}"); @@ -72,7 +72,7 @@ pub fn fragment_index( index.keyframes.get((i + 1) * kf_per_frag).copied() } .map(|i| index.blocks[i].pts as f64 / 1000.) - .unwrap_or(media_info.duration); + .unwrap_or(duration); start..end }) .collect()) @@ -80,45 +80,45 @@ pub fn fragment_index( pub fn write_fragment_into( writer: impl Write, - path_base: &Path, - item: &Node, - local_track: &LocalTrack, - track: usize, + path: &Path, + track: u64, webm: bool, + title: &str, n: usize, ) -> anyhow::Result<()> { - info!("writing fragment {n} of {:?} (track {track})", item.title); - let mut output = EbmlWriter::new(BufWriter::new(writer), 0); - let media_info = item.media.as_ref().unwrap(); - let info = media_info + let meta = matroska_metadata(path)?; + let duration = meta.info.as_ref().unwrap().duration.unwrap(); + let track_meta = meta + .as_ref() .tracks - .get(track) - .ok_or(anyhow!("track not available"))? - .to_owned(); - let source_path = path_base.join(&local_track.path); + .as_ref() + .unwrap() + .entries + .iter() + .find(|t| t.track_number == track) + .unwrap(); + let force_kf = track_meta.track_type == 17; + + info!("writing fragment {n} of {:?} (track {track})", title); + let mut output = EbmlWriter::new(BufWriter::new(writer), 0); let mapped = 1; - info!( - "\t- {track} {source_path:?} ({} => {mapped})", - local_track.track - ); - info!("\t {}", info); - let file = File::open(&source_path).context("opening source file")?; - let index = get_seek_index(&source_path)?; + info!("\t- {track} {path:?} ({} => {mapped})", track); + // info!("\t {}", info); + let file = File::open(&path).context("opening source file")?; + let index = get_seek_index(&path)?; let index = index - .get(&(local_track.track as u64)) + .get(&track) .ok_or(anyhow!("track missing 2"))? .to_owned(); debug!("\t seek index: {} blocks loaded", index.blocks.len()); let mut reader = EbmlReader::new(BufReader::new(file)); - let force_kf = matches!(info.kind, SourceTrackKind::Subtitles { .. }); let n_kf = if force_kf { index.blocks.len() } else { index.keyframes.len() }; - - let average_kf_interval = media_info.duration / n_kf as f64; + let average_kf_interval = duration / n_kf as f64; let kf_per_frag = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize; debug!("average keyframe interval: {average_kf_interval}"); debug!(" => keyframes per frag {kf_per_frag}"); @@ -144,25 +144,20 @@ pub fn write_fragment_into( .blocks .get(end_block_index) .map(|b| b.pts) - .unwrap_or((media_info.duration * 1000.) as u64); + .unwrap_or((duration * 1000.) as u64); output.write_tag(&ebml_header(webm))?; output.write_tag(&MatroskaTag::Segment(Master::Start))?; output.write_tag(&ebml_segment_info( - format!("{}: {info}", item.title.clone().unwrap_or_default()), + title.to_string(), (last_block_pts - start_block.pts) as f64 / 1000., ))?; output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![ - ebml_track_entry( - mapped, - local_track.track as u64 * 100, // TODO something else that is unique to the track - &info, - local_track.codec_private.clone(), - ), + ebml_track_entry(mapped, track_meta), ])))?; reader.seek(start_block.source_off, MatroskaTag::Cluster(Master::Start))?; - let mut reader = SegmentExtractIter::new(&mut reader, local_track.track as u64); + let mut reader = SegmentExtractIter::new(&mut reader, track); { // TODO this one caused fragments to get dropped by MSE for no reason diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index cc4b39b..9ddf7c1 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -5,17 +5,16 @@ */ pub mod extract; pub mod fragment; +pub mod metadata; pub mod remux; pub mod seek_index; pub mod segment_extractor; pub mod trim_writer; -pub mod metadata; +use ebml_struct::matroska::TrackEntry; pub use fragment::write_fragment_into; -pub use remux::remux_stream_into; - -use jellybase::common::{SourceTrack, SourceTrackKind}; use jellymatroska::{Master, MatroskaTag}; +pub use remux::remux_stream_into; pub fn ebml_header(webm: bool) -> MatroskaTag { MatroskaTag::Ebml(Master::Collected(vec![ @@ -42,66 +41,56 @@ pub fn ebml_segment_info(title: String, duration: f64) -> MatroskaTag { ])) } -pub fn ebml_track_entry( - number: u64, - uid: u64, - track: &SourceTrack, - codec_private: Option>, -) -> MatroskaTag { +pub fn ebml_track_entry(number: u64, track: &TrackEntry) -> MatroskaTag { let mut els = vec![ MatroskaTag::TrackNumber(number), - MatroskaTag::TrackUID(uid), MatroskaTag::FlagLacing(track.flag_lacing), MatroskaTag::Language(track.language.clone()), - MatroskaTag::CodecID(track.codec.clone()), + MatroskaTag::CodecID(track.codec_id.clone()), MatroskaTag::CodecDelay(track.codec_delay), MatroskaTag::SeekPreRoll(track.seek_pre_roll), ]; if let Some(d) = &track.default_duration { els.push(MatroskaTag::DefaultDuration(*d)); } - match track.kind { - SourceTrackKind::Video { - width, - height, - display_height, - display_width, - display_unit, - fps, - } => { + match track.track_type { + 1 => { + let video = track.video.as_ref().unwrap(); els.push(MatroskaTag::TrackType(1)); let mut props = vec![ - MatroskaTag::PixelWidth(width), - MatroskaTag::PixelHeight(height), + MatroskaTag::PixelWidth(video.pixel_width), + MatroskaTag::PixelHeight(video.pixel_height), ]; - props.push(MatroskaTag::DisplayWidth(display_width.unwrap_or(width))); - props.push(MatroskaTag::DisplayHeight(display_height.unwrap_or(height))); - props.push(MatroskaTag::DisplayUnit(display_unit)); - if let Some(fps) = fps { + props.push(MatroskaTag::DisplayWidth( + video.display_width.unwrap_or(video.pixel_width), + )); + props.push(MatroskaTag::DisplayHeight( + video.display_height.unwrap_or(video.pixel_height), + )); + props.push(MatroskaTag::DisplayUnit(video.display_unit)); + if let Some(fps) = video.frame_rate { props.push(MatroskaTag::FrameRate(fps)) } els.push(MatroskaTag::Video(Master::Collected(props))) } - SourceTrackKind::Audio { - channels, - sample_rate, - bit_depth, - } => { + 2 => { + let audio = track.audio.as_ref().unwrap(); els.push(MatroskaTag::TrackType(2)); let mut props = vec![ - MatroskaTag::SamplingFrequency(sample_rate), - MatroskaTag::Channels(channels.try_into().unwrap()), + MatroskaTag::SamplingFrequency(audio.sampling_frequency), + MatroskaTag::Channels(audio.channels), ]; - if let Some(bit_depth) = bit_depth { + if let Some(bit_depth) = audio.bit_depth { props.push(MatroskaTag::BitDepth(bit_depth.try_into().unwrap())); } els.push(MatroskaTag::Audio(Master::Collected(props))); } - SourceTrackKind::Subtitles => { + 17 => { els.push(MatroskaTag::TrackType(17)); } + _ => unreachable!(), } - if let Some(d) = &codec_private { + if let Some(d) = &track.codec_private { els.push(MatroskaTag::CodecPrivate(d.clone())); } MatroskaTag::TrackEntry(Master::Collected(els)) diff --git a/remuxer/src/remux.rs b/remuxer/src/remux.rs index 0507f1e..a44c58b 100644 --- a/remuxer/src/remux.rs +++ b/remuxer/src/remux.rs @@ -3,333 +3,311 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use crate::{ - ebml_header, ebml_track_entry, seek_index::get_seek_index, - segment_extractor::SegmentExtractIter, trim_writer::TrimWriter, -}; -use anyhow::{anyhow, Context}; -use jellybase::common::{ - seek_index::{BlockIndex, SeekIndex}, - LocalTrack, Node, SourceTrack, -}; -use jellymatroska::{ - read::EbmlReader, - write::{bad_vint_length, vint_length, EbmlWriter}, - Master, MatroskaTag, -}; -use log::{debug, info, trace, warn}; -use std::{ - fs::File, - io::{BufReader, BufWriter, Seek, SeekFrom, Write}, - ops::Range, - path::PathBuf, - sync::Arc, - time::Instant, -}; +use jellybase::common::Node; +use std::{io::Write, ops::Range, path::PathBuf}; -struct ClusterLayout { - position: usize, - timestamp: u64, - source_offsets: Vec>, - blocks: Vec<(usize, BlockIndex)>, -} +// struct ClusterLayout { +// position: usize, +// timestamp: u64, +// source_offsets: Vec>, +// blocks: Vec<(usize, BlockIndex)>, +// } pub fn remux_stream_into( - writer: impl Write, - range: Range, - path_base: PathBuf, - item: &Node, - track_sources: Vec, - selection: Vec, - webm: bool, + _writer: impl Write, + _range: Range, + _path_base: PathBuf, + _item: &Node, + _selection: Vec, + _webm: bool, ) -> anyhow::Result<()> { - info!("remuxing {:?} to have tracks {selection:?}", item.title); - let writer = TrimWriter::new(BufWriter::new(writer), range.clone()); - let mut output = EbmlWriter::new(writer, 0); + // info!("remuxing {:?} to have tracks {selection:?}", item.title); + // let writer = TrimWriter::new(BufWriter::new(writer), range.clone()); + // let mut output = EbmlWriter::new(writer, 0); - struct ReaderC { - info: SourceTrack, - reader: EbmlReader, - mapped: u64, - index: Arc, - source_track_index: usize, - codec_private: Option>, - layouting_progress_index: usize, - } + // struct ReaderC { + // info: SourceTrack, + // reader: EbmlReader, + // mapped: u64, + // index: Arc, + // source_track_index: usize, + // codec_private: Option>, + // layouting_progress_index: usize, + // } - let timing_cp = Instant::now(); + // let timing_cp = Instant::now(); - let mut inputs = selection - .iter() - .enumerate() - .map(|(index, sel)| { - let info = item - .media - .as_ref() - .unwrap() - .tracks - .get(*sel) - .ok_or(anyhow!("track not available"))? - .to_owned(); - let private = &track_sources[index]; - let source_path = path_base.join(&private.path); - let mapped = index as u64 + 1; - info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); - info!("\t {}", info); - let file = File::open(&source_path).context("opening source file")?; - let index = get_seek_index(&source_path)?; - let index = index - .get(&(private.track as u64)) - .ok_or(anyhow!("track missing 3"))? - .to_owned(); - debug!("\t seek index: {} blocks loaded", index.blocks.len()); - let reader = EbmlReader::new(BufReader::new(file)); - Ok(ReaderC { - index, - reader, - info, - mapped, - source_track_index: private.track, - codec_private: private.codec_private.clone(), - layouting_progress_index: 0, - }) - }) - .collect::>>()?; + // let mut inputs = selection + // .iter() + // .enumerate() + // .map(|(index, sel)| { + // let info = item + // .media + // .as_ref() + // .unwrap() + // .tracks + // .get(*sel) + // .ok_or(anyhow!("track not available"))? + // .to_owned(); + // let source_path = path_base.join(&private.path); + // let mapped = index as u64 + 1; + // info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); + // info!("\t {}", info); + // let file = File::open(&source_path).context("opening source file")?; + // let index = get_seek_index(&source_path)?; + // let index = index + // .get(&(private.track as u64)) + // .ok_or(anyhow!("track missing 3"))? + // .to_owned(); + // debug!("\t seek index: {} blocks loaded", index.blocks.len()); + // let reader = EbmlReader::new(BufReader::new(file)); + // Ok(ReaderC { + // index, + // reader, + // info, + // mapped, + // source_track_index: private.track, + // codec_private: private.codec_private.clone(), + // layouting_progress_index: 0, + // }) + // }) + // .collect::>>()?; - info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); - let timing_cp = Instant::now(); + // info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); + // let timing_cp = Instant::now(); - output.write_tag(&ebml_header(webm))?; + // output.write_tag(&ebml_header(webm))?; - output.write_tag(&MatroskaTag::Segment(Master::Start))?; - let segment_offset = output.position(); + // output.write_tag(&MatroskaTag::Segment(Master::Start))?; + // let segment_offset = output.position(); - output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ - MatroskaTag::TimestampScale(1_000_000), - MatroskaTag::Duration(item.media.as_ref().unwrap().duration * 1000.0), - MatroskaTag::Title(item.title.clone().unwrap_or_default()), - MatroskaTag::MuxingApp("jellyremux".to_string()), - MatroskaTag::WritingApp("jellything".to_string()), - ])))?; + // output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ + // MatroskaTag::TimestampScale(1_000_000), + // MatroskaTag::Duration(item.media.as_ref().unwrap().duration * 1000.0), + // MatroskaTag::Title(item.title.clone().unwrap_or_default()), + // MatroskaTag::MuxingApp("jellyremux".to_string()), + // MatroskaTag::WritingApp("jellything".to_string()), + // ])))?; - let tracks_header = inputs - .iter_mut() - .map(|rc| ebml_track_entry(rc.mapped, rc.mapped, &rc.info, rc.codec_private.take())) - .collect(); - output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; + // let tracks_header = inputs + // .iter_mut() + // .map(|rc| ebml_track_entry(rc.mapped, rc.mapped, &rc.info, rc.codec_private.take())) + // .collect(); + // output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; - let mut segment_layout: Vec = { - let mut cluster_pts = 0; - let mut clusters = vec![]; - let mut cluster = vec![]; - let mut source_offsets = vec![None; inputs.len()]; - let mut gp = 0usize; // cluster position (in the segment) - let mut p = 0usize; // block position (in the cluster) - loop { - let (track, block) = { - let mut best_block = BlockIndex { - pts: u64::MAX, - size: 0, - source_off: 0, - }; - let mut best_track = 0; - for (i, r) in inputs.iter().enumerate() { - if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { - if v.pts < best_block.pts { - best_block = v.to_owned(); - best_track = i; - } - }; - } - (best_track, best_block) - }; - inputs[track].layouting_progress_index += 1; - source_offsets[track].get_or_insert(block.source_off); - if block.pts > cluster_pts + 1_000 { - let cluster_content_size = 1 + 1 // timestamp {tag, size} - + bad_vint_length(cluster_pts) // timestamp tag value - + p; - let cluster_size = 4 // tag length - + vint_length(cluster_content_size as u64) // size varint - + cluster_content_size; - clusters.push(ClusterLayout { - position: gp, // relative to the first cluster - timestamp: cluster_pts, - source_offsets, - blocks: std::mem::take(&mut cluster), - }); + // let mut segment_layout: Vec = { + // let mut cluster_pts = 0; + // let mut clusters = vec![]; + // let mut cluster = vec![]; + // let mut source_offsets = vec![None; inputs.len()]; + // let mut gp = 0usize; // cluster position (in the segment) + // let mut p = 0usize; // block position (in the cluster) + // loop { + // let (track, block) = { + // let mut best_block = BlockIndex { + // pts: u64::MAX, + // size: 0, + // source_off: 0, + // }; + // let mut best_track = 0; + // for (i, r) in inputs.iter().enumerate() { + // if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { + // if v.pts < best_block.pts { + // best_block = v.to_owned(); + // best_track = i; + // } + // }; + // } + // (best_track, best_block) + // }; + // inputs[track].layouting_progress_index += 1; + // source_offsets[track].get_or_insert(block.source_off); + // if block.pts > cluster_pts + 1_000 { + // let cluster_content_size = 1 + 1 // timestamp {tag, size} + // + bad_vint_length(cluster_pts) // timestamp tag value + // + p; + // let cluster_size = 4 // tag length + // + vint_length(cluster_content_size as u64) // size varint + // + cluster_content_size; + // clusters.push(ClusterLayout { + // position: gp, // relative to the first cluster + // timestamp: cluster_pts, + // source_offsets, + // blocks: std::mem::take(&mut cluster), + // }); - cluster_pts = block.pts; - source_offsets = vec![None; inputs.len()]; - gp += cluster_size; - p = 0; - } - if block.pts == u64::MAX { - break; - } + // cluster_pts = block.pts; + // source_offsets = vec![None; inputs.len()]; + // gp += cluster_size; + // p = 0; + // } + // if block.pts == u64::MAX { + // break; + // } - let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} - // TODO does not work, if more than 127 tracks are present - + block.size; // block payload - p += 1; // simpleblock tag - p += vint_length(simpleblock_size as u64); // simpleblock size vint - p += simpleblock_size; + // let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} + // // TODO does not work, if more than 127 tracks are present + // + block.size; // block payload + // p += 1; // simpleblock tag + // p += vint_length(simpleblock_size as u64); // simpleblock size vint + // p += simpleblock_size; - cluster.push((track, block)) - } - info!("segment layout computed ({} clusters)", clusters.len()); - clusters - }; - info!( - "(perf) compute segment layout: {:?}", - Instant::now() - timing_cp - ); - let timing_cp = Instant::now(); + // cluster.push((track, block)) + // } + // info!("segment layout computed ({} clusters)", clusters.len()); + // clusters + // }; + // info!( + // "(perf) compute segment layout: {:?}", + // Instant::now() - timing_cp + // ); + // let timing_cp = Instant::now(); - let max_cue_size = 4 // cues id - + 8 // cues len - + ( // cues content - 1 // cp id - + 1 // cp len - + ( // cp content - 1 // ctime id, - + 1 // ctime len - + 8 // ctime content uint - + ( // ctps - 1 // ctp id - + 8 // ctp len - + (// ctp content - 1 // ctrack id - + 1 // ctrack size - + 1 // ctrack content int - // TODO this breaks if inputs.len() >= 127 - + 1 // ccp id - + 1 // ccp len - + 8 // ccp content offset - ) - ) - ) * inputs.len() - ) * segment_layout.len() - + 1 // void id - + 8; // void len + // let max_cue_size = 4 // cues id + // + 8 // cues len + // + ( // cues content + // 1 // cp id + // + 1 // cp len + // + ( // cp content + // 1 // ctime id, + // + 1 // ctime len + // + 8 // ctime content uint + // + ( // ctps + // 1 // ctp id + // + 8 // ctp len + // + (// ctp content + // 1 // ctrack id + // + 1 // ctrack size + // + 1 // ctrack content int + // // TODO this breaks if inputs.len() >= 127 + // + 1 // ccp id + // + 1 // ccp len + // + 8 // ccp content offset + // ) + // ) + // ) * inputs.len() + // ) * segment_layout.len() + // + 1 // void id + // + 8; // void len - let first_cluster_offset_predict = max_cue_size + output.position(); + // let first_cluster_offset_predict = max_cue_size + output.position(); - // make the cluster position relative to the segment start as they should - segment_layout - .iter_mut() - .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); + // // make the cluster position relative to the segment start as they should + // segment_layout + // .iter_mut() + // .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); - output.write_tag(&MatroskaTag::Cues(Master::Collected( - segment_layout - .iter() - .map(|cluster| { - MatroskaTag::CuePoint(Master::Collected( - Some(MatroskaTag::CueTime(cluster.timestamp)) - .into_iter() - // TODO: Subtitles should not have cues for every cluster - .chain(inputs.iter().map(|i| { - MatroskaTag::CueTrackPositions(Master::Collected(vec![ - MatroskaTag::CueTrack(i.mapped), - MatroskaTag::CueClusterPosition(cluster.position as u64), - ])) - })) - .collect(), - )) - }) - .collect(), - )))?; - output.write_padding(first_cluster_offset_predict)?; - let first_cluster_offset = output.position(); - assert_eq!(first_cluster_offset, first_cluster_offset_predict); + // output.write_tag(&MatroskaTag::Cues(Master::Collected( + // segment_layout + // .iter() + // .map(|cluster| { + // MatroskaTag::CuePoint(Master::Collected( + // Some(MatroskaTag::CueTime(cluster.timestamp)) + // .into_iter() + // // TODO: Subtitles should not have cues for every cluster + // .chain(inputs.iter().map(|i| { + // MatroskaTag::CueTrackPositions(Master::Collected(vec![ + // MatroskaTag::CueTrack(i.mapped), + // MatroskaTag::CueClusterPosition(cluster.position as u64), + // ])) + // })) + // .collect(), + // )) + // }) + // .collect(), + // )))?; + // output.write_padding(first_cluster_offset_predict)?; + // let first_cluster_offset = output.position(); + // assert_eq!(first_cluster_offset, first_cluster_offset_predict); - let mut skip = 0; - // TODO binary search - for (i, cluster) in segment_layout.iter().enumerate() { - if (cluster.position + segment_offset) >= range.start { - break; - } - skip = i; - } + // let mut skip = 0; + // // TODO binary search + // for (i, cluster) in segment_layout.iter().enumerate() { + // if (cluster.position + segment_offset) >= range.start { + // break; + // } + // skip = i; + // } - if skip != 0 { - info!("skipping {skip} clusters"); - output.seek(SeekFrom::Start( - (segment_layout[skip].position + segment_offset) as u64, - ))?; - } + // if skip != 0 { + // info!("skipping {skip} clusters"); + // output.seek(SeekFrom::Start( + // (segment_layout[skip].position + segment_offset) as u64, + // ))?; + // } - struct ReaderD<'a> { - stream: SegmentExtractIter<'a>, - mapped: u64, - } + // struct ReaderD<'a> { + // stream: SegmentExtractIter<'a>, + // mapped: u64, + // } - let mut track_readers = inputs - .iter_mut() - .enumerate() - .map(|(i, inp)| { - inp.reader - .seek( - // the seek target might be a hole; we continue until the next cluster of that track. - // this should be fine since tracks are only read according to segment_layout - find_first_cluster_with_off(&segment_layout, skip, i) - .ok_or(anyhow!("cluster hole at eof"))?, - MatroskaTag::Cluster(Master::Start), // TODO shouldn't this be a child of cluster? - ) - .context("seeking in input")?; - let stream = SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); + // let mut track_readers = inputs + // .iter_mut() + // .enumerate() + // .map(|(i, inp)| { + // inp.reader + // .seek( + // // the seek target might be a hole; we continue until the next cluster of that track. + // // this should be fine since tracks are only read according to segment_layout + // find_first_cluster_with_off(&segment_layout, skip, i) + // .ok_or(anyhow!("cluster hole at eof"))?, + // MatroskaTag::Cluster(Master::Start), // TODO shouldn't this be a child of cluster? + // ) + // .context("seeking in input")?; + // let stream = SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); - Ok(ReaderD { - mapped: inp.mapped, - stream, - }) - }) - .collect::>>()?; + // Ok(ReaderD { + // mapped: inp.mapped, + // stream, + // }) + // }) + // .collect::>>()?; - info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); + // info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); - for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { - debug!( - "writing cluster {cluster_index} (pts_base={}) with {} blocks", - cluster.timestamp, - cluster.blocks.len() - ); - { - let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64; - if cue_error != 0 { - warn!("calculation was {} bytes off", cue_error); - } - } + // for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { + // debug!( + // "writing cluster {cluster_index} (pts_base={}) with {} blocks", + // cluster.timestamp, + // cluster.blocks.len() + // ); + // { + // let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64; + // if cue_error != 0 { + // warn!("calculation was {} bytes off", cue_error); + // } + // } - let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; - for (block_track, index_block) in cluster.blocks { - let track_reader = &mut track_readers[block_track]; - // TODO handle duration - let mut block = track_reader.stream.next_block()?.0; + // let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; + // for (block_track, index_block) in cluster.blocks { + // let track_reader = &mut track_readers[block_track]; + // // TODO handle duration + // let mut block = track_reader.stream.next_block()?.0; - assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); + // assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); - block.track = track_reader.mapped; - block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); - trace!("n={} tso={}", block.track, block.timestamp_off); + // block.track = track_reader.mapped; + // block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); + // trace!("n={} tso={}", block.track, block.timestamp_off); - cluster_blocks.push(MatroskaTag::SimpleBlock(block)) - } - output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; - } - // output.write_tag(&MatroskaTag::Segment(Master::End))?; - Ok(()) + // cluster_blocks.push(MatroskaTag::SimpleBlock(block)) + // } + // output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; + // } + // // output.write_tag(&MatroskaTag::Segment(Master::End))?; + // Ok(()) + todo!() } -fn find_first_cluster_with_off( - segment_layout: &[ClusterLayout], - skip: usize, - track: usize, -) -> Option { - for cluster in segment_layout.iter().skip(skip) { - if let Some(off) = cluster.source_offsets[track] { - return Some(off); - } - } - None -} +// fn find_first_cluster_with_off( +// segment_layout: &[ClusterLayout], +// skip: usize, +// track: usize, +// ) -> Option { +// for cluster in segment_layout.iter().skip(skip) { +// if let Some(off) = cluster.source_offsets[track] { +// return Some(off); +// } +// } +// None +// } diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs index aa567ab..2cc2dd4 100644 --- a/server/src/routes/ui/player.rs +++ b/server/src/routes/ui/player.rs @@ -15,9 +15,10 @@ use crate::{ uri, }; use anyhow::anyhow; -use jellybase::{permission::PermissionSetExt, CONF}; +use jellybase::CONF; use jellycommon::{ - user::{PermissionSet, PlayerKind, UserPermission}, + stream::{StreamContainer, StreamSpec}, + user::{PermissionSet, PlayerKind}, Node, NodeID, SourceTrackKind, TrackID, }; use markup::DynRender; @@ -45,15 +46,14 @@ impl PlayerConfig { fn jellynative_url(action: &str, seek: f64, secret: &str, node: &str, session: &str) -> String { let protocol = if CONF.tls { "https" } else { "http" }; let host = &CONF.hostname; - let stream_url = ""; - // TODO - // uri!(r_stream( - // node, - // StreamSpec { - // format: StreamFormat::HlsMaster, - // ..Default::default() - // } - // )); + let stream_url = format!( + "/n/{node}/stream{}", + StreamSpec::HlsMultiVariant { + segment: 0, + container: StreamContainer::Matroska + } + .to_query() + ); format!("jellynative://{action}/{secret}/{session}/{seek}/{protocol}://{host}{stream_url}",) } diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index a34bb8d..52d32f4 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,37 +3,29 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; -use jellytranscoder::fragment::transcode; +use jellybase::common::stream::StreamContainer; use log::warn; use std::sync::Arc; -use tokio::{fs::File, io::DuplexStream}; +use tokio::io::DuplexStream; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( - node: Arc, - local_tracks: Vec, - spec: StreamSpec, mut b: DuplexStream, - perms: &PermissionSet, - webm: bool, - track: u64, - segment: u64, + info: Arc, + track: usize, + segment: usize, index: usize, + format: usize, + container: StreamContainer, ) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let path = iinfo.paths[file_index].clone(); // if let Some(profile) = None { // perms.assert(&UserPermission::Transcode)?; @@ -70,11 +62,10 @@ pub async fn fragment_stream( tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( b, - &CONF.media_path, - &node, - &local_track, - track as usize, - webm, + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), index, ) { warn!("segment stream error: {err}"); diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs new file mode 100644 index 0000000..6fbddc6 --- /dev/null +++ b/stream/src/fragment_index.rs @@ -0,0 +1,32 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +use crate::{stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellybase::common::stream::{SegmentNum, TrackNum}; +use std::sync::Arc; +use tokio::io::{AsyncWriteExt, DuplexStream}; + +pub async fn fragment_index_stream( + mut b: DuplexStream, + info: Arc, + _segment: SegmentNum, + track: TrackNum, +) -> Result<()> { + let (iinfo, _info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + + let fragments = tokio::task::spawn_blocking(move || { + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) + }) + .await??; + + let out = serde_json::to_string(&fragments)?; + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 27630b2..f06ac72 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -4,13 +4,10 @@ Copyright (C) 2025 metamuffin */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - stream::{StreamContainer, StreamSpec}, - LocalTrack, Node, SourceTrackKind, - }, - CONF, +use jellybase::common::stream::{ + FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum, }; use std::{fmt::Write, ops::Range, sync::Arc}; use tokio::{ @@ -19,20 +16,24 @@ use tokio::{ }; pub async fn hls_master_stream( - node: Arc, - _local_tracks: Vec, - segment: u64, - container: StreamContainer, mut b: DuplexStream, + info: Arc, + segment: SegmentNum, + container: StreamContainer, ) -> Result<()> { - let media = node.media.as_ref().ok_or(anyhow!("no media"))?; + let (_iinfo, info) = stream_info(info).await?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-VERSION:4")?; // writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; - for (i, t) in media.tracks.iter().enumerate() { + for (i, t) in seg.tracks.iter().enumerate() { let uri = format!( - "stream?{}", + "stream{}", StreamSpec::HlsVariant { segment, track: i, @@ -42,10 +43,11 @@ pub async fn hls_master_stream( .to_query() ); let r#type = match t.kind { - SourceTrackKind::Video { .. } => "VIDEO", - SourceTrackKind::Audio { .. } => "AUDIO", - SourceTrackKind::Subtitles => "SUBTITLES", + TrackKind::Video => "VIDEO", + TrackKind::Audio => "AUDIO", + TrackKind::Subtitle => "SUBTITLES", }; + // TODO bw writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?; writeln!(out, "{uri}")?; } @@ -54,42 +56,44 @@ pub async fn hls_master_stream( } pub async fn hls_variant_stream( - node: Arc, - local_tracks: Vec, - segment: u64, - track: usize, - format: usize, - container: StreamContainer, mut b: DuplexStream, + info: Arc, + segment: SegmentNum, + track: TrackNum, + format: FormatNum, + container: StreamContainer, ) -> Result<()> { - let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index( - &CONF.media_path, - &node, - &local_track, - track as usize, - ) + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) }) .await??; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?; - writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?; + writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?; writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - for (i, Range { start, end }) in frags.iter().enumerate() { + for (index, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; writeln!( out, - "stream?{}", + "stream{}", StreamSpec::Fragment { segment, track, - index: i as u64, + index, container, format, } diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs deleted file mode 100644 index 2a2faec..0000000 --- a/stream/src/jhls.rs +++ /dev/null @@ -1,47 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - jhls::JhlsTrackIndex, - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; -use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; - -pub async fn jhls_index( - node: Arc, - local_tracks: &[LocalTrack], - spec: StreamSpec, - mut b: DuplexStream, - perms: &PermissionSet, -) -> Result<()> { - // let local_track = local_tracks - // .first() - // .ok_or(anyhow!("track missing"))? - // .to_owned(); - - // let fragments = tokio::task::spawn_blocking(move || { - // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - // }) - // .await??; - - // let out = serde_json::to_string(&JhlsTrackIndex { - // extra_profiles: if perms.check(&UserPermission::Transcode) { - // CONF.transcoding_profiles.clone() - // } else { - // vec![] - // }, - // fragments, - // })?; - // tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) -} diff --git a/stream/src/lib.rs b/stream/src/lib.rs index d09759f..a6faf54 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -5,17 +5,20 @@ */ #![feature(iterator_try_collect)] pub mod fragment; +pub mod fragment_index; pub mod hls; -pub mod jhls; pub mod webvtt; use anyhow::{anyhow, Context, Result}; +use fragment::fragment_stream; +use fragment_index::fragment_index_stream; +use hls::{hls_master_stream, hls_variant_stream}; use jellybase::common::{ stream::{ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, StreamTrackInfo, TrackKind, }, - LocalTrack, Node, + Node, }; use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; @@ -75,22 +78,26 @@ pub async fn stream( StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), - StreamSpec::HlsMultiVariant { segment, container } => todo!(), + StreamSpec::HlsMultiVariant { segment, container } => { + hls_master_stream(b, info, segment, container).await? + } StreamSpec::HlsVariant { segment, track, container, format, - } => todo!(), - StreamSpec::Info { segment } => write_stream_info(info, b).await?, - StreamSpec::FragmentIndex { segment, track } => todo!(), + } => hls_variant_stream(b, info, segment, track, format, container).await?, + StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, + StreamSpec::FragmentIndex { segment, track } => { + fragment_index_stream(b, info, segment, track).await? + } StreamSpec::Fragment { segment, track, index, container, format, - } => todo!(), + } => fragment_stream(b, info, track, segment, index, format, container).await?, } Ok(a) @@ -103,7 +110,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result> pub(crate) struct InternalStreamInfo { pub paths: Vec, pub metadata: Vec>, - pub track_to_file: Vec, + pub track_to_file: Vec<(usize, u64)>, } async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { @@ -142,14 +149,19 @@ async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, Strea }, formats, }); - track_to_file.push(i); + track_to_file.push((i, t.track_number)); } } } let segment = StreamSegmentInfo { name: None, - duration: 0, + duration: metadata[0] + .info + .as_ref() + .unwrap() + .duration + .unwrap_or_default(), tracks, }; Ok(( @@ -173,7 +185,6 @@ async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result async fn remux_stream( node: Arc, - local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, @@ -202,8 +213,7 @@ async fn original_stream( b: DuplexStream, ) -> Result<()> { let (iinfo, _info) = stream_info(info).await?; - - let file_index = *iinfo + let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index fbd6382..960849c 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, Context, Result}; use jellybase::{ cache::async_cache_memory, - common::{stream::StreamSpec, LocalTrack, Node}, + common::{stream::StreamSpec, Node}, CONF, }; use jellyremuxer::extract::extract_track; @@ -17,7 +17,6 @@ use tokio::io::{AsyncWriteExt, DuplexStream}; pub async fn vtt_stream( json: bool, node: Arc, - local_tracks: Vec, spec: StreamSpec, mut b: DuplexStream, ) -> Result<()> { -- cgit v1.2.3-70-g09d2 From 92b119f95dd1cb24054f2440533208c140b66e46 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 13:52:16 +0200 Subject: fix duration from mkmeta in fragment --- remuxer/src/fragment.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/remuxer/src/fragment.rs b/remuxer/src/fragment.rs index 73fe046..0da1ed5 100644 --- a/remuxer/src/fragment.rs +++ b/remuxer/src/fragment.rs @@ -5,8 +5,10 @@ */ use crate::{ - ebml_header, ebml_segment_info, ebml_track_entry, metadata::matroska_metadata, - seek_index::get_seek_index, segment_extractor::SegmentExtractIter, + ebml_header, ebml_segment_info, ebml_track_entry, + metadata::{matroska_metadata, MatroskaMetadata}, + seek_index::get_seek_index, + segment_extractor::SegmentExtractIter, }; use anyhow::{anyhow, Context, Result}; use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; @@ -18,11 +20,11 @@ use std::{ path::Path, }; -const FRAGMENT_LENGTH: f64 = 2.; +const FRAGMENT_LENGTH: f64 = 5.; pub fn fragment_index(path: &Path, track: u64) -> Result>> { let meta = matroska_metadata(path)?; - let duration = meta.info.as_ref().unwrap().duration.unwrap(); + let duration = media_duration(&meta); let force_kf = meta .as_ref() .tracks @@ -87,7 +89,7 @@ pub fn write_fragment_into( n: usize, ) -> anyhow::Result<()> { let meta = matroska_metadata(path)?; - let duration = meta.info.as_ref().unwrap().duration.unwrap(); + let duration = media_duration(&meta); let track_meta = meta .as_ref() .tracks @@ -118,6 +120,7 @@ pub fn write_fragment_into( } else { index.keyframes.len() }; + debug!("{duration} {n_kf}"); let average_kf_interval = duration / n_kf as f64; let kf_per_frag = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize; debug!("average keyframe interval: {average_kf_interval}"); @@ -209,3 +212,8 @@ pub fn write_fragment_into( debug!("wrote {} bytes", output.position()); Ok(()) } + +fn media_duration(m: &MatroskaMetadata) -> f64 { + let info = m.info.as_ref().unwrap(); + (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000. +} -- cgit v1.2.3-70-g09d2 From 4a36d9e96853bf04d17f8377a7fbf862d108b9f1 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 14:33:52 +0200 Subject: start transcoding refactor --- common/src/config.rs | 61 ++++++++++------------------ common/src/jhls.rs | 27 ------------- common/src/stream.rs | 3 +- stream/src/fragment.rs | 99 ++++++++++++++++++++++++---------------------- stream/src/lib.rs | 3 +- transcoder/src/fragment.rs | 94 +++++++++++++++++++++---------------------- 6 files changed, 124 insertions(+), 163 deletions(-) diff --git a/common/src/config.rs b/common/src/config.rs index d7682df..a0dc459 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -4,7 +4,7 @@ Copyright (C) 2025 metamuffin */ -use crate::{jhls::EncodingProfile, user::PermissionSet}; +use crate::user::PermissionSet; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, path::PathBuf}; @@ -20,11 +20,30 @@ pub struct GlobalConfig { #[serde(default = "default::cache_path")] pub cache_path: PathBuf, #[serde(default = "default::media_path")] pub media_path: PathBuf, #[serde(default = "default::secrets_path")] pub secrets_path: PathBuf, - #[serde(default = "default::transcoding_profiles")] pub transcoding_profiles: Vec, #[serde(default = "default::max_in_memory_cache_size")] pub max_in_memory_cache_size: usize, #[serde(default)] pub admin_username: Option, #[serde(default = "default::login_expire")] pub login_expire: i64, #[serde(default)] pub default_permission_set: PermissionSet, + #[serde(default)] encoders: EncoderPreferences, +} + +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct EncoderPreferences { + avc: Option, + hevc: Option, + vp8: Option, + vp9: Option, + av1: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +enum EncoderClass { + Aom, + Svt, + X26n, + Vpx, + Vaapi, + Rkmpp, } #[derive(Serialize, Deserialize, Debug, Default)] @@ -59,7 +78,6 @@ pub struct ApiSecrets { } mod default { - use crate::jhls::EncodingProfile; use std::path::PathBuf; pub fn login_expire() -> i64 { @@ -83,43 +101,6 @@ mod default { pub fn max_in_memory_cache_size() -> usize { 50_000_000 } - pub fn transcoding_profiles() -> Vec { - vec![ - EncodingProfile::Video { - codec: "libsvtav1".to_string(), - preset: Some(8), - bitrate: 2_000_000, - width: Some(1920), - }, - EncodingProfile::Video { - codec: "libsvtav1".to_string(), - preset: Some(8), - bitrate: 1_200_000, - width: Some(1280), - }, - EncodingProfile::Video { - codec: "libsvtav1".to_string(), - preset: Some(8), - bitrate: 300_000, - width: Some(640), - }, - EncodingProfile::Audio { - codec: "libopus".to_string(), - bitrate: 128_000, - sample_rate: None, - channels: Some(2), - }, - EncodingProfile::Audio { - codec: "libopus".to_string(), - bitrate: 64_000, - sample_rate: None, - channels: Some(2), - }, - EncodingProfile::Subtitles { - codec: "webvtt".to_string(), - }, - ] - } } fn return_true() -> bool { diff --git a/common/src/jhls.rs b/common/src/jhls.rs index 6dc976b..90f48f5 100644 --- a/common/src/jhls.rs +++ b/common/src/jhls.rs @@ -5,33 +5,6 @@ */ use bincode::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use std::ops::Range; - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct JhlsTrackIndex { - pub extra_profiles: Vec, - pub fragments: Vec>, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -pub enum EncodingProfile { - Video { - codec: String, - preset: Option, - bitrate: usize, - width: Option, - }, - Audio { - codec: String, - bitrate: usize, - channels: Option, - sample_rate: Option, - }, - Subtitles { - codec: String, - }, -} #[derive(Debug, Serialize, Deserialize, Encode, Decode)] pub struct SubtitleCue { diff --git a/common/src/stream.rs b/common/src/stream.rs index 75349cc..a14fd57 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -91,7 +91,8 @@ pub struct StreamFormatInfo { pub remux: bool, pub containers: Vec, - pub pixel_count: Option, + pub width: Option, + pub height: Option, pub samplerate: Option, pub channels: Option, pub bit_depth: Option, diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 52d32f4..b2e254b 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -6,9 +6,10 @@ use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; use jellybase::common::stream::StreamContainer; +use jellytranscoder::fragment::transcode; use log::warn; use std::sync::Arc; -use tokio::io::DuplexStream; +use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( @@ -17,7 +18,7 @@ pub async fn fragment_stream( track: usize, segment: usize, index: usize, - format: usize, + format_num: usize, container: StreamContainer, ) -> Result<()> { let (iinfo, info) = stream_info(info).await?; @@ -26,52 +27,56 @@ pub async fn fragment_stream( .get(track) .ok_or(anyhow!("track not found"))?; let path = iinfo.paths[file_index].clone(); + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let format = track + .formats + .get(format_num) + .ok_or(anyhow!("format not found"))?; - // if let Some(profile) = None { - // perms.assert(&UserPermission::Transcode)?; - // let location = transcode( - // &format!("{track} {index} {:?}", node), // TODO maybe not use the entire source - // CONF.transcoding_profiles - // .get(profile) - // .ok_or(anyhow!("profile out of range"))?, - // move |b| { - // tokio::task::spawn_blocking(move || { - // if let Err(err) = jellyremuxer::write_fragment_into( - // SyncIoBridge::new(b), - // &CONF.media_path, - // &node, - // &local_track, - // track as usize, - // false, - // index, - // ) { - // warn!("segment stream error: {err}"); - // } - // }); - // }, - // ) - // .await?; - // let mut output = File::open(location.abs()).await?; - // tokio::task::spawn(async move { - // if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - // warn!("cannot write stream: {err}") - // } - // }); - // } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &path, - track_num, - container == StreamContainer::WebM, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - // } + if format.remux { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), + index, + ) { + warn!("segment stream error: {err}"); + } + }); + } else { + let location = transcode( + &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source + format, + move |b| { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), + index, + ) { + warn!("segment stream error: {err}"); + } + }); + }, + ) + .await?; + let mut output = File::open(location.abs()).await?; + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + warn!("cannot write stream: {err}") + } + }); + } Ok(()) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index a6faf54..eb56529 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -136,7 +136,8 @@ async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, Strea bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), channels: t.audio.as_ref().map(|a| a.channels as usize), - pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height), + width: t.video.as_ref().map(|v| v.pixel_width), + height: t.video.as_ref().map(|v| v.pixel_height), ..Default::default() }); tracks.push(StreamTrackInfo { diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index ff6a9db..b88339c 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -7,7 +7,7 @@ use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, - common::jhls::EncodingProfile, + common::stream::StreamFormatInfo, }; use log::{debug, info}; use std::process::Stdio; @@ -21,7 +21,7 @@ use tokio::{ pub async fn transcode( key: &str, - enc: &EncodingProfile, + enc: &StreamFormatInfo, input: impl FnOnce(ChildStdin), ) -> anyhow::Result { async_cache_file( @@ -30,51 +30,51 @@ pub async fn transcode( let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; debug!("transcoding fragment with {enc:?}"); - let mut args = Vec::new(); - match enc { - EncodingProfile::Video { - codec, - preset, - bitrate, - width, - } => { - if let Some(width) = width { - args.push("-vf".to_string()); - args.push(format!("scale={width}:-1")); - } - args.push("-c:v".to_string()); - args.push(codec.to_string()); - if let Some(preset) = preset { - args.push("-preset".to_string()); - args.push(format!("{preset}")); - } - args.push("-b:v".to_string()); - args.push(format!("{bitrate}")); - } - EncodingProfile::Audio { - codec, - bitrate, - sample_rate, - channels, - } => { - if let Some(channels) = channels { - args.push("-ac".to_string()); - args.push(format!("{channels}")) - } - if let Some(sample_rate) = sample_rate { - args.push("-ar".to_string()); - args.push(format!("{sample_rate}")) - } - args.push("-c:a".to_string()); - args.push(codec.to_string()); - args.push("-b:a".to_string()); - args.push(format!("{bitrate}")); - } - EncodingProfile::Subtitles { codec } => { - args.push("-c:s".to_string()); - args.push(codec.to_string()); - } - }; + let mut args = Vec::::new(); + // match enc { + // EncodingProfile::Video { + // codec, + // preset, + // bitrate, + // width, + // } => { + // if let Some(width) = width { + // args.push("-vf".to_string()); + // args.push(format!("scale={width}:-1")); + // } + // args.push("-c:v".to_string()); + // args.push(codec.to_string()); + // if let Some(preset) = preset { + // args.push("-preset".to_string()); + // args.push(format!("{preset}")); + // } + // args.push("-b:v".to_string()); + // args.push(format!("{bitrate}")); + // } + // EncodingProfile::Audio { + // codec, + // bitrate, + // sample_rate, + // channels, + // } => { + // if let Some(channels) = channels { + // args.push("-ac".to_string()); + // args.push(format!("{channels}")) + // } + // if let Some(sample_rate) = sample_rate { + // args.push("-ar".to_string()); + // args.push(format!("{sample_rate}")) + // } + // args.push("-c:a".to_string()); + // args.push(codec.to_string()); + // args.push("-b:a".to_string()); + // args.push(format!("{bitrate}")); + // } + // EncodingProfile::Subtitles { codec } => { + // args.push("-c:s".to_string()); + // args.push(codec.to_string()); + // } + // }; info!("encoding with {:?}", args.join(" ")); let mut proc = Command::new("ffmpeg") -- cgit v1.2.3-70-g09d2 From 42e08750a5a9a112d458a5db1d6b169278e953c5 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 16:02:42 +0200 Subject: stream info for transcoding --- common/src/config.rs | 71 ++++++------- common/src/stream.rs | 5 +- import/src/infojson.rs | 2 +- server/src/routes/compat/jellyfin/mod.rs | 18 ++-- stream/src/fragment.rs | 1 + stream/src/lib.rs | 91 +---------------- stream/src/stream_info.rs | 164 +++++++++++++++++++++++++++++++ stream/src/webvtt.rs | 18 ++-- transcoder/src/fragment.rs | 74 ++++---------- 9 files changed, 239 insertions(+), 205 deletions(-) create mode 100644 stream/src/stream_info.rs diff --git a/common/src/config.rs b/common/src/config.rs index a0dc459..3a48fea 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -8,36 +8,42 @@ use crate::user::PermissionSet; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, path::PathBuf}; -#[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] pub struct GlobalConfig { pub hostname: String, pub brand: String, pub slogan: String, - #[serde(default = "return_true" )] pub tls: bool, - #[serde(default = "default::asset_path")] pub asset_path: PathBuf, - #[serde(default = "default::database_path")] pub database_path: PathBuf, - #[serde(default = "default::cache_path")] pub cache_path: PathBuf, - #[serde(default = "default::media_path")] pub media_path: PathBuf, - #[serde(default = "default::secrets_path")] pub secrets_path: PathBuf, - #[serde(default = "default::max_in_memory_cache_size")] pub max_in_memory_cache_size: usize, - #[serde(default)] pub admin_username: Option, - #[serde(default = "default::login_expire")] pub login_expire: i64, - #[serde(default)] pub default_permission_set: PermissionSet, - #[serde(default)] encoders: EncoderPreferences, + #[serde(default = "return_true")] + pub tls: bool, + pub asset_path: PathBuf, + pub database_path: PathBuf, + pub cache_path: PathBuf, + pub media_path: PathBuf, + pub secrets_path: PathBuf, + #[serde(default = "max_in_memory_cache_size")] + pub max_in_memory_cache_size: usize, + #[serde(default)] + pub admin_username: Option, + #[serde(default = "login_expire")] + pub login_expire: i64, + #[serde(default)] + pub default_permission_set: PermissionSet, + #[serde(default)] + pub encoders: EncoderPreferences, } #[derive(Debug, Deserialize, Serialize, Default)] pub struct EncoderPreferences { - avc: Option, - hevc: Option, - vp8: Option, - vp9: Option, - av1: Option, + pub avc: Option, + pub hevc: Option, + pub vp8: Option, + pub vp9: Option, + pub av1: Option, } #[derive(Debug, Deserialize, Serialize)] -enum EncoderClass { +#[serde(rename_all = "snake_case")] +pub enum EncoderClass { Aom, Svt, X26n, @@ -77,30 +83,11 @@ pub struct ApiSecrets { pub trakt: Option, } -mod default { - use std::path::PathBuf; - - pub fn login_expire() -> i64 { - 60 * 60 * 24 - } - pub fn asset_path() -> PathBuf { - "data/assets".into() - } - pub fn database_path() -> PathBuf { - "data/database".into() - } - pub fn cache_path() -> PathBuf { - "data/cache".into() - } - pub fn media_path() -> PathBuf { - "data/media".into() - } - pub fn secrets_path() -> PathBuf { - "data/secrets.yaml".into() - } - pub fn max_in_memory_cache_size() -> usize { - 50_000_000 - } +fn login_expire() -> i64 { + 60 * 60 * 24 +} +fn max_in_memory_cache_size() -> usize { + 50_000_000 } fn return_true() -> bool { diff --git a/common/src/stream.rs b/common/src/stream.rs index a14fd57..555a5d0 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -87,7 +87,7 @@ pub enum TrackKind { #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct StreamFormatInfo { pub codec: String, - pub byterate: f64, + pub bitrate: f64, pub remux: bool, pub containers: Vec, @@ -104,6 +104,7 @@ pub enum StreamContainer { WebM, Matroska, WebVTT, + MPEG4, JVTT, } @@ -203,6 +204,7 @@ impl Display for StreamContainer { StreamContainer::Matroska => "matroska", StreamContainer::WebVTT => "webvtt", StreamContainer::JVTT => "jvtt", + StreamContainer::MPEG4 => "mp4", }) } } @@ -214,6 +216,7 @@ impl FromStr for StreamContainer { "matroska" => StreamContainer::Matroska, "webvtt" => StreamContainer::WebVTT, "jvtt" => StreamContainer::JVTT, + "mp4" => StreamContainer::MPEG4, _ => return Err(()), }) } diff --git a/import/src/infojson.rs b/import/src/infojson.rs index 3a8d76e..1efbae9 100644 --- a/import/src/infojson.rs +++ b/import/src/infojson.rs @@ -86,7 +86,7 @@ pub struct YFormat { pub fps: Option, pub columns: Option, pub fragments: Option>, - pub resolution: String, + pub resolution: Option, pub dynamic_range: Option, pub aspect_ratio: Option, pub http_headers: HashMap, diff --git a/server/src/routes/compat/jellyfin/mod.rs b/server/src/routes/compat/jellyfin/mod.rs index 6066760..7393c5f 100644 --- a/server/src/routes/compat/jellyfin/mod.rs +++ b/server/src/routes/compat/jellyfin/mod.rs @@ -5,18 +5,14 @@ */ pub mod models; -use crate::routes::{ - stream::rocket_uri_macro_r_stream, - ui::{ - account::{login_logic, session::Session}, - assets::{ - rocket_uri_macro_r_asset, rocket_uri_macro_r_item_backdrop, - rocket_uri_macro_r_item_poster, - }, - error::MyResult, - node::{aspect_class, DatabaseNodeUserDataExt}, - sort::{filter_and_sort_nodes, FilterProperty, NodeFilterSort, SortOrder, SortProperty}, +use crate::routes::ui::{ + account::{login_logic, session::Session}, + assets::{ + rocket_uri_macro_r_asset, rocket_uri_macro_r_item_backdrop, rocket_uri_macro_r_item_poster, }, + error::MyResult, + node::{aspect_class, DatabaseNodeUserDataExt}, + sort::{filter_and_sort_nodes, FilterProperty, NodeFilterSort, SortOrder, SortProperty}, }; use anyhow::{anyhow, Context}; use jellybase::{database::Database, CONF}; diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index b2e254b..e0644aa 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -54,6 +54,7 @@ pub async fn fragment_stream( let location = transcode( &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source format, + container, move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( diff --git a/stream/src/lib.rs b/stream/src/lib.rs index eb56529..18ad2a7 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -7,6 +7,7 @@ pub mod fragment; pub mod fragment_index; pub mod hls; +pub mod stream_info; pub mod webvtt; use anyhow::{anyhow, Context, Result}; @@ -14,18 +15,14 @@ use fragment::fragment_stream; use fragment_index::fragment_index_stream; use hls::{hls_master_stream, hls_variant_stream}; use jellybase::common::{ - stream::{ - StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, - StreamTrackInfo, TrackKind, - }, + stream::{StreamContainer, StreamSpec}, Node, }; -use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; +use stream_info::{stream_info, write_stream_info}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, - task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; @@ -50,6 +47,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { StreamContainer::Matroska => "video/x-matroska", StreamContainer::WebVTT => "text/vtt", StreamContainer::JVTT => "application/jellything-vtt+json", + StreamContainer::MPEG4 => "video/mp4", }; match spec { StreamSpec::Whep { .. } => cons("application/x-todo", false), @@ -103,87 +101,6 @@ pub async fn stream( Ok(a) } -async fn async_matroska_metadata(path: PathBuf) -> Result> { - Ok(spawn_blocking(move || matroska_metadata(&path)).await??) -} - -pub(crate) struct InternalStreamInfo { - pub paths: Vec, - pub metadata: Vec>, - pub track_to_file: Vec<(usize, u64)>, -} - -async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { - let mut metadata = Vec::new(); - let mut paths = Vec::new(); - for path in &info.files { - metadata.push(async_matroska_metadata(path.clone()).await?); - paths.push(path.clone()); - } - - let mut tracks = Vec::new(); - let mut track_to_file = Vec::new(); - - for (i, m) in metadata.iter().enumerate() { - if let Some(t) = &m.tracks { - for t in &t.entries { - let mut formats = Vec::new(); - formats.push(StreamFormatInfo { - codec: t.codec_id.to_string(), - remux: true, - byterate: 10., // TODO - containers: [StreamContainer::Matroska].to_vec(), - bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), - samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), - channels: t.audio.as_ref().map(|a| a.channels as usize), - width: t.video.as_ref().map(|v| v.pixel_width), - height: t.video.as_ref().map(|v| v.pixel_height), - ..Default::default() - }); - tracks.push(StreamTrackInfo { - name: None, - kind: match t.track_type { - 1 => TrackKind::Video, - 2 => TrackKind::Audio, - 17 => TrackKind::Subtitle, - _ => todo!(), - }, - formats, - }); - track_to_file.push((i, t.track_number)); - } - } - } - - let segment = StreamSegmentInfo { - name: None, - duration: metadata[0] - .info - .as_ref() - .unwrap() - .duration - .unwrap_or_default(), - tracks, - }; - Ok(( - InternalStreamInfo { - metadata, - paths, - track_to_file, - }, - StreamInfo { - name: info.info.title.clone(), - segments: vec![segment], - }, - )) -} - -async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { - let (_, info) = stream_info(info).await?; - b.write_all(&serde_json::to_vec(&info)?).await?; - Ok(()) -} - async fn remux_stream( node: Arc, spec: StreamSpec, diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs new file mode 100644 index 0000000..9d3d741 --- /dev/null +++ b/stream/src/stream_info.rs @@ -0,0 +1,164 @@ +use anyhow::Result; +use ebml_struct::matroska::TrackEntry; +use jellybase::{ + common::stream::{ + StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, + TrackKind, + }, + CONF, +}; +use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; +use std::{path::PathBuf, sync::Arc}; +use tokio::{ + io::{AsyncWriteExt, DuplexStream}, + spawn, + task::spawn_blocking, +}; + +use crate::SMediaInfo; + +async fn async_matroska_metadata(path: PathBuf) -> Result> { + Ok(spawn_blocking(move || matroska_metadata(&path)).await??) +} + +pub(crate) struct InternalStreamInfo { + pub paths: Vec, + pub metadata: Vec>, + pub track_to_file: Vec<(usize, u64)>, +} + +pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { + let mut metadata = Vec::new(); + let mut paths = Vec::new(); + for path in &info.files { + metadata.push(async_matroska_metadata(path.clone()).await?); + paths.push(path.clone()); + } + let mut tracks = Vec::new(); + let mut track_to_file = Vec::new(); + + for (i, m) in metadata.iter().enumerate() { + if let Some(t) = &m.tracks { + for t in &t.entries { + tracks.push(StreamTrackInfo { + name: None, + kind: match t.track_type { + 1 => TrackKind::Video, + 2 => TrackKind::Audio, + 17 => TrackKind::Subtitle, + _ => todo!(), + }, + formats: stream_formats(t), + }); + track_to_file.push((i, t.track_number)); + } + } + } + + let segment = StreamSegmentInfo { + name: None, + duration: metadata[0] + .info + .as_ref() + .unwrap() + .duration + .unwrap_or_default(), + tracks, + }; + Ok(( + InternalStreamInfo { + metadata, + paths, + track_to_file, + }, + StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }, + )) +} + +fn stream_formats(t: &TrackEntry) -> Vec { + let mut formats = Vec::new(); + formats.push(StreamFormatInfo { + codec: t.codec_id.to_string(), + remux: true, + bitrate: 2_000_000., // TODO + containers: containers_by_codec(&t.codec_id), + bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), + samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), + channels: t.audio.as_ref().map(|a| a.channels as usize), + width: t.video.as_ref().map(|v| v.pixel_width), + height: t.video.as_ref().map(|v| v.pixel_height), + ..Default::default() + }); + + match t.track_type { + 1 => { + let sw = t.video.as_ref().unwrap().pixel_width; + let sh = t.video.as_ref().unwrap().pixel_height; + for (w, br) in [(3840, 8e6), (1920, 5e6), (1280, 3e6), (640, 1e6)] { + if w > sw { + continue; + } + let h = (w * sh) / sw; + for (cid, enable) in [ + ("V_AV1", CONF.encoders.av1.is_some()), + ("V_VP8", CONF.encoders.vp8.is_some()), + ("V_VP9", CONF.encoders.vp9.is_some()), + ("V_AVC", CONF.encoders.avc.is_some()), + ("V_HEVC", CONF.encoders.hevc.is_some()), + ] { + if enable { + formats.push(StreamFormatInfo { + codec: cid.to_string(), + bitrate: br, + remux: false, + containers: containers_by_codec(cid), + width: Some(w), + height: Some(h), + samplerate: None, + channels: None, + bit_depth: None, + }); + } + } + } + } + 2 => { + for br in [256e3, 128e3, 64e3] { + formats.push(StreamFormatInfo { + codec: "A_OPUS".to_string(), + bitrate: br, + remux: false, + containers: containers_by_codec("A_OPUS"), + width: None, + height: None, + samplerate: Some(48e3), + channels: Some(2), + bit_depth: Some(32), + }); + } + } + 17 => {} + _ => {} + } + + formats +} + +fn containers_by_codec(codec: &str) -> Vec { + use StreamContainer::*; + match codec { + "V_VP8" | "V_VP9" | "V_AV1" | "A_OPUS" | "A_VORBIS" => vec![Matroska, WebM], + "V_AVC" | "A_AAC" => vec![Matroska, MPEG4], + "S_TEXT/UTF8" | "S_TEXT/WEBVTT" => vec![Matroska, WebVTT, WebM, JVTT], + _ => vec![Matroska], + } +} + +pub(crate) async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { + let (_, info) = stream_info(info).await?; + spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await }); + Ok(()) +} diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index 960849c..e9f0181 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -3,23 +3,21 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use anyhow::{anyhow, Context, Result}; -use jellybase::{ - cache::async_cache_memory, - common::{stream::StreamSpec, Node}, - CONF, -}; -use jellyremuxer::extract::extract_track; -use jellytranscoder::subtitles::{parse_subtitles, write_webvtt}; +use anyhow::Result; +use jellybase::common::{stream::StreamSpec, Node}; use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; +use tokio::io::DuplexStream; pub async fn vtt_stream( json: bool, node: Arc, spec: StreamSpec, - mut b: DuplexStream, + b: DuplexStream, ) -> Result<()> { + let _ = b; + let _ = spec; + let _ = node; + let _ = json; // TODO cache // TODO should use fragments too? big films take too long... diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index b88339c..3cb4c40 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -7,7 +7,7 @@ use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, - common::stream::StreamFormatInfo, + common::stream::{StreamContainer, StreamFormatInfo}, }; use log::{debug, info}; use std::process::Stdio; @@ -21,73 +21,41 @@ use tokio::{ pub async fn transcode( key: &str, - enc: &StreamFormatInfo, + format: &StreamFormatInfo, + container: StreamContainer, input: impl FnOnce(ChildStdin), ) -> anyhow::Result { async_cache_file( - &["frag-tc", key, &format!("{enc:?}")], + &["frag-tc", key, &format!("{format:?}")], move |mut output| async move { let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; - debug!("transcoding fragment with {enc:?}"); + debug!("transcoding fragment with {format:?}"); let mut args = Vec::::new(); - // match enc { - // EncodingProfile::Video { - // codec, - // preset, - // bitrate, - // width, - // } => { - // if let Some(width) = width { - // args.push("-vf".to_string()); - // args.push(format!("scale={width}:-1")); - // } - // args.push("-c:v".to_string()); - // args.push(codec.to_string()); - // if let Some(preset) = preset { - // args.push("-preset".to_string()); - // args.push(format!("{preset}")); - // } - // args.push("-b:v".to_string()); - // args.push(format!("{bitrate}")); - // } - // EncodingProfile::Audio { - // codec, - // bitrate, - // sample_rate, - // channels, - // } => { - // if let Some(channels) = channels { - // args.push("-ac".to_string()); - // args.push(format!("{channels}")) - // } - // if let Some(sample_rate) = sample_rate { - // args.push("-ar".to_string()); - // args.push(format!("{sample_rate}")) - // } - // args.push("-c:a".to_string()); - // args.push(codec.to_string()); - // args.push("-b:a".to_string()); - // args.push(format!("{bitrate}")); - // } - // EncodingProfile::Subtitles { codec } => { - // args.push("-c:s".to_string()); - // args.push(codec.to_string()); - // } - // }; + + match format.codec.as_str() { + "V_AVC" => {} + + _ => unreachable!(), + } + info!("encoding with {:?}", args.join(" ")); + let container = match container { + StreamContainer::WebM => "webm", + StreamContainer::Matroska => "matroska", + StreamContainer::WebVTT => "vtt", + StreamContainer::MPEG4 => "mp4", + StreamContainer::JVTT => unreachable!(), + }; + let mut proc = Command::new("ffmpeg") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .args(["-f", "matroska", "-i", "pipe:0"]) .args(args) - .args(["-f", "webm", "pipe:1"]) + .args(["-f", container, "pipe:1"]) .spawn()?; - // let mut proc = Command::new("cat") - // .stdin(Stdio::piped()) - // .stdout(Stdio::piped()) - // .spawn()?; let stdin = proc.stdin.take().unwrap(); let mut stdout = proc.stdout.take().unwrap(); -- cgit v1.2.3-70-g09d2 From 3b147cb1dfcbd5c7218e0accd5784d992d5ae21c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 18:42:16 +0200 Subject: things --- common/src/config.rs | 17 ++++++++-------- common/src/stream.rs | 2 +- server/src/routes/stream.rs | 4 ++-- stream/src/fragment.rs | 1 + stream/src/hls.rs | 26 ++++++++++++++++++++++++- stream/src/lib.rs | 38 +++++++----------------------------- stream/src/stream_info.rs | 6 +++--- transcoder/src/fragment.rs | 47 +++++++++++++++++++++++++++++++++++++-------- 8 files changed, 87 insertions(+), 54 deletions(-) diff --git a/common/src/config.rs b/common/src/config.rs index 3a48fea..df16ef0 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -29,19 +29,20 @@ pub struct GlobalConfig { #[serde(default)] pub default_permission_set: PermissionSet, #[serde(default)] - pub encoders: EncoderPreferences, + pub encoders: EncoderArgs, } #[derive(Debug, Deserialize, Serialize, Default)] -pub struct EncoderPreferences { - pub avc: Option, - pub hevc: Option, - pub vp8: Option, - pub vp9: Option, - pub av1: Option, +pub struct EncoderArgs { + pub avc: Option, + pub hevc: Option, + pub vp8: Option, + pub vp9: Option, + pub av1: Option, + pub generic: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum EncoderClass { Aom, diff --git a/common/src/stream.rs b/common/src/stream.rs index 555a5d0..9fd7daf 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -76,7 +76,7 @@ pub struct StreamTrackInfo { pub formats: Vec, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Copy, Clone, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum TrackKind { Video, diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 8f97aec..0fbeb3a 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -42,8 +42,8 @@ pub async fn r_stream_head( #[get("/n//stream?")] pub async fn r_stream( - session: Session, - federation: &State, + _session: Session, + _federation: &State, db: &State, id: &str, range: Option, diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e0644aa..26746fc 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -53,6 +53,7 @@ pub async fn fragment_stream( } else { let location = transcode( &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source + track.kind, format, container, move |b| { diff --git a/stream/src/hls.rs b/stream/src/hls.rs index f06ac72..3dfbf01 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -15,7 +15,31 @@ use tokio::{ task::spawn_blocking, }; -pub async fn hls_master_stream( +pub async fn hls_supermultivariant_stream( + mut b: DuplexStream, + info: Arc, + container: StreamContainer, +) -> Result<()> { + let (_iinfo, info) = stream_info(info).await?; + let mut out = String::new(); + writeln!(out, "#EXTM3U")?; + writeln!(out, "#EXT-X-VERSION:4")?; + for (i, _seg) in info.segments.iter().enumerate() { + let uri = format!( + "stream{}", + StreamSpec::HlsMultiVariant { + segment: i, + container, + } + .to_query() + ); + writeln!(out, "{uri}")?; + } + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} + +pub async fn hls_multivariant_stream( mut b: DuplexStream, info: Arc, segment: SegmentNum, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 18ad2a7..4df87ae 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -10,10 +10,10 @@ pub mod hls; pub mod stream_info; pub mod webvtt; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; -use hls::{hls_master_stream, hls_variant_stream}; +use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; use jellybase::common::{ stream::{StreamContainer, StreamSpec}, Node, @@ -24,7 +24,6 @@ use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; -use tokio_util::io::SyncIoBridge; #[derive(Debug)] pub struct SMediaInfo { @@ -71,13 +70,12 @@ pub async fn stream( let (a, b) = duplex(4096); match spec { - StreamSpec::Whep { track, seek } => todo!(), - StreamSpec::WhepControl { token } => todo!(), - StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, - StreamSpec::HlsSuperMultiVariant { container } => todo!(), + StreamSpec::HlsSuperMultiVariant { container } => { + hls_supermultivariant_stream(b, info, container).await?; + } StreamSpec::HlsMultiVariant { segment, container } => { - hls_master_stream(b, info, segment, container).await? + hls_multivariant_stream(b, info, segment, container).await? } StreamSpec::HlsVariant { segment, @@ -96,34 +94,12 @@ pub async fn stream( container, format, } => fragment_stream(b, info, track, segment, index, format, container).await?, + _ => bail!("todo"), } Ok(a) } -async fn remux_stream( - node: Arc, - spec: StreamSpec, - range: Range, - b: DuplexStream, -) -> Result<()> { - let b = SyncIoBridge::new(b); - - // tokio::task::spawn_blocking(move || { - // jellyremuxer::remux_stream_into( - // b, - // range, - // CONF.media_path.to_owned(), - // &node, - // local_tracks, - // spec.track, - // spec.webm.unwrap_or(false), - // ) - // }); - - Ok(()) -} - async fn original_stream( info: Arc, track: usize, diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 9d3d741..a8b6989 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -23,7 +23,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result> pub(crate) struct InternalStreamInfo { pub paths: Vec, - pub metadata: Vec>, + pub _metadata: Vec>, pub track_to_file: Vec<(usize, u64)>, } @@ -67,7 +67,7 @@ pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStream }; Ok(( InternalStreamInfo { - metadata, + _metadata: metadata, paths, track_to_file, }, @@ -83,7 +83,7 @@ fn stream_formats(t: &TrackEntry) -> Vec { formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), remux: true, - bitrate: 2_000_000., // TODO + bitrate: 10_000_000., // TODO containers: containers_by_codec(&t.codec_id), bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 3cb4c40..1d06e9a 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -7,7 +7,8 @@ use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, - common::stream::{StreamContainer, StreamFormatInfo}, + common::stream::{StreamContainer, StreamFormatInfo, TrackKind}, + CONF, }; use log::{debug, info}; use std::process::Stdio; @@ -21,6 +22,7 @@ use tokio::{ pub async fn transcode( key: &str, + kind: TrackKind, format: &StreamFormatInfo, container: StreamContainer, input: impl FnOnce(ChildStdin), @@ -31,15 +33,44 @@ pub async fn transcode( let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; debug!("transcoding fragment with {format:?}"); - let mut args = Vec::::new(); - - match format.codec.as_str() { - "V_AVC" => {} + let template = match format.codec.as_str() { + "V_AVC" => CONF.encoders.avc.as_ref(), + "V_HEVC" => CONF.encoders.hevc.as_ref(), + "V_VP8" => CONF.encoders.vp8.as_ref(), + "V_VP9" => CONF.encoders.vp9.as_ref(), + "V_AV1" => CONF.encoders.av1.as_ref(), + _ => None, + } + .or(CONF.encoders.generic.as_ref()) + .cloned() + .unwrap_or("ffmpeg %i %f %e %o".to_owned()); + let filter = match kind { + TrackKind::Video => format!("-vf scale={}:-1", format.width.unwrap()), + TrackKind::Audio => format!(""), + TrackKind::Subtitle => String::new(), + }; + let typechar = match kind { + TrackKind::Video => "v", + TrackKind::Audio => "a", + TrackKind::Subtitle => "s", + }; + let fallback_encoder = match format.codec.as_str() { + "A_OPUS" => "libopus", _ => unreachable!(), - } + }; + + let args = template + .replace("%i", "-f matroska -i pipe:0") + .replace("%o", "-f %C pipe:1") + .replace("%f", &filter) + .replace("%e", "-c:%t %c -b:%t %r") + .replace("%t", typechar) + .replace("%c", fallback_encoder) + .replace("%r", &(format.bitrate as i64).to_string()) + .replace("%C", &container.to_string()); - info!("encoding with {:?}", args.join(" ")); + info!("encoding with {:?}", args); let container = match container { StreamContainer::WebM => "webm", @@ -53,7 +84,7 @@ pub async fn transcode( .stdin(Stdio::piped()) .stdout(Stdio::piped()) .args(["-f", "matroska", "-i", "pipe:0"]) - .args(args) + .args(args.split(" ")) .args(["-f", container, "pipe:1"]) .spawn()?; -- cgit v1.2.3-70-g09d2 From c59abb792391e2f7540a80bb8d989021fe0a5b80 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Tue, 15 Apr 2025 13:54:52 +0200 Subject: refactor jsp, part 1 --- common/src/stream.rs | 5 ++ web/script/player/jhls.d.ts | 103 -------------------------------------- web/script/player/mediacaps.ts | 96 +++++++++++------------------------ web/script/player/mod.ts | 88 +++++++++++++++++++------------- web/script/player/player.ts | 62 ++++++++--------------- web/script/player/profiles.ts | 83 ------------------------------ web/script/player/profiles.ts_ | 80 +++++++++++++++++++++++++++++ web/script/player/track/create.ts | 10 ++-- web/script/player/track/mod.ts | 2 +- web/script/player/track/mse.ts | 63 +++++++++++------------ web/script/player/track/vtt.ts | 6 +-- web/script/player/types_node.ts | 76 ++++++++++++++++++++++++++++ web/script/player/types_stream.ts | 39 +++++++++++++++ 13 files changed, 340 insertions(+), 373 deletions(-) delete mode 100644 web/script/player/jhls.d.ts delete mode 100644 web/script/player/profiles.ts create mode 100644 web/script/player/profiles.ts_ create mode 100644 web/script/player/types_node.ts create mode 100644 web/script/player/types_stream.ts diff --git a/common/src/stream.rs b/common/src/stream.rs index 9fd7daf..ba91ff5 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -191,6 +191,11 @@ impl StreamSpec { index: get_num("index")? as IndexNum, container: get_container()?, }) + } else if query.contains_key("fragmentindex") { + Ok(Self::FragmentIndex { + segment: get_num("segment")? as SegmentNum, + track: get_num("track")? as TrackNum, + }) } else { Err("invalid stream spec") } diff --git a/web/script/player/jhls.d.ts b/web/script/player/jhls.d.ts deleted file mode 100644 index c7325e4..0000000 --- a/web/script/player/jhls.d.ts +++ /dev/null @@ -1,103 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ - -export interface JhlsTrackIndex { - fragments: TimeRange[], - extra_profiles: EncodingProfile[], -} - -export interface TimeRange { start: number, end: number } - -export interface NodePublic { - kind: NodeKind, - title?: string, - tagline?: string, - description?: string, - id?: string, - path: string[], - children: string[], - release_date?: string, - index?: number, - media?: MediaInfo, - ratings: { [key in Rating]: number }, - // might be incomplete -} - -export type NodeKind = "movie" - | "video" - | "collection" - | "channel" - | "show" - | "series" - | "season" - | "episode" - -export type Rating = "imdb" - | "tmdb" - | "rotten_tomatoes" - | "metacritic" - | "youtube_views" - | "youtube_likes" - | "youtube_followers" - -export interface MediaInfo { - duration: number, - tracks: SourceTrack[], - chapters: Chapter[], -} - -export interface Chapter { - time_start?: number, - time_end?: number, - labels: { [key: string]: string } -} - -export interface SourceTrack { - kind: SourceTrackKind, - name: string, - codec: string, - language: string, -} -export type SourceTrackKind = { - video: { - width: number, - height: number, - fps: number, - } -} - | { - audio: { - channels: number, - sample_rate: number, - bit_depth: number, - } - } | "subtitles"; - -export interface EncodingProfile { - video?: { - codec: string, - preset: number, - bitrate: number, - width: number, - }, - audio?: { - codec: string, - bitrate: number, - sample_rate?: number, - }, - subtitles?: { - codec: string, - }, -} - -export interface NodeUserData { - watched: WatchedState -} -export type WatchedState = "none" | "watched" | "pending" | { progress: number } - -export interface JvttCue extends TimeRange { - content: string -} \ No newline at end of file diff --git a/web/script/player/mediacaps.ts b/web/script/player/mediacaps.ts index e44b92b..037a84b 100644 --- a/web/script/player/mediacaps.ts +++ b/web/script/player/mediacaps.ts @@ -4,85 +4,58 @@ Copyright (C) 2025 metamuffin */ /// -import { EncodingProfile, SourceTrack, SourceTrackKind } from "./jhls.d.ts"; + +import { FormatInfo, StreamContainer } from "./types_stream.ts"; const cache = new Map() // TODO this testing method makes the assumption, that if the codec is supported on its own, it can be // TODO arbitrarly combined with others that are supported. in reality this is true but the spec does not gurantee it. -export async function test_media_capability(track: SourceTrack): Promise { - const cache_key = `${get_track_kind(track.kind)};${track.codec}` +export async function test_media_capability(format: FormatInfo, container: StreamContainer): Promise { + const cache_key = JSON.stringify(format) + container const cached = cache.get(cache_key); if (cached !== undefined) return cached - const r = await test_media_capability_inner(track) - console.log(`${r ? "positive" : "negative"} media capability test finished for codec=${track.codec}`); + const r = await test_media_capability_inner(format, container) + console.log(`${r ? "positive" : "negative"} media capability test finished for codec=${format.codec}`); cache.set(cache_key, r) return r } -async function test_media_capability_inner(track: SourceTrack) { - if (track.kind == "subtitles") { +async function test_media_capability_inner(format: FormatInfo, container: StreamContainer) { + if (format.codec.startsWith("S_") || format.codec.startsWith("V_") || format.codec.startsWith("D_")) { // TODO do we need to check this? - return track.codec == "V_TEXT/WEBVTT" || track.codec == "D_WEBVTT/SUBTITLES" + return format.codec == "V_TEXT/WEBVTT" || format.codec == "D_WEBVTT/SUBTITLES" } let res; - const codec = MASTROSKA_CODEC_MAP[track.codec] - if (!codec) return console.warn(`unknown codec: ${track.codec}`), false - if ("audio" in track.kind) { + if (format.codec.startsWith("A_")) { res = await navigator.mediaCapabilities.decodingInfo({ type: "media-source", audio: { - contentType: `audio/webm; codecs=${codec}`, - samplerate: track.kind.audio.sample_rate, - channels: "" + track.kind.audio.channels, - bitrate: 128 * 1000, + contentType: track_to_content_type(format, container), + samplerate: format.samplerate, + channels: "" + format.channels, + bitrate: format.bitrate, } }) } - if ("video" in track.kind) { + if (format.codec.startsWith("V_")) { res = await navigator.mediaCapabilities.decodingInfo({ type: "media-source", video: { - contentType: `video/webm; codecs=${codec}`, - framerate: track.kind.video.fps || 30, - width: track.kind.video.width, - height: track.kind.video.height, - bitrate: 5 * 1000 * 1000 // TODO we dont know this but we should in the future + contentType: track_to_content_type(format, container), + framerate: 30, // TODO get average framerate from server + width: format.width ?? 1920, + height: format.height ?? 1080, + bitrate: format.bitrate } }) } + console.log(format, res); return res?.supported ?? false } -export function track_to_content_type(track: SourceTrack): string | undefined { - if (track.kind == "subtitles") return "video/webm" - const codec = MASTROSKA_CODEC_MAP[track.codec] - if (!codec) return - return `${get_track_kind(track.kind)}/webm; codecs="${codec}"` -} -export function profile_to_partial_track(profile: EncodingProfile): SourceTrack { - if (profile.audio) { - return { - codec: FFMPEG_ENCODER_CODEC_MAP[profile.audio.codec], - kind: { audio: { bit_depth: 16, channels: 2, sample_rate: 48000 } }, - name: "test audio", - language: "en" - } - } else if (profile.video) { - return { - codec: FFMPEG_ENCODER_CODEC_MAP[profile.video.codec], - kind: { video: { fps: 30, height: 1080, width: 1090 } }, - language: "en", - name: "test video" - } - } else if (profile.subtitles) { - return { - codec: FFMPEG_ENCODER_CODEC_MAP[profile.subtitles.codec], - kind: "subtitles", - language: "en", - name: "test subtitle" - } - } else throw new Error("unreachable"); +export function track_to_content_type(format: FormatInfo, container: StreamContainer): string { + return `${CONTAINER_TO_MIME_TYPE[container]}; codecs="${MASTROSKA_CODEC_MAP[format.codec]}"` } const MASTROSKA_CODEC_MAP: { [key: string]: string } = { @@ -96,21 +69,10 @@ const MASTROSKA_CODEC_MAP: { [key: string]: string } = { "S_TEXT/WEBVTT": "webvtt", "D_WEBVTT/SUBTITLES": "webvtt", } - -const FFMPEG_ENCODER_CODEC_MAP: { [key: string]: string } = { - "libsvtav1": "V_AV1", - "libvpx": "V_VP8", - "libvpx-vp9": "V_VP9", - "opus": "A_OPUS", - "libopus": "A_OPUS", -} - -export type TrackKind = "audio" | "video" | "subtitles" -export function get_track_kind(track: SourceTrackKind): TrackKind { - // TODO why different encodings for "subtitles"? - if (track == "subtitles") return "subtitles" - if ("subtitles" in track) return "subtitles" - if ("audio" in track) return "audio" - if ("video" in track) return "video" - throw new Error("invalid track"); +const CONTAINER_TO_MIME_TYPE: { [key in StreamContainer]: string } = { + webvtt: "text/webvtt", + webm: "video/webm", + matroska: "video/x-matroska", + mpeg4: "video/mp4", + jvtt: "application/jellything-vtt+json" } diff --git a/web/script/player/mod.ts b/web/script/player/mod.ts index 15c37da..82ee287 100644 --- a/web/script/player/mod.ts +++ b/web/script/player/mod.ts @@ -7,11 +7,11 @@ import { OVar, show } from "../jshelper/mod.ts"; import { e } from "../jshelper/mod.ts"; import { Logger } from "../jshelper/src/log.ts"; -import { EncodingProfile } from "./jhls.d.ts"; -import { TrackKind, get_track_kind } from "./mediacaps.ts"; import { Player } from "./player.ts"; import { Popup } from "./popup.ts"; import { Playersync, playersync_controls } from "./sync.ts" +import { WatchedState } from "./types_node.ts"; +import { FormatInfo, TrackKind } from "./types_stream.ts"; globalThis.addEventListener("DOMContentLoaded", () => { if (document.body.classList.contains("player")) { @@ -36,12 +36,26 @@ function toggle_fullscreen() { else document.documentElement.requestFullscreen() } +function get_continue_time(w: WatchedState): number { + if (typeof w == "string") return 0 + else return w.progress +} + +function get_query_start_time() { + const u = new URL(globalThis.location.href) + const p = u.searchParams.get("t") + if (!p) return + const x = parseFloat(p) + if (Number.isNaN(x)) return + return x +} function initialize_player(el: HTMLElement, node_id: string) { el.innerHTML = "" // clear the body const logger = new Logger(s => e("p", s)) - const player = new Player(node_id, logger) + const start_time = get_query_start_time() ?? 0 // TODO get_continue_time(ndata.userdata.watched); + const player = new Player(`/n/${encodeURIComponent(node_id)}/stream`, `/n/${encodeURIComponent(node_id)}/poster`, start_time, logger) const show_stats = new OVar(false); const idle_inhibit = new OVar(false) const sync_state = new OVar(undefined) @@ -72,13 +86,13 @@ function initialize_player(el: HTMLElement, node_id: string) { const step_track_kind = (kind: TrackKind) => { // TODO cycle through all of them const active = player.active_tracks.value.filter( - ts => get_track_kind(player.tracks![ts.track_index].kind) == kind) + ts => player.tracks![ts.track_index].kind == kind) if (active.length > 0) { for (const t of active) player.set_track_enabled(t.track_index, false) } else { const all_kind = (player.tracks ?? []) .map((track, index) => ({ index, track })) - .filter(({ track }) => get_track_kind(track.kind) == kind) + .filter(({ track }) => track.kind == kind) if (all_kind.length < 1) return logger.log(`No ${kind} tracks available`) player.set_track_enabled(all_kind[0].index, true) } @@ -92,7 +106,7 @@ function initialize_player(el: HTMLElement, node_id: string) { const track_select = (kind: TrackKind) => { const button = e("div", player.active_tracks.map(_ => { const active = player.active_tracks.value.filter( - ts => get_track_kind(player.tracks![ts.track_index].kind) == kind) + ts => player.tracks![ts.track_index].kind == kind) const enabled = active.length > 0 return e("button", MEDIA_KIND_ICONS[kind][+enabled], { class: "icon", @@ -105,7 +119,7 @@ function initialize_player(el: HTMLElement, node_id: string) { } else { const all_kind = (player.tracks ?? []) .map((track, index) => ({ index, track })) - .filter(({ track }) => get_track_kind(track.kind) == kind) + .filter(({ track }) => track.kind == kind) if (all_kind.length < 1) return player.set_track_enabled(all_kind[0].index, true) } @@ -138,7 +152,7 @@ function initialize_player(el: HTMLElement, node_id: string) { player.active_tracks.map(_ => { const tracks_avail = (player.tracks ?? []) .map((track, index) => ({ index, track })) - .filter(({ track }) => get_track_kind(track.kind) == kind); + .filter(({ track }) => track.kind == kind); if (!tracks_avail.length) return e("p", `No ${kind} tracks available.`) as HTMLElement; return e("ul", { class: "jsp-track-list" }, ...tracks_avail .map(({ track, index }): HTMLElement => { @@ -184,15 +198,16 @@ function initialize_player(el: HTMLElement, node_id: string) { ), pri = e("div", { class: "jsp-pri" }, pri_current = e("div", { class: "jsp-pri-current" }), - player.chapters.map( - chapters => e("div", ...chapters.map(chap => e("div", { - class: "jsp-chapter", - style: { - left: pri_map(chap.time_start ?? 0), - width: pri_map((chap.time_end ?? player.duration.value) - (chap.time_start ?? 0)) - } - }, e("p", chap.labels[0][1])))) - ), + // TODO + // player.chapters.map( + // chapters => e("div", ...chapters.map(chap => e("div", { + // class: "jsp-chapter", + // style: { + // left: pri_map(chap.time_start ?? 0), + // width: pri_map((chap.time_end ?? player.duration.value) - (chap.time_start ?? 0)) + // } + // }, e("p", chap.labels[0][1])))) + // ), player.active_tracks.map( tracks => e("div", ...tracks.map((t, i) => t.buffered.map( ranges => e("div", ...ranges.map( @@ -275,8 +290,8 @@ function initialize_player(el: HTMLElement, node_id: string) { else if (k.code == "ArrowRight") player.seek(player.position.value + 5) else if (k.code == "ArrowUp") player.seek(player.position.value - 60) else if (k.code == "ArrowDown") player.seek(player.position.value + 60) - else if (k.code == "PageUp") player.seek(find_closest_chaps(player).prev?.time_start ?? 0) - else if (k.code == "PageDown") player.seek(find_closest_chaps(player).next?.time_start ?? player.duration.value) + // else if (k.code == "PageUp") player.seek(find_closest_chaps(player).prev?.time_start ?? 0) + // else if (k.code == "PageDown") player.seek(find_closest_chaps(player).next?.time_start ?? player.duration.value) else return; k.preventDefault() }) @@ -339,25 +354,26 @@ function mouse_idle(e: HTMLElement, timeout: number): OVar { return idle } -export function show_profile(profile: EncodingProfile): string { - if (profile.audio) return `codec=${profile.audio.codec} br=${show.metric(profile.audio.bitrate, "b/s")}${profile.audio.sample_rate ? ` sr=${show.metric(profile.audio.sample_rate, "Hz")}` : ""}` - if (profile.video) return `codec=${profile.video.codec} br=${show.metric(profile.video.bitrate, "b/s")} w=${profile.video.width} preset=${profile.video.preset}` - if (profile.subtitles) return `codec=${profile.subtitles.codec}` - return `???` +export function show_format(format: FormatInfo): string { + // if (format.audio) return `codec=${format.audio.codec} br=${show.metric(format.audio.bitrate, "b/s")}${format.audio.sample_rate ? ` sr=${show.metric(format.audio.sample_rate, "Hz")}` : ""}` + // if (format.video) return `codec=${format.video.codec} br=${show.metric(format.video.bitrate, "b/s")} w=${format.video.width} preset=${format.video.preset}` + // if (format.subtitles) return `codec=${format.subtitles.codec}` + return `TODO` } export function show_volume(v: number): string { return `${v == 0 ? "-∞" : (Math.log10(v) * 10).toFixed(2)}dB | ${(v * 100).toFixed(2)}%` } -function find_closest_chaps(player: Player) { - const now = player.position.value - const chaps = player.chapters.value - let prev, next; - for (const c of chaps) { - const t_start = (c.time_start ?? 0) - next = c; - if (t_start > now) break - prev = c; - } - return { next, prev } -} +// TODO +// function find_closest_chaps(player: Player) { +// const now = player.position.value +// const chaps = player.chapters.value +// let prev, next; +// for (const c of chaps) { +// const t_start = (c.time_start ?? 0) +// next = c; +// if (t_start > now) break +// prev = c; +// } +// return { next, prev } +// } diff --git a/web/script/player/player.ts b/web/script/player/player.ts index e0a6ddf..f44c14f 100644 --- a/web/script/player/player.ts +++ b/web/script/player/player.ts @@ -5,20 +5,18 @@ */ /// import { OVar, e } from "../jshelper/mod.ts"; -import { NodePublic, NodeUserData, SourceTrack, TimeRange } from "./jhls.d.ts"; import { SegmentDownloader } from "./download.ts"; import { PlayerTrack } from "./track/mod.ts"; import { Logger } from "../jshelper/src/log.ts"; -import { WatchedState, Chapter } from "./jhls.d.ts"; -import { get_track_kind } from "./mediacaps.ts"; import { create_track } from "./track/create.ts"; +import { StreamInfo, TimeRange, TrackInfo } from "./types_stream.ts"; export interface BufferRange extends TimeRange { status: "buffered" | "loading" | "queued" } export class Player { public video = e("video") public media_source = new MediaSource(); - public tracks?: SourceTrack[]; - public chapters = new OVar([]); + public streaminfo?: StreamInfo; + public tracks?: TrackInfo[]; public active_tracks = new OVar([]); public downloader: SegmentDownloader = new SegmentDownloader(); @@ -35,8 +33,8 @@ export class Player { if (s) this.cancel_buffering_pers = this.logger?.log_persistent(s) } - constructor(public node_id: string, public logger?: Logger) { - this.video.poster = `/n/${encodeURIComponent(node_id)}/poster` + constructor(public base_url: string, poster: string, private start_time: number, public logger?: Logger) { + this.video.poster = poster this.volume.value = this.video.volume let skip_change = false; this.volume.onchange(v => { @@ -100,40 +98,38 @@ export class Player { } async fetch_meta() { - this.set_pers("Loading metadata...") - const res = await fetch(`/n/${encodeURIComponent(this.node_id)}`, { headers: { "Accept": "application/json" } }) - if (!res.ok) return this.error.value = "Cannot download node." + this.set_pers("Loading stream metadata...") + const res = await fetch(`${this.base_url}?info`, { headers: { "Accept": "application/json" } }) + if (!res.ok) return this.error.value = "Cannot download stream info." - let ndata!: { node: NodePublic, userdata: NodeUserData } & { error: string } - try { ndata = await res.json() } + let streaminfo!: StreamInfo & { error: string } + try { streaminfo = await res.json() } catch (_) { this.set_pers("Error: Node data invalid") } - if (ndata.error) return this.set_pers("server error: " + ndata.error) + if (streaminfo.error) return this.set_pers("server error: " + streaminfo.error) this.set_pers() //! bad code: assignment order is important because chapter callbacks use duration - this.duration.value = ndata.node.media!.duration - this.chapters.value = ndata.node.media!.chapters - this.tracks = ndata.node.media!.tracks + this.duration.value = streaminfo.segments[0].duration + this.streaminfo = streaminfo + this.tracks = streaminfo!.segments[0].tracks; this.video.src = URL.createObjectURL(this.media_source) this.media_source.addEventListener("sourceopen", async () => { let video = false, audio = false, subtitles = false; for (let i = 0; i < this.tracks!.length; i++) { const t = this.tracks![i]; - const kind = get_track_kind(t.kind) - if (kind == "video" && !video) + if (t.kind == "video" && !video) video = true, await this.set_track_enabled(i, true, false) - if (kind == "audio" && !audio) + if (t.kind == "audio" && !audio) audio = true, await this.set_track_enabled(i, true, false) - if (kind == "subtitles" && !subtitles) + if (t.kind == "subtitles" && !subtitles) subtitles = true, await this.set_track_enabled(i, true, false) } this.set_pers("Buffering initial stream fragments...") - const start_time = get_query_start_time() ?? get_continue_time(ndata.userdata.watched); - this.update(start_time) - this.video.currentTime = start_time + this.update(this.start_time) + this.video.currentTime = this.start_time await this.canplay.wait_for(true) this.set_pers() @@ -153,7 +149,7 @@ export class Player { track.abort.abort() } else if (state && active_index == -1) { this.logger?.log(`Enabled track ${index}: ${display_track(this.tracks![index])}`) - this.active_tracks.value.push(create_track(this, this.node_id, index, this.tracks![index])!) + this.active_tracks.value.push(create_track(this, this.base_url, 0, index, this.tracks![index])!) if (update) await this.update() } this.active_tracks.change() @@ -172,20 +168,6 @@ export class Player { } } -function get_continue_time(w: WatchedState): number { - if (typeof w == "string") return 0 - else return w.progress -} - -function get_query_start_time() { - const u = new URL(globalThis.location.href) - const p = u.searchParams.get("t") - if (!p) return - const x = parseFloat(p) - if (Number.isNaN(x)) return - return x -} - -function display_track(t: SourceTrack): string { - return `"${t.name}" (${t.language})` +function display_track(t: TrackInfo): string { + return `${t.name}` } diff --git a/web/script/player/profiles.ts b/web/script/player/profiles.ts deleted file mode 100644 index 5ebdeb4..0000000 --- a/web/script/player/profiles.ts +++ /dev/null @@ -1,83 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -/// -import { OVar } from "../jshelper/mod.ts"; -import { EncodingProfile, SourceTrackKind } from "./jhls.d.ts"; -import { get_track_kind } from "./mediacaps.ts"; -import { profile_to_partial_track, test_media_capability } from "./mediacaps.ts"; -import { Player } from "./player.ts"; -import { MSEPlayerTrack } from "./track/mse.ts"; - -const PROFILE_UP_FAC = 0.6 -const PROFILE_DOWN_FAC = 0.8 - -export interface EncodingProfileExt extends EncodingProfile { id: number, order: number } -export class ProfileSelector { - profiles: EncodingProfileExt[] = [] - is_init = false - - constructor( - private player: Player, - private track: MSEPlayerTrack, - private bandwidth: OVar - ) { - } - async init() { - for (let id = 0; id < this.track.index!.extra_profiles.length; id++) { - const p = this.track.index!.extra_profiles[id]; - // TODO hacky type casting solution - if (get_track_kind(this.track.metadata.kind) != get_track_kind(p as unknown as SourceTrackKind)) continue - if (!await test_media_capability(profile_to_partial_track(p))) continue - this.profiles.push({ id, order: 0, ...p }) - } - this.profiles.sort((a, b) => profile_byterate(b) - profile_byterate(a)) - for (let i = 0; i < this.profiles.length; i++) this.profiles[i].order = i - } - async remux_supported(track: number): Promise { - return await test_media_capability(this.player.tracks![track]) - } - async select_optimal_profile(track: number, profile: OVar) { - if (!this.is_init) await this.init(), this.is_init = true; - - const sup_remux = await this.remux_supported(track); - if (!sup_remux && !this.profiles.length) { - this.player.logger?.log("None of the available codecs are supported. This track can't be played back.") - return false - } - const min_prof = sup_remux ? -1 : 0 - const co = profile.value?.order ?? min_prof - // TODO use actual bitrate as a fallback. the server should supply it. - const current_bitrate = profile_byterate(this.profiles[co], 500 * 1000) - const next_bitrate = profile_byterate(this.profiles[co - 1], 500 * 1000) - // console.log({ current_bitrate, next_bitrate, co, bandwidth: this.bandwidth.value * 8 }); - if (!sup_remux && !profile.value) profile.value = this.profiles[co]; - if (current_bitrate > this.bandwidth.value * PROFILE_DOWN_FAC && co + 1 < this.profiles.length) { - console.log("profile up"); - profile.value = this.profiles[co + 1] - this.log_change(track, profile.value) - } - if (next_bitrate < this.bandwidth.value * PROFILE_UP_FAC && co > min_prof) { - console.log("profile down"); - profile.value = this.profiles[co - 1] - this.log_change(track, profile.value) - } - - // profile.value = profs[0] - return true - } - - log_change(track: number, p: EncodingProfileExt | undefined) { - const ps = p ? `transcoding profile ${p.id}` : `remuxed original` - this.player.logger?.log(`Track #${track} switched to ${ps}`) - } -} - -function profile_byterate(p?: EncodingProfile, fallback = 0): number { - if (p?.audio) return p.audio.bitrate / 8 - if (p?.video) return p.video.bitrate / 8 - if (p?.subtitles) return 100 - return fallback -} diff --git a/web/script/player/profiles.ts_ b/web/script/player/profiles.ts_ new file mode 100644 index 0000000..943639c --- /dev/null +++ b/web/script/player/profiles.ts_ @@ -0,0 +1,80 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +/// +import { OVar } from "../jshelper/mod.ts"; +import { Player } from "./player.ts"; +import { MSEPlayerTrack } from "./track/mse.ts"; + +const PROFILE_UP_FAC = 0.6 +const PROFILE_DOWN_FAC = 0.8 + +export interface EncodingProfileExt extends EncodingProfile { id: number, order: number } +export class ProfileSelector { + profiles: EncodingProfileExt[] = [] + is_init = false + + constructor( + private player: Player, + private track: MSEPlayerTrack, + private bandwidth: OVar + ) { + } + async init() { + for (let id = 0; id < this.track.index!.extra_profiles.length; id++) { + const p = this.track.index!.extra_profiles[id]; + // TODO hacky type casting solution + if (get_track_kind(this.track.trackinfo.kind) != get_track_kind(p as unknown as SourceTrackKind)) continue + if (!await test_media_capability(profile_to_partial_track(p))) continue + this.profiles.push({ id, order: 0, ...p }) + } + this.profiles.sort((a, b) => profile_byterate(b) - profile_byterate(a)) + for (let i = 0; i < this.profiles.length; i++) this.profiles[i].order = i + } + async remux_supported(track: number): Promise { + return await test_media_capability(this.player.tracks![track]) + } + async select_optimal_profile(track: number, profile: OVar) { + if (!this.is_init) await this.init(), this.is_init = true; + + const sup_remux = await this.remux_supported(track); + if (!sup_remux && !this.profiles.length) { + this.player.logger?.log("None of the available codecs are supported. This track can't be played back.") + return false + } + const min_prof = sup_remux ? -1 : 0 + const co = profile.value?.order ?? min_prof + // TODO use actual bitrate as a fallback. the server should supply it. + const current_bitrate = profile_byterate(this.profiles[co], 500 * 1000) + const next_bitrate = profile_byterate(this.profiles[co - 1], 500 * 1000) + // console.log({ current_bitrate, next_bitrate, co, bandwidth: this.bandwidth.value * 8 }); + if (!sup_remux && !profile.value) profile.value = this.profiles[co]; + if (current_bitrate > this.bandwidth.value * PROFILE_DOWN_FAC && co + 1 < this.profiles.length) { + console.log("profile up"); + profile.value = this.profiles[co + 1] + this.log_change(track, profile.value) + } + if (next_bitrate < this.bandwidth.value * PROFILE_UP_FAC && co > min_prof) { + console.log("profile down"); + profile.value = this.profiles[co - 1] + this.log_change(track, profile.value) + } + + // profile.value = profs[0] + return true + } + + log_change(track: number, p: EncodingProfileExt | undefined) { + const ps = p ? `transcoding profile ${p.id}` : `remuxed original` + this.player.logger?.log(`Track #${track} switched to ${ps}`) + } +} + +function profile_byterate(p?: EncodingProfile, fallback = 0): number { + if (p?.audio) return p.audio.bitrate / 8 + if (p?.video) return p.video.bitrate / 8 + if (p?.subtitles) return 100 + return fallback +} diff --git a/web/script/player/track/create.ts b/web/script/player/track/create.ts index 1aaf12c..95bccca 100644 --- a/web/script/player/track/create.ts +++ b/web/script/player/track/create.ts @@ -3,15 +3,13 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -import { get_track_kind } from "../mediacaps.ts"; import { VttPlayerTrack } from "./vtt.ts"; import { MSEPlayerTrack } from "./mse.ts"; import { Player } from "../player.ts"; -import { SourceTrack } from "../jhls.d.ts"; import { PlayerTrack } from "./mod.ts"; +import { TrackInfo } from "../types_stream.ts"; -export function create_track(player: Player, node_id: string, track_index: number, metadata: SourceTrack): PlayerTrack | undefined { - const kind = get_track_kind(metadata.kind) - if (kind == "subtitles") return new VttPlayerTrack(player, node_id, track_index, metadata) - else return new MSEPlayerTrack(player, node_id, track_index, metadata) +export function create_track(player: Player, base_url: string, segment_index: number, track_index: number, track_info: TrackInfo): PlayerTrack | undefined { + if (track_info.kind == "subtitles") return new VttPlayerTrack(player, base_url, track_index, track_info) + else return new MSEPlayerTrack(player, base_url, segment_index, track_index, track_info) } diff --git a/web/script/player/track/mod.ts b/web/script/player/track/mod.ts index 0c7c1c0..5a91209 100644 --- a/web/script/player/track/mod.ts +++ b/web/script/player/track/mod.ts @@ -4,7 +4,7 @@ Copyright (C) 2025 metamuffin */ /// -import { TimeRange } from "../jhls.d.ts"; +import { TimeRange } from "../types_stream.ts"; import { OVar } from "../../jshelper/mod.ts"; import { BufferRange } from "../player.ts"; diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index d1a8c12..066bbfd 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -3,41 +3,41 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -import { JhlsTrackIndex, SourceTrack } from "../jhls.d.ts"; import { OVar } from "../../jshelper/mod.ts"; -import { profile_to_partial_track, track_to_content_type } from "../mediacaps.ts"; +import { track_to_content_type } from "../mediacaps.ts"; import { BufferRange, Player } from "../player.ts"; -import { EncodingProfileExt, ProfileSelector } from "../profiles.ts"; import { PlayerTrack, AppendRange, TARGET_BUFFER_DURATION, MIN_BUFFER_DURATION } from "./mod.ts"; -import { show_profile } from "../mod.ts"; import { e } from "../../jshelper/src/element.ts"; +import { FormatInfo, FragmentIndex, StreamContainer, TrackInfo } from "../types_stream.ts"; + +interface UsableFormat { format_index: number, usable_index: number, format: FormatInfo, container: StreamContainer } export class MSEPlayerTrack extends PlayerTrack { public source_buffer!: SourceBuffer; private current_load?: AppendRange; private loading = new Set(); private append_queue: AppendRange[] = []; - public profile_selector: ProfileSelector; - public profile = new OVar(undefined); - public index?: JhlsTrackIndex + public index?: FragmentIndex + public active_format = new OVar(undefined); + public usable_formats: UsableFormat[] = [] constructor( private player: Player, - private node_id: string, + private base_url: string, + private segment_index: number, track_index: number, - public metadata: SourceTrack, + public trackinfo: TrackInfo, ) { super(track_index); - this.profile_selector = new ProfileSelector(player, this, player.downloader.bandwidth_avail); this.init() } async init() { this.buffered.value = [{ start: 0, end: this.player.duration.value, status: "loading" }] try { - const res = await fetch(`/n/${encodeURIComponent(this.node_id)}/stream?format=jhlsi&track=${this.track_index}`, { headers: { "Accept": "application/json" } }); + const res = await fetch(`${this.base_url}?fragmentindex&segment=${this.segment_index}&track=${this.track_index}`, { headers: { "Accept": "application/json" } }); if (!res.ok) return this.player.error.value = "Cannot download index.", undefined; - let index!: JhlsTrackIndex & { error: string; }; + let index!: FragmentIndex & { error: string; }; try { index = await res.json(); } catch (_) { this.player.set_pers("Error: Failed to fetch node"); } if (index.error) return this.player.set_pers("server error: " + index.error), undefined; @@ -49,10 +49,9 @@ export class MSEPlayerTrack extends PlayerTrack { } this.buffered.value = [] - const canplay = await this.profile_selector.select_optimal_profile(this.track_index, this.profile); - if (!canplay) return this.player.set_track_enabled(this.track_index, false) - const ct = track_to_content_type(this.track_from_profile())!; - console.log(`track ${this.track_index} source buffer content-type: ${ct}`); + this.active_format.value = { usable_index: 0, format_index: 0, container: "webm", format: this.trackinfo.formats[0] } + + const ct = track_to_content_type(this.active_format.value!.format, this.active_format.value!.container); this.source_buffer = this.player.media_source.addSourceBuffer(ct); this.abort.signal.addEventListener("abort", () => { console.log(`destroy source buffer for track ${this.track_index}`); @@ -81,10 +80,6 @@ export class MSEPlayerTrack extends PlayerTrack { this.update(this.player.video.currentTime) } - track_from_profile(): SourceTrack { - if (this.profile.value) return profile_to_partial_track(this.profile.value); - else return this.metadata; - } update_buf_ranges() { if (!this.index) return; @@ -97,7 +92,7 @@ export class MSEPlayerTrack extends PlayerTrack { }); } for (const r of this.loading) { - ranges.push({ ...this.index.fragments[r], status: "loading" }); + ranges.push({ ...this.index[r], status: "loading" }); } this.buffered.value = ranges; } @@ -107,8 +102,8 @@ export class MSEPlayerTrack extends PlayerTrack { this.update_buf_ranges(); // TODO required? const blocking = []; - for (let i = 0; i < this.index.fragments.length; i++) { - const frag = this.index.fragments[i]; + for (let i = 0; i < this.index.length; i++) { + const frag = this.index[i]; if (frag.end < target) continue; if (frag.start >= target + TARGET_BUFFER_DURATION) break; if (!this.check_buf_collision(frag.start, frag.end)) continue; @@ -129,13 +124,13 @@ export class MSEPlayerTrack extends PlayerTrack { async load(index: number) { this.loading.add(index); - await this.profile_selector.select_optimal_profile(this.track_index, this.profile); - const url = `/n/${encodeURIComponent(this.node_id)}/stream?format=frag&webm=true&track=${this.track_index}&index=${index}${this.profile.value ? `&profile=${this.profile.value.id}` : ""}`; + // TODO update format selection + const url = `${this.base_url}?fragment&segment=${this.segment_index}&track=${this.track_index}&format=${this.active_format.value!.format_index}&index=${index}&container=${this.active_format.value!.container}`; const buf = await this.player.downloader.download(url); await new Promise(cb => { if (!this.index) return; if (this.abort.signal.aborted) return; - this.append_queue.push({ buf, ...this.index.fragments[index], index, cb }); + this.append_queue.push({ buf, ...this.index[index], index, cb }); this.tick_append(); }); } @@ -146,8 +141,8 @@ export class MSEPlayerTrack extends PlayerTrack { this.append_queue.splice(0, 1); this.current_load = frag; // TODO why is appending so unreliable?! sometimes it does not add it - this.source_buffer.changeType(track_to_content_type(this.track_from_profile())!); - this.source_buffer.timestampOffset = this.profile.value !== undefined ? frag.start : 0 + this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); + this.source_buffer.timestampOffset = this.active_format.value !== undefined ? frag.start : 0 console.log(`append track ${this.track_index}`); this.source_buffer.appendBuffer(frag.buf); } @@ -158,13 +153,13 @@ export class MSEPlayerTrack extends PlayerTrack { const c = b.filter(r => r.status == t); return `${c.length} range${c.length != 1 ? "s" : ""}, ${c.reduce((a, v) => a + v.end - v.start, 0).toFixed(2)}s` } - return this.profile.liftA2(this.buffered, (p, b) => + return this.active_format.liftA2(this.buffered, (p, b) => e("pre", - `mse track ${this.track_index}: ${(p ? `profile ${p.id} (${show_profile(p)})` : `remux`)}` - + `\n\ttype: ${track_to_content_type(this.track_from_profile())}` - + `\n\tbuffered: ${rtype("buffered", b)}` - + `\n\tqueued: ${rtype("queued", b)}` - + `\n\tloading: ${rtype("loading", b)}` + p ? + `mse track ${this.track_index}: format ${p.format_index} (${p.format.remux ? "remux" : "transcode"})` + + `\n\ttype: ${track_to_content_type(p.format, p.container)} br=${p.format.bitrate}` + + `\n\tbuffered: ${rtype("buffered", b)} / queued: ${rtype("queued", b)} / loading: ${rtype("loading", b)}` + : "" ) as HTMLElement ) } diff --git a/web/script/player/track/vtt.ts b/web/script/player/track/vtt.ts index ea4951c..3dd7670 100644 --- a/web/script/player/track/vtt.ts +++ b/web/script/player/track/vtt.ts @@ -4,8 +4,8 @@ Copyright (C) 2025 metamuffin */ import { e } from "../../jshelper/src/element.ts"; -import { SourceTrack, JvttCue } from "../jhls.d.ts"; import { Player } from "../player.ts"; +import { JvttCue, TrackInfo } from "../types_stream.ts"; import { PlayerTrack } from "./mod.ts"; export class VttPlayerTrack extends PlayerTrack { @@ -16,10 +16,10 @@ export class VttPlayerTrack extends PlayerTrack { private player: Player, private node_id: string, track_index: number, - private metadata: SourceTrack, + private track_info: TrackInfo, ) { super(track_index); - this.track = this.player.video.addTextTrack("subtitles", this.metadata.name, this.metadata.language); + this.track = this.player.video.addTextTrack("subtitles", this.track_info.name, this.track_info.language); this.buffered.value = [{ start: 0, end: this.player.duration.value, status: "loading" }] this.init() } diff --git a/web/script/player/types_node.ts b/web/script/player/types_node.ts new file mode 100644 index 0000000..6946313 --- /dev/null +++ b/web/script/player/types_node.ts @@ -0,0 +1,76 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +export interface NodePublic { + kind: NodeKind, + title?: string, + tagline?: string, + description?: string, + id?: string, + path: string[], + children: string[], + release_date?: string, + index?: number, + media?: MediaInfo, + ratings: { [key in Rating]: number }, + // might be incomplete +} + +export type NodeKind = "movie" + | "video" + | "collection" + | "channel" + | "show" + | "series" + | "season" + | "episode" + +export type Rating = "imdb" + | "tmdb" + | "rotten_tomatoes" + | "metacritic" + | "youtube_views" + | "youtube_likes" + | "youtube_followers" + +export interface MediaInfo { + duration: number, + tracks: SourceTrack[], + chapters: Chapter[], +} + +export interface Chapter { + time_start?: number, + time_end?: number, + labels: { [key: string]: string } +} + +export interface SourceTrack { + kind: SourceTrackKind, + name: string, + codec: string, + language: string, +} +export type SourceTrackKind = { + video: { + width: number, + height: number, + fps: number, + } +} + | { + audio: { + channels: number, + sample_rate: number, + bit_depth: number, + } + } | "subtitles"; + +export interface NodeUserData { + watched: WatchedState +} +export type WatchedState = "none" | "watched" | "pending" | { progress: number } + diff --git a/web/script/player/types_stream.ts b/web/script/player/types_stream.ts new file mode 100644 index 0000000..290a778 --- /dev/null +++ b/web/script/player/types_stream.ts @@ -0,0 +1,39 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +export type FragmentIndex = TimeRange[] +export interface TimeRange { start: number, end: number } +export interface JvttCue extends TimeRange { + content: string +} +export interface StreamInfo { + name?: string, + segments: SegmentInfo[], +} +export interface SegmentInfo { + name?: string, + duration: number, + tracks: TrackInfo[], +} +export type TrackKind = "video" | "audio" | "subtitles" +export interface TrackInfo { + name?: string, + language?: string, + kind: TrackKind, + formats: FormatInfo[] +} +export type StreamContainer = "webm" | "matroska" | "mpeg4" | "jvtt" | "webvtt" +export interface FormatInfo { + codec: string, + bitrate: number, + remux: boolean, + containers: StreamContainer[] + + width?: number, + height?: number, + channels?: number, + samplerate?: number, + bit_depth?: number, +} -- cgit v1.2.3-70-g09d2 From 50dc0e7bea02d7fc5b38edb7f943e19bd8c0285b Mon Sep 17 00:00:00 2001 From: metamuffin Date: Tue, 15 Apr 2025 17:56:06 +0200 Subject: remux playback works --- stream/src/stream_info.rs | 12 ++++++------ web/script/player/track/mod.ts | 2 +- web/script/player/track/mse.ts | 5 +++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index a8b6989..43c536a 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -57,12 +57,7 @@ pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStream let segment = StreamSegmentInfo { name: None, - duration: metadata[0] - .info - .as_ref() - .unwrap() - .duration - .unwrap_or_default(), + duration: media_duration(&metadata[0]), tracks, }; Ok(( @@ -162,3 +157,8 @@ pub(crate) async fn write_stream_info(info: Arc, mut b: DuplexStream spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await }); Ok(()) } + +fn media_duration(m: &MatroskaMetadata) -> f64 { + let info = m.info.as_ref().unwrap(); + (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000. +} diff --git a/web/script/player/track/mod.ts b/web/script/player/track/mod.ts index 5a91209..99b348c 100644 --- a/web/script/player/track/mod.ts +++ b/web/script/player/track/mod.ts @@ -8,7 +8,7 @@ import { TimeRange } from "../types_stream.ts"; import { OVar } from "../../jshelper/mod.ts"; import { BufferRange } from "../player.ts"; -export const TARGET_BUFFER_DURATION = 10 +export const TARGET_BUFFER_DURATION = 15 export const MIN_BUFFER_DURATION = 1 export interface AppendRange extends TimeRange { buf: ArrayBuffer, index: number, cb: () => void } diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index 066bbfd..9fa5e42 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -142,7 +142,7 @@ export class MSEPlayerTrack extends PlayerTrack { this.current_load = frag; // TODO why is appending so unreliable?! sometimes it does not add it this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); - this.source_buffer.timestampOffset = this.active_format.value !== undefined ? frag.start : 0 + this.source_buffer.timestampOffset = 0 // TODO send if relative PTS //this.active_format.value !== undefined ? frag.start : 0 console.log(`append track ${this.track_index}`); this.source_buffer.appendBuffer(frag.buf); } @@ -151,7 +151,8 @@ export class MSEPlayerTrack extends PlayerTrack { public debug(): OVar { const rtype = (t: string, b: BufferRange[]) => { const c = b.filter(r => r.status == t); - return `${c.length} range${c.length != 1 ? "s" : ""}, ${c.reduce((a, v) => a + v.end - v.start, 0).toFixed(2)}s` + // ${c.length} range${c.length != 1 ? "s" : ""} + return `${c.reduce((a, v) => a + v.end - v.start, 0).toFixed(2)}s` } return this.active_format.liftA2(this.buffered, (p, b) => e("pre", -- cgit v1.2.3-70-g09d2 From 39dee6820db4581fa41cfac8bcfdd399a96f5319 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 16 Apr 2025 00:09:35 +0200 Subject: transcode impl but broken --- common/src/stream.rs | 4 ++-- remuxer/src/lib.rs | 3 +++ remuxer/src/mpeg4.rs | 34 ++++++++++++++++++++++++++++++++++ stream/src/fragment.rs | 32 ++++++++++++++++++++++++-------- stream/src/stream_info.rs | 13 +++++++++---- transcoder/src/fragment.rs | 31 +++++++++++-------------------- web/script/player/mediacaps.ts | 11 ++++++----- web/script/player/track/mse.ts | 20 ++++++++++++++++---- 8 files changed, 105 insertions(+), 43 deletions(-) create mode 100644 remuxer/src/mpeg4.rs diff --git a/common/src/stream.rs b/common/src/stream.rs index ba91ff5..55f2f49 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -209,7 +209,7 @@ impl Display for StreamContainer { StreamContainer::Matroska => "matroska", StreamContainer::WebVTT => "webvtt", StreamContainer::JVTT => "jvtt", - StreamContainer::MPEG4 => "mp4", + StreamContainer::MPEG4 => "mpeg4", }) } } @@ -221,7 +221,7 @@ impl FromStr for StreamContainer { "matroska" => StreamContainer::Matroska, "webvtt" => StreamContainer::WebVTT, "jvtt" => StreamContainer::JVTT, - "mp4" => StreamContainer::MPEG4, + "mpeg4" => StreamContainer::MPEG4, _ => return Err(()), }) } diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index 9ddf7c1..c20197f 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -3,9 +3,11 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ +#![feature(random, exit_status_error)] pub mod extract; pub mod fragment; pub mod metadata; +pub mod mpeg4; pub mod remux; pub mod seek_index; pub mod segment_extractor; @@ -14,6 +16,7 @@ pub mod trim_writer; use ebml_struct::matroska::TrackEntry; pub use fragment::write_fragment_into; use jellymatroska::{Master, MatroskaTag}; +pub use mpeg4::matroska_to_mpeg4; pub use remux::remux_stream_into; pub fn ebml_header(webm: bool) -> MatroskaTag { diff --git a/remuxer/src/mpeg4.rs b/remuxer/src/mpeg4.rs new file mode 100644 index 0000000..9e59514 --- /dev/null +++ b/remuxer/src/mpeg4.rs @@ -0,0 +1,34 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +use anyhow::Result; +use std::{ + fs::{remove_file, File}, + io::{copy, Read, Write}, + process::{Command, Stdio}, + random::random, +}; + +pub fn matroska_to_mpeg4( + mut input: impl Read + Send + 'static, + mut output: impl Write, +) -> Result<()> { + let path = format!("/tmp/jellything-tc-hack-{:016x}", random::()); + let args = format!("-f matroska -i pipe:0 -c copy -map 0 -f mp4 {path}"); + let mut child = Command::new("ffmpeg") + .args(args.split(" ")) + .stdin(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let mut stdin = child.stdin.take().unwrap(); + copy(&mut input, &mut stdin)?; + drop(stdin); + child.wait()?.exit_ok()?; + copy(&mut File::open(&path)?, &mut output)?; + remove_file(path)?; + + Ok(()) +} diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 26746fc..2ce3c78 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -4,8 +4,9 @@ Copyright (C) 2025 metamuffin */ use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use jellybase::common::stream::StreamContainer; +use jellyremuxer::matroska_to_mpeg4; use jellytranscoder::fragment::transcode; use log::warn; use std::sync::Arc; @@ -55,14 +56,13 @@ pub async fn fragment_stream( &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source track.kind, format, - container, move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( SyncIoBridge::new(b), &path, track_num, - container == StreamContainer::WebM, + false, &info.name.unwrap_or_default(), index, ) { @@ -72,12 +72,28 @@ pub async fn fragment_stream( }, ) .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") + eprintln!("{:?}", location.abs()); + let mut frag = File::open(location.abs()).await?; + match container { + StreamContainer::WebM => {} + StreamContainer::Matroska => { + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + warn!("cannot write stream: {err}") + } + }); } - }); + StreamContainer::MPEG4 => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("mpeg4 transmux failed: {err}"); + } + }); + } + _ => bail!("unsupported"), + } } Ok(()) diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 43c536a..c3746c6 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -79,7 +79,12 @@ fn stream_formats(t: &TrackEntry) -> Vec { codec: t.codec_id.to_string(), remux: true, bitrate: 10_000_000., // TODO - containers: containers_by_codec(&t.codec_id), + containers: { + let mut x = containers_by_codec(&t.codec_id); + // TODO remove this + x.retain_mut(|x| *x != StreamContainer::MPEG4); + x + }, bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), channels: t.audio.as_ref().map(|a| a.channels as usize), @@ -101,8 +106,8 @@ fn stream_formats(t: &TrackEntry) -> Vec { ("V_AV1", CONF.encoders.av1.is_some()), ("V_VP8", CONF.encoders.vp8.is_some()), ("V_VP9", CONF.encoders.vp9.is_some()), - ("V_AVC", CONF.encoders.avc.is_some()), - ("V_HEVC", CONF.encoders.hevc.is_some()), + ("V_MPEG4/ISO/AVC", CONF.encoders.avc.is_some()), + ("V_MPEGH/ISO/HEVC", CONF.encoders.hevc.is_some()), ] { if enable { formats.push(StreamFormatInfo { @@ -146,7 +151,7 @@ fn containers_by_codec(codec: &str) -> Vec { use StreamContainer::*; match codec { "V_VP8" | "V_VP9" | "V_AV1" | "A_OPUS" | "A_VORBIS" => vec![Matroska, WebM], - "V_AVC" | "A_AAC" => vec![Matroska, MPEG4], + "V_MPEG4/ISO/AVC" | "A_AAC" => vec![Matroska, MPEG4], "S_TEXT/UTF8" | "S_TEXT/WEBVTT" => vec![Matroska, WebVTT, WebM, JVTT], _ => vec![Matroska], } diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 1d06e9a..8692423 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -7,7 +7,7 @@ use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, - common::stream::{StreamContainer, StreamFormatInfo, TrackKind}, + common::stream::{StreamFormatInfo, TrackKind}, CONF, }; use log::{debug, info}; @@ -24,7 +24,6 @@ pub async fn transcode( key: &str, kind: TrackKind, format: &StreamFormatInfo, - container: StreamContainer, input: impl FnOnce(ChildStdin), ) -> anyhow::Result { async_cache_file( @@ -34,8 +33,8 @@ pub async fn transcode( debug!("transcoding fragment with {format:?}"); let template = match format.codec.as_str() { - "V_AVC" => CONF.encoders.avc.as_ref(), - "V_HEVC" => CONF.encoders.hevc.as_ref(), + "V_MPEG4/ISO/AVC" => CONF.encoders.avc.as_ref(), + "V_MPEGH/ISO/HEVC" => CONF.encoders.hevc.as_ref(), "V_VP8" => CONF.encoders.vp8.as_ref(), "V_VP9" => CONF.encoders.vp9.as_ref(), "V_AV1" => CONF.encoders.av1.as_ref(), @@ -57,35 +56,27 @@ pub async fn transcode( }; let fallback_encoder = match format.codec.as_str() { "A_OPUS" => "libopus", - _ => unreachable!(), + "V_MPEG4/ISO/AVC" => "libx264", + "V_MPEGH/ISO/HEVC" => "libx265", + _ => "", }; let args = template .replace("%i", "-f matroska -i pipe:0") - .replace("%o", "-f %C pipe:1") + .replace("%o", "-f matroska pipe:1") .replace("%f", &filter) .replace("%e", "-c:%t %c -b:%t %r") .replace("%t", typechar) .replace("%c", fallback_encoder) - .replace("%r", &(format.bitrate as i64).to_string()) - .replace("%C", &container.to_string()); + .replace("%r", &(format.bitrate as i64).to_string()); info!("encoding with {:?}", args); - let container = match container { - StreamContainer::WebM => "webm", - StreamContainer::Matroska => "matroska", - StreamContainer::WebVTT => "vtt", - StreamContainer::MPEG4 => "mp4", - StreamContainer::JVTT => unreachable!(), - }; - - let mut proc = Command::new("ffmpeg") + let mut args = args.split(" "); + let mut proc = Command::new(args.next().unwrap()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .args(["-f", "matroska", "-i", "pipe:0"]) - .args(args.split(" ")) - .args(["-f", container, "pipe:1"]) + .args(args) .spawn()?; let stdin = proc.stdin.take().unwrap(); diff --git a/web/script/player/mediacaps.ts b/web/script/player/mediacaps.ts index 037a84b..29cd64a 100644 --- a/web/script/player/mediacaps.ts +++ b/web/script/player/mediacaps.ts @@ -22,9 +22,9 @@ export async function test_media_capability(format: FormatInfo, container: Strea return r } async function test_media_capability_inner(format: FormatInfo, container: StreamContainer) { - if (format.codec.startsWith("S_") || format.codec.startsWith("V_") || format.codec.startsWith("D_")) { + if (format.codec.startsWith("S_") || format.codec.startsWith("D_")) { // TODO do we need to check this? - return format.codec == "V_TEXT/WEBVTT" || format.codec == "D_WEBVTT/SUBTITLES" + return format.codec == "S_TEXT/WEBVTT" || format.codec == "S_TEXT/UTF8" || format.codec == "D_WEBVTT/SUBTITLES" } let res; if (format.codec.startsWith("A_")) { @@ -50,19 +50,20 @@ async function test_media_capability_inner(format: FormatInfo, container: Stream } }) } - console.log(format, res); return res?.supported ?? false } export function track_to_content_type(format: FormatInfo, container: StreamContainer): string { - return `${CONTAINER_TO_MIME_TYPE[container]}; codecs="${MASTROSKA_CODEC_MAP[format.codec]}"` + let c = CONTAINER_TO_MIME_TYPE[container]; + if (format.codec.startsWith("A_")) c = c.replace("video/", "audio/") + return `${c}; codecs="${MASTROSKA_CODEC_MAP[format.codec]}"` } const MASTROSKA_CODEC_MAP: { [key: string]: string } = { "V_VP9": "vp9", "V_VP8": "vp8", "V_AV1": "av1", - "V_MPEG4/ISO/AVC": "h264", + "V_MPEG4/ISO/AVC": "avc1.4d002a", "V_MPEGH/ISO/HEVC": "h265", "A_OPUS": "opus", "A_VORBIS": "vorbis", diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index 9fa5e42..199aa14 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -4,7 +4,7 @@ Copyright (C) 2025 metamuffin */ import { OVar } from "../../jshelper/mod.ts"; -import { track_to_content_type } from "../mediacaps.ts"; +import { test_media_capability, track_to_content_type } from "../mediacaps.ts"; import { BufferRange, Player } from "../player.ts"; import { PlayerTrack, AppendRange, TARGET_BUFFER_DURATION, MIN_BUFFER_DURATION } from "./mod.ts"; import { e } from "../../jshelper/src/element.ts"; @@ -49,7 +49,19 @@ export class MSEPlayerTrack extends PlayerTrack { } this.buffered.value = [] - this.active_format.value = { usable_index: 0, format_index: 0, container: "webm", format: this.trackinfo.formats[0] } + console.log(this.trackinfo); + + for (let i = 0; i < this.trackinfo.formats.length; i++) { + const format = this.trackinfo.formats[i]; + for (const container of format.containers) { + if (container != "webm" && container != "mpeg4") continue; + if (await test_media_capability(format, container)) + this.usable_formats.push({ container, format, format_index: i, usable_index: this.usable_formats.length }) + } + } + if (!this.usable_formats.length) + return this.player.logger?.log("No availble format is supported by this device. The track can't be played back.") + this.active_format.value = this.usable_formats[0] const ct = track_to_content_type(this.active_format.value!.format, this.active_format.value!.container); this.source_buffer = this.player.media_source.addSourceBuffer(ct); @@ -142,8 +154,8 @@ export class MSEPlayerTrack extends PlayerTrack { this.current_load = frag; // TODO why is appending so unreliable?! sometimes it does not add it this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); - this.source_buffer.timestampOffset = 0 // TODO send if relative PTS //this.active_format.value !== undefined ? frag.start : 0 - console.log(`append track ${this.track_index}`); + this.source_buffer.timestampOffset = this.active_format.value?.format.remux ? 0 : frag.start + console.log(`append track at ${this.source_buffer.timestampOffset} ${this.trackinfo.kind} ${this.track_index}`); this.source_buffer.appendBuffer(frag.buf); } } -- cgit v1.2.3-70-g09d2 From a9c897c7d7df5509a195055e95dfa821fe7aa274 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 16 Apr 2025 14:39:27 +0200 Subject: the typical mse problems again... --- remuxer/src/mpeg4.rs | 2 +- transcoder/src/fragment.rs | 8 ++++++-- web/script/player/download.ts | 2 +- web/script/player/mediacaps.ts | 4 ++-- web/script/player/track/mse.ts | 3 ++- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/remuxer/src/mpeg4.rs b/remuxer/src/mpeg4.rs index 9e59514..da66fe2 100644 --- a/remuxer/src/mpeg4.rs +++ b/remuxer/src/mpeg4.rs @@ -16,7 +16,7 @@ pub fn matroska_to_mpeg4( mut output: impl Write, ) -> Result<()> { let path = format!("/tmp/jellything-tc-hack-{:016x}", random::()); - let args = format!("-f matroska -i pipe:0 -c copy -map 0 -f mp4 {path}"); + let args = format!("-f matroska -i pipe:0 -copyts -c copy -f mp4 {path}"); let mut child = Command::new("ffmpeg") .args(args.split(" ")) .stdin(Stdio::piped()) diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 8692423..88a311e 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -56,19 +56,23 @@ pub async fn transcode( }; let fallback_encoder = match format.codec.as_str() { "A_OPUS" => "libopus", + "V_VP8" => "libvpx", + "V_VP9" => "libvpx-vp9", + "V_AV1" => "libaom", // svtav1 is x86 only :( "V_MPEG4/ISO/AVC" => "libx264", "V_MPEGH/ISO/HEVC" => "libx265", _ => "", }; let args = template - .replace("%i", "-f matroska -i pipe:0") + .replace("%i", "-f matroska -i pipe:0 -copyts") .replace("%o", "-f matroska pipe:1") .replace("%f", &filter) .replace("%e", "-c:%t %c -b:%t %r") .replace("%t", typechar) .replace("%c", fallback_encoder) - .replace("%r", &(format.bitrate as i64).to_string()); + .replace("%r", &(format.bitrate as i64).to_string()) + .replace(" ", " "); info!("encoding with {:?}", args); diff --git a/web/script/player/download.ts b/web/script/player/download.ts index 18f1e8d..8294d2a 100644 --- a/web/script/player/download.ts +++ b/web/script/player/download.ts @@ -20,7 +20,7 @@ export class SegmentDownloader { const dl_start = performance.now(); const res = await fetch(url) const dl_header = performance.now(); - if (!res.ok) throw new Error("aaaaa"); + if (!res.ok) throw new Error("aaaaaa"); const buf = await res.arrayBuffer() const dl_body = performance.now(); diff --git a/web/script/player/mediacaps.ts b/web/script/player/mediacaps.ts index 29cd64a..3c55aa9 100644 --- a/web/script/player/mediacaps.ts +++ b/web/script/player/mediacaps.ts @@ -63,8 +63,8 @@ const MASTROSKA_CODEC_MAP: { [key: string]: string } = { "V_VP9": "vp9", "V_VP8": "vp8", "V_AV1": "av1", - "V_MPEG4/ISO/AVC": "avc1.4d002a", - "V_MPEGH/ISO/HEVC": "h265", + "V_MPEG4/ISO/AVC": "avc1.42C01F", + "V_MPEGH/ISO/HEVC": "hev1.1.6.L93.90", "A_OPUS": "opus", "A_VORBIS": "vorbis", "S_TEXT/WEBVTT": "webvtt", diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index 199aa14..6bb77e0 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -154,7 +154,8 @@ export class MSEPlayerTrack extends PlayerTrack { this.current_load = frag; // TODO why is appending so unreliable?! sometimes it does not add it this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); - this.source_buffer.timestampOffset = this.active_format.value?.format.remux ? 0 : frag.start + // this.source_buffer.timestampOffset = this.active_format.value?.format.remux ? 0 : frag.start + this.source_buffer.timestampOffset = 0 console.log(`append track at ${this.source_buffer.timestampOffset} ${this.trackinfo.kind} ${this.track_index}`); this.source_buffer.appendBuffer(frag.buf); } -- cgit v1.2.3-70-g09d2 From edfd710c055621d7ef0c8d0e9c6668b4aa2283d7 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 16 Apr 2025 14:53:58 +0200 Subject: move seek index types to remuxer --- common/src/lib.rs | 1 - common/src/seek_index.rs | 33 --------------------------------- remuxer/src/seek_index.rs | 33 +++++++++++++++++++++++++++++---- web/script/player/track/mse.ts | 1 - 4 files changed, 29 insertions(+), 39 deletions(-) delete mode 100644 common/src/seek_index.rs diff --git a/common/src/lib.rs b/common/src/lib.rs index 00f07b6..4480db5 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -9,7 +9,6 @@ pub mod config; pub mod helpers; pub mod r#impl; pub mod jhls; -pub mod seek_index; pub mod stream; pub mod user; diff --git a/common/src/seek_index.rs b/common/src/seek_index.rs deleted file mode 100644 index 20cf394..0000000 --- a/common/src/seek_index.rs +++ /dev/null @@ -1,33 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -use bincode::{Decode, Encode}; - -pub const SEEK_INDEX_VERSION: u32 = 0x5eef1de4; - -#[derive(Debug, Clone, Decode, Encode)] -pub struct SeekIndex { - pub version: u32, - pub blocks: Vec, - pub keyframes: Vec, -} - -#[derive(Debug, Clone, Decode, Encode)] -pub struct BlockIndex { - pub pts: u64, - // pub duration: Option, - pub source_off: u64, // points to start of SimpleBlock or BlockGroup (not the Block inside it) - pub size: usize, -} - -impl Default for SeekIndex { - fn default() -> Self { - Self { - version: SEEK_INDEX_VERSION, - blocks: Vec::new(), - keyframes: Vec::new(), - } - } -} diff --git a/remuxer/src/seek_index.rs b/remuxer/src/seek_index.rs index bd351d9..7296d93 100644 --- a/remuxer/src/seek_index.rs +++ b/remuxer/src/seek_index.rs @@ -4,10 +4,8 @@ Copyright (C) 2025 metamuffin */ use anyhow::{Context, Result}; -use jellybase::{ - cache::cache_memory, - common::seek_index::{BlockIndex, SeekIndex}, -}; +use bincode::{Decode, Encode}; +use jellybase::cache::cache_memory; use jellymatroska::{ block::Block, read::EbmlReader, @@ -17,6 +15,33 @@ use jellymatroska::{ use log::{debug, info, trace, warn}; use std::{collections::BTreeMap, fs::File, io::BufReader, path::Path, sync::Arc}; +pub const SEEK_INDEX_VERSION: u32 = 0x5eef1de4; + +#[derive(Debug, Clone, Decode, Encode)] +pub struct SeekIndex { + pub version: u32, + pub blocks: Vec, + pub keyframes: Vec, +} + +#[derive(Debug, Clone, Decode, Encode)] +pub struct BlockIndex { + pub pts: u64, + // pub duration: Option, + pub source_off: u64, // points to start of SimpleBlock or BlockGroup (not the Block inside it) + pub size: usize, +} + +impl Default for SeekIndex { + fn default() -> Self { + Self { + version: SEEK_INDEX_VERSION, + blocks: Vec::new(), + keyframes: Vec::new(), + } + } +} + pub fn get_seek_index(path: &Path) -> anyhow::Result>>> { cache_memory(&["seekindex", path.to_str().unwrap()], move || { info!("generating seek index for {path:?}"); diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index 6bb77e0..5565a6b 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -156,7 +156,6 @@ export class MSEPlayerTrack extends PlayerTrack { this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); // this.source_buffer.timestampOffset = this.active_format.value?.format.remux ? 0 : frag.start this.source_buffer.timestampOffset = 0 - console.log(`append track at ${this.source_buffer.timestampOffset} ${this.trackinfo.kind} ${this.track_index}`); this.source_buffer.appendBuffer(frag.buf); } } -- cgit v1.2.3-70-g09d2 From ad8016d8014af1e8dfb267fcdb51da63ab8ca4a9 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 16 Apr 2025 17:23:55 +0200 Subject: better debug info and loggin --- web/script/player/mod.ts | 19 +++++++++++-------- web/script/player/track/mse.ts | 7 +++++-- web/style/js-player.css | 2 ++ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/web/script/player/mod.ts b/web/script/player/mod.ts index 82ee287..af62cde 100644 --- a/web/script/player/mod.ts +++ b/web/script/player/mod.ts @@ -66,11 +66,11 @@ function initialize_player(el: HTMLElement, node_id: string) { let mute_saved_volume = 1; const toggle_mute = () => { if (player.volume.value == 0) { - logger.log("Unmuted."); + logger.log("Unmuted.", "volume"); player.volume.value = mute_saved_volume } else { - logger.log("Muted."); + logger.log("Muted.", "volume"); mute_saved_volume = player.volume.value player.volume.value = 0. } @@ -281,8 +281,8 @@ function initialize_player(el: HTMLElement, node_id: string) { else if (k.code == "KeyS") screenshot_video(player.video) else if (k.code == "KeyJ") step_track_kind("subtitles") else if (k.code == "KeyM") toggle_mute() - else if (k.code == "Digit9") (player.volume.value /= 1.2), logger.log(`Volume decreased to ${show_volume(player.volume.value)}`) - else if (k.code == "Digit0") (player.volume.value *= 1.2), logger.log(`Volume increased to ${show_volume(player.volume.value)}`) + else if (k.code == "Digit9") (player.volume.value /= 1.2), logger.log(`Volume decreased to ${show_volume(player.volume.value)}`, "volume") + else if (k.code == "Digit0") (player.volume.value *= 1.2), logger.log(`Volume increased to ${show_volume(player.volume.value)}`, "volume") else if (k.key == "#") step_track_kind("audio") else if (k.key == "_") step_track_kind("video") else if (k.code == "KeyV") show_stats.value = !show_stats.value @@ -355,10 +355,13 @@ function mouse_idle(e: HTMLElement, timeout: number): OVar { } export function show_format(format: FormatInfo): string { - // if (format.audio) return `codec=${format.audio.codec} br=${show.metric(format.audio.bitrate, "b/s")}${format.audio.sample_rate ? ` sr=${show.metric(format.audio.sample_rate, "Hz")}` : ""}` - // if (format.video) return `codec=${format.video.codec} br=${show.metric(format.video.bitrate, "b/s")} w=${format.video.width} preset=${format.video.preset}` - // if (format.subtitles) return `codec=${format.subtitles.codec}` - return `TODO` + let o = `${format.codec} br=${show.metric(format.bitrate, "b/s")} ac=${format.containers.join(",")}` + if (format.width) o += ` w=${format.width}` + if (format.height) o += ` h=${format.height}` + if (format.samplerate) o += ` ar=${show.metric(format.samplerate, "Hz")}` + if (format.channels) o += ` ac=${format.channels}` + if (format.bit_depth) o += ` bits=${format.bit_depth}` + return o } export function show_volume(v: number): string { return `${v == 0 ? "-∞" : (Math.log10(v) * 10).toFixed(2)}dB | ${(v * 100).toFixed(2)}%` diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index 5565a6b..237b6f6 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -9,6 +9,7 @@ import { BufferRange, Player } from "../player.ts"; import { PlayerTrack, AppendRange, TARGET_BUFFER_DURATION, MIN_BUFFER_DURATION } from "./mod.ts"; import { e } from "../../jshelper/src/element.ts"; import { FormatInfo, FragmentIndex, StreamContainer, TrackInfo } from "../types_stream.ts"; +import { show_format } from "../mod.ts"; interface UsableFormat { format_index: number, usable_index: number, format: FormatInfo, container: StreamContainer } @@ -154,8 +155,9 @@ export class MSEPlayerTrack extends PlayerTrack { this.current_load = frag; // TODO why is appending so unreliable?! sometimes it does not add it this.source_buffer.changeType(track_to_content_type(this.active_format.value!.format, this.active_format.value!.container)); + this.source_buffer.timestampOffset = this.active_format.value?.container == "mpeg4" ? frag.start : 0 // this.source_buffer.timestampOffset = this.active_format.value?.format.remux ? 0 : frag.start - this.source_buffer.timestampOffset = 0 + // this.source_buffer.timestampOffset = 0 this.source_buffer.appendBuffer(frag.buf); } } @@ -170,7 +172,8 @@ export class MSEPlayerTrack extends PlayerTrack { e("pre", p ? `mse track ${this.track_index}: format ${p.format_index} (${p.format.remux ? "remux" : "transcode"})` - + `\n\ttype: ${track_to_content_type(p.format, p.container)} br=${p.format.bitrate}` + + `\n\tformat: ${show_format(p.format)}` + + `\n\tbuffer type: ${track_to_content_type(p.format, p.container)}` + `\n\tbuffered: ${rtype("buffered", b)} / queued: ${rtype("queued", b)} / loading: ${rtype("loading", b)}` : "" ) as HTMLElement diff --git a/web/style/js-player.css b/web/style/js-player.css index c9a48e9..33669a5 100644 --- a/web/style/js-player.css +++ b/web/style/js-player.css @@ -112,6 +112,8 @@ padding: 0.15em; margin: 0px; font-size: large; +} +.jsp .jsh-log-line-appear { animation-name: appear; animation-timing-function: linear; animation-duration: 0.5s; -- cgit v1.2.3-70-g09d2 From cdf95d7b80bd2b78895671da8f462145bb5db522 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 16 Apr 2025 17:24:08 +0200 Subject: webm and mpeg4 fragments semi fixed --- remuxer/src/lib.rs | 5 ++- remuxer/src/matroska_to_mpeg4.rs | 36 +++++++++++++++++ remuxer/src/matroska_to_webm.rs | 84 ++++++++++++++++++++++++++++++++++++++++ remuxer/src/mpeg4.rs | 34 ---------------- stream/src/fragment.rs | 14 +++++-- web/script/jshelper | 2 +- 6 files changed, 135 insertions(+), 40 deletions(-) create mode 100644 remuxer/src/matroska_to_mpeg4.rs create mode 100644 remuxer/src/matroska_to_webm.rs delete mode 100644 remuxer/src/mpeg4.rs diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index c20197f..931d5e6 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -7,16 +7,17 @@ pub mod extract; pub mod fragment; pub mod metadata; -pub mod mpeg4; +pub mod matroska_to_mpeg4; pub mod remux; pub mod seek_index; pub mod segment_extractor; pub mod trim_writer; +pub mod matroska_to_webm; use ebml_struct::matroska::TrackEntry; pub use fragment::write_fragment_into; use jellymatroska::{Master, MatroskaTag}; -pub use mpeg4::matroska_to_mpeg4; +pub use matroska_to_mpeg4::matroska_to_mpeg4; pub use remux::remux_stream_into; pub fn ebml_header(webm: bool) -> MatroskaTag { diff --git a/remuxer/src/matroska_to_mpeg4.rs b/remuxer/src/matroska_to_mpeg4.rs new file mode 100644 index 0000000..e8268e7 --- /dev/null +++ b/remuxer/src/matroska_to_mpeg4.rs @@ -0,0 +1,36 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +use anyhow::Result; +use std::{ + fs::{remove_file, File}, + io::{copy, Read, Write}, + process::{Command, Stdio}, + random::random, +}; + +pub fn matroska_to_mpeg4( + mut input: impl Read + Send + 'static, + mut output: impl Write, +) -> Result<()> { + let path = format!("/tmp/jellything-tc-hack-{:016x}", random::()); + let args = format!( + "-hide_banner -loglevel warning -f matroska -i pipe:0 -copyts -c copy -f mp4 -movflags frag_keyframe+empty_moov {path}" + ); + let mut child = Command::new("ffmpeg") + .args(args.split(" ")) + .stdin(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let mut stdin = child.stdin.take().unwrap(); + copy(&mut input, &mut stdin)?; + drop(stdin); + child.wait()?.exit_ok()?; + copy(&mut File::open(&path)?, &mut output)?; + remove_file(path)?; + + Ok(()) +} diff --git a/remuxer/src/matroska_to_webm.rs b/remuxer/src/matroska_to_webm.rs new file mode 100644 index 0000000..b9a1819 --- /dev/null +++ b/remuxer/src/matroska_to_webm.rs @@ -0,0 +1,84 @@ +use crate::ebml_track_entry; +use anyhow::Context; +use ebml_struct::{ + ids::*, + matroska::{Cluster, Ebml, Info, Tracks}, + read::{EbmlReadExt, TagRead}, + write::TagWrite, +}; +use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; +use log::warn; +use std::io::{BufReader, BufWriter, ErrorKind, Read, Seek, Write}; + +pub fn matroska_to_webm( + input: impl Read + Seek + 'static, + output: impl Write, +) -> anyhow::Result<()> { + let mut output = EbmlWriter::new(BufWriter::new(output), 0); + let mut input = EbmlReader::new(BufReader::new(input)); + + Ebml { + ebml_version: 1, + ebml_read_version: 1, + ebml_max_id_length: 4, + ebml_max_size_length: 8, + doc_type: "webm".to_string(), + doc_type_version: 4, + doc_type_read_version: 2, + doc_type_extensions: vec![], + } + .write(&mut output)?; + output.write_tag(&MatroskaTag::Segment(Master::Start))?; + + let (x, mut ebml) = input.read_tag()?; + assert_eq!(x, EL_EBML); + let ebml = Ebml::read(&mut ebml).unwrap(); + assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); + let (x, mut segment) = input.read_tag()?; + assert_eq!(x, EL_SEGMENT); + + loop { + let (x, mut seg) = match segment.read_tag() { + Ok(o) => o, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + }; + match x { + EL_INFO => { + let info = Info::read(&mut seg).context("info")?; + output.write_tag(&{ + MatroskaTag::Info(Master::Collected(vec![ + MatroskaTag::TimestampScale(info.timestamp_scale), + MatroskaTag::Duration(info.duration.unwrap_or_default()), + MatroskaTag::Title(info.title.unwrap_or_default()), + MatroskaTag::MuxingApp("jellyremux".to_string()), + MatroskaTag::WritingApp("jellything".to_string()), + ])) + })?; + } + EL_TRACKS => { + let tracks = Tracks::read(&mut seg).context("tracks")?; + output.write_tag(&MatroskaTag::Tracks(Master::Collected( + tracks + .entries + .into_iter() + .map(|t| ebml_track_entry(t.track_number, &t)) + .collect(), + )))?; + } + EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD | EL_ATTACHMENTS | EL_TAGS => { + seg.consume()?; + } + EL_CLUSTER => { + let cluster = Cluster::read(&mut seg).context("cluster")?; + // TODO mixing both ebml libraries :))) + cluster.write(&mut output)?; + } + id => { + warn!("unknown top-level element {id:x}"); + seg.consume()?; + } + } + } + Ok(()) +} diff --git a/remuxer/src/mpeg4.rs b/remuxer/src/mpeg4.rs deleted file mode 100644 index da66fe2..0000000 --- a/remuxer/src/mpeg4.rs +++ /dev/null @@ -1,34 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -use anyhow::Result; -use std::{ - fs::{remove_file, File}, - io::{copy, Read, Write}, - process::{Command, Stdio}, - random::random, -}; - -pub fn matroska_to_mpeg4( - mut input: impl Read + Send + 'static, - mut output: impl Write, -) -> Result<()> { - let path = format!("/tmp/jellything-tc-hack-{:016x}", random::()); - let args = format!("-f matroska -i pipe:0 -copyts -c copy -f mp4 {path}"); - let mut child = Command::new("ffmpeg") - .args(args.split(" ")) - .stdin(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn()?; - - let mut stdin = child.stdin.take().unwrap(); - copy(&mut input, &mut stdin)?; - drop(stdin); - child.wait()?.exit_ok()?; - copy(&mut File::open(&path)?, &mut output)?; - remove_file(path)?; - - Ok(()) -} diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 2ce3c78..dfe101e 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -6,7 +6,7 @@ use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, bail, Result}; use jellybase::common::stream::StreamContainer; -use jellyremuxer::matroska_to_mpeg4; +use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; use jellytranscoder::fragment::transcode; use log::warn; use std::sync::Arc; @@ -72,10 +72,18 @@ pub async fn fragment_stream( }, ) .await?; - eprintln!("{:?}", location.abs()); + let mut frag = File::open(location.abs()).await?; match container { - StreamContainer::WebM => {} + StreamContainer::WebM => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("webm transmux failed: {err}"); + } + }); + } StreamContainer::Matroska => { tokio::task::spawn(async move { if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { diff --git a/web/script/jshelper b/web/script/jshelper index b2bcdcc..ef36d50 160000 --- a/web/script/jshelper +++ b/web/script/jshelper @@ -1 +1 @@ -Subproject commit b2bcdcc99e42015085b4d0d63e7c94b2d4f84e24 +Subproject commit ef36d50d7858a56cbc08bfb4f272bab9476bb977 -- cgit v1.2.3-70-g09d2