diff options
author | metamuffin <metamuffin@disroot.org> | 2025-02-28 13:52:41 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-02-28 13:52:41 +0100 |
commit | f0dbf139d8708194d1ff7e887b1dff48ccc166fa (patch) | |
tree | c88a1ae37f404a243ded8a9548fe260d2cc26832 | |
parent | 26d3a70b0be2809177076e155f987e18e2b2ceb2 (diff) | |
download | jellything-f0dbf139d8708194d1ff7e887b1dff48ccc166fa.tar jellything-f0dbf139d8708194d1ff7e887b1dff48ccc166fa.tar.bz2 jellything-f0dbf139d8708194d1ff7e887b1dff48ccc166fa.tar.zst |
spec + break things
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | client/src/lib.rs | 3 | ||||
-rw-r--r-- | common/src/stream.rs | 120 | ||||
-rw-r--r-- | common/src/user.rs | 46 | ||||
-rw-r--r-- | doc/api.md | 13 | ||||
-rw-r--r-- | stream/Cargo.toml | 2 | ||||
-rw-r--r-- | stream/src/lib.rs | 157 |
7 files changed, 143 insertions, 200 deletions
@@ -1803,7 +1803,9 @@ name = "jellystream" version = "0.1.0" dependencies = [ "anyhow", + "ebml-struct", "jellybase", + "jellymatroska", "jellyremuxer", "jellytranscoder", "log", diff --git a/client/src/lib.rs b/client/src/lib.rs index 96c39b6..1497e45 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -168,7 +168,8 @@ impl Session { "{}/n/{}/stream?{}&{}", self.instance.base(), id, - stream_spec.to_query(), + todo!(), + // stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 3e227e1..46f6abc 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -1,82 +1,60 @@ -use bincode::{Decode, Encode}; /* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -#[cfg(feature = "rocket")] -use rocket::{FromForm, FromFormField, UriDisplayQuery}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "rocket", derive(FromForm, UriDisplayQuery))] -pub struct StreamSpec { - pub track: Vec<usize>, - pub format: StreamFormat, - pub webm: Option<bool>, - pub profile: Option<usize>, - pub index: Option<usize>, +pub enum StreamSpec { + Whep { + track: u64, + seek: u64, + }, + WhepControl { + token: String, + }, + Remux { + track: Vec<u64>, + container: StreamContainer, + }, + Original { + track: u64, + }, + HlsSuperMultiVariant { + container: StreamContainer, + }, + HlsMultiVariant { + segment: u64, + container: StreamContainer, + }, + HlsVariant { + segment: u64, + track: u64, + container: StreamContainer, + format: usize, + }, + Info { + segment: Option<u64>, + }, + FragmentIndex { + segment: u64, + track: u64, + }, + Fragment { + segment: u64, + track: u64, + index: u64, + container: StreamContainer, + format: usize, + }, } -#[rustfmt::skip] -#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Hash, Encode, Decode)] -#[serde(rename_all = "snake_case")] -#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))] -pub enum StreamFormat { - #[cfg_attr(feature = "rocket", field(value = "original"))] Original, - #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska, - #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster, - #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant, - #[cfg_attr(feature = "rocket", field(value = "jhlsi"))] JhlsIndex, - #[cfg_attr(feature = "rocket", field(value = "frag"))] Fragment, - #[cfg_attr(feature = "rocket", field(value = "webvtt"))] Webvtt, - #[cfg_attr(feature = "rocket", field(value = "jvtt"))] Jvtt, -} - -impl Default for StreamSpec { - fn default() -> Self { - Self { - track: Vec::new(), - format: StreamFormat::Matroska, - webm: Some(true), - profile: None, - index: None, - } - } -} - -impl StreamSpec { - pub fn to_query(&self) -> String { - use std::fmt::Write; - let mut u = String::new(); - write!(u, "format={}", self.format.ident()).unwrap(); - for t in &self.track { - write!(u, "&track={}", t).unwrap(); - } - if let Some(profile) = self.profile { - write!(u, "&profile={profile}").unwrap(); - } - if let Some(index) = self.index { - write!(u, "&index={index}").unwrap(); - } - if let Some(webm) = self.webm { - write!(u, "&webm={webm}").unwrap(); - } - u - } -} - -impl StreamFormat { - pub fn ident(&self) -> &'static str { - match self { - StreamFormat::Jvtt => "jvtt", - StreamFormat::Original => "original", - StreamFormat::Matroska => "matroska", - StreamFormat::HlsMaster => "hlsmaster", - StreamFormat::HlsVariant => "hlsvariant", - StreamFormat::JhlsIndex => "jhlsi", - StreamFormat::Fragment => "frag", - StreamFormat::Webvtt => "webvtt", - } - } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum StreamContainer { + WebM, + Matroska, + WebVTT, + JVTT, } diff --git a/common/src/user.rs b/common/src/user.rs index ef78eca..e0e7a0d 100644 --- a/common/src/user.rs +++ b/common/src/user.rs @@ -3,7 +3,6 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{stream::StreamFormat, user}; use bincode::{Decode, Encode}; #[cfg(feature = "rocket")] use rocket::{FromFormField, UriDisplayQuery}; @@ -99,7 +98,6 @@ pub enum UserPermission { ManageSelf, AccessNode(String), - StreamFormat(StreamFormat), Transcode, FederatedContent, } @@ -107,33 +105,11 @@ pub enum UserPermission { impl UserPermission { pub const ALL_ENUMERABLE: &'static [UserPermission] = { use UserPermission::*; - &[ - Admin, - Transcode, - ManageSelf, - FederatedContent, - StreamFormat(user::StreamFormat::Original), - ] + &[Admin, Transcode, ManageSelf, FederatedContent] }; pub fn default_value(&self) -> bool { - use user::StreamFormat::*; use UserPermission::*; - matches!( - self, - Transcode - | ManageSelf - | FederatedContent - | StreamFormat( - JhlsIndex - | Jvtt - | HlsMaster - | HlsVariant - | Matroska - | Fragment - | Webvtt - | Original // TODO remove later - ) - ) + matches!(self, Transcode | ManageSelf | FederatedContent) } } @@ -143,15 +119,15 @@ impl Display for UserPermission { UserPermission::ManageSelf => "manage self (password, display name, etc.)".to_string(), UserPermission::FederatedContent => "access to federated content".to_string(), UserPermission::Admin => "administrative rights".to_string(), - UserPermission::StreamFormat(StreamFormat::Original) => { - "downloading the original media".to_string() - } - UserPermission::StreamFormat(StreamFormat::Matroska) => { - "downloading a remuxed WebM/Matroska version of the media ".to_string() - } - UserPermission::StreamFormat(x) => { - format!("downloading media via {x:?}") - } + // UserPermission::StreamFormat(StreamFormat::Original) => { + // "downloading the original media".to_string() + // } + // UserPermission::StreamFormat(StreamFormat::Matroska) => { + // "downloading a remuxed WebM/Matroska version of the media ".to_string() + // } + // UserPermission::StreamFormat(x) => { + // format!("downloading media via {x:?}") + // } UserPermission::Transcode => "transcoding".to_string(), // UserPermission::ManageUsers => format!("management of all users"), // UserPermission::GenerateInvite => format!("inviting new users"), @@ -111,19 +111,19 @@ federation. by the server. - `?whepcontrol&<token>` - WebSocket endpoint for controlling WHEP playback. TODO schema -- `?remux&<track...>&<webm>` -- `?hlssupermultivariant&<webm>` +- `?remux&<track...>&<container>` +- `?hlssupermultivariant&<container>` - Returns m3u8/HLS playlist of all known multi-variant playlists, one for each segment. The plylist is updated for live media. -- `?hlsmultivariant&<segment>&<webm>` +- `?hlsmultivariant&<segment>&<container>` - Returns m3u8/HLS playlist of all track formats' variant playlists. -- `?hlsvariant&<segment>&<track>&<webm>&<format>` +- `?hlsvariant&<segment>&<track>&<container>&<format>` - Returns m3u8/HLS playlist of all known fragments of this track format. The playlist is updated for live media. - `?info&<segment?>` - Returns JSON `SegmentInfo` if a segment index is provided, else `MediaInfo` - `?fragmentindex&<segment>&<track>` -- `?fragment&<segment>&<track>&<index>&<webm>&<format>` +- `?fragment&<segment>&<track>&<index>&<container>&<format>` ```ts interface MediaInfo { @@ -147,10 +147,13 @@ interface TrackFormat { bandwidth: number; remux: bool; title?: string; + containers: StreamContainer[]; a_sampling_frequency?: number; a_channels?: number; v_resolution_width?: number; av_bit_depth?: number; } +type FragmentIndex = number[]; +type StreamContainer = "webm" | "matroska" | "webvtt" | "jvtt"; ``` diff --git a/stream/Cargo.toml b/stream/Cargo.toml index 36979c9..21d1650 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" jellybase = { path = "../base", features = ["rocket"] } jellytranscoder = { path = "../transcoder" } jellyremuxer = { path = "../remuxer" } +jellymatroska = { path = "../matroska" } +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" } log = { workspace = true } anyhow = { workspace = true } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 00338c1..6f31e6b 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -9,93 +9,57 @@ pub mod hls; pub mod jhls; pub mod webvtt; -use anyhow::{anyhow, bail, Context, Result}; -use fragment::fragment_stream; -use hls::{hls_master_stream, hls_variant_stream}; -use jellybase::{ - assetfed::AssetInner, - common::{ - stream::{StreamFormat, StreamSpec}, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, TrackSource, - }, - permission::PermissionSetExt, - CONF, +use anyhow::Result; +use ebml_struct::matroska::{Info, Tracks}; +use jellybase::common::{ + stream::{StreamContainer, StreamSpec}, + LocalTrack, MediaInfo, Node, }; -use jhls::jhls_index; -use std::{io::SeekFrom, ops::Range, sync::Arc}; +use jellymatroska::block::LacingType; +use std::{ops::Range, sync::Arc}; use tokio::{ fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, + io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; -use webvtt::vtt_stream; pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } -#[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; - match spec.format { - StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, - StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, - StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, - StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, - StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, + let cons = |ct: &'static str, rs: bool| StreamHead { + content_type: ct, + range_supported: rs, + }; + let container_ct = |x: StreamContainer| match x { + StreamContainer::WebM => "video/webm", + StreamContainer::Matroska => "video/x-matroska", + StreamContainer::WebVTT => "text/vtt", + StreamContainer::JVTT => "application/jellything-vtt+json", + }; + match spec { + StreamSpec::Whep { .. } => cons("application/x-todo", false), + StreamSpec::WhepControl { .. } => cons("application/x-todo", false), + StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), + StreamSpec::Original { .. } => cons("video/x-matroska", true), + StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), + StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), + StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( - node: Arc<Node>, + info: Arc<MediaInfo>, spec: StreamSpec, range: Range<usize>, - perms: &PermissionSet, ) -> Result<DuplexStream> { - perms.assert(&UserPermission::StreamFormat(spec.format))?; - let (a, b) = duplex(4096); - // TODO remux of mixed remote and local tracks?! - let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?; - - let local_tracks = spec - .track - .iter() - .map(|i| { - anyhow::Ok( - match &track_sources - .tracks - .get(*i) - .ok_or(anyhow!("track does not exist"))? - .source - { - TrackSource::Local(t) => AssetInner::deser(&t.0)? - .as_local_track() - .ok_or(anyhow!("asset not a track"))?, - TrackSource::Remote(_) => bail!("track is not local"), - }, - ) - }) - .collect::<anyhow::Result<Vec<_>>>()? - .into_iter() - .collect::<Vec<_>>(); - - match spec.format { - StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, - StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, - StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, - StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, - } - Ok(a) } @@ -108,17 +72,17 @@ async fn remux_stream( ) -> Result<()> { let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - jellyremuxer::remux_stream_into( - b, - range, - CONF.media_path.to_owned(), - &node, - local_tracks, - spec.track, - spec.webm.unwrap_or(false), - ) - }); + // tokio::task::spawn_blocking(move || { + // jellyremuxer::remux_stream_into( + // b, + // range, + // CONF.media_path.to_owned(), + // &node, + // local_tracks, + // spec.track, + // spec.webm.unwrap_or(false), + // ) + // }); Ok(()) } @@ -129,19 +93,19 @@ async fn original_stream( range: Range<usize>, b: DuplexStream, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - } + // if spec.track.len() != 1 { + // bail!("invalid amout of source \"tracks\". original only allows for exactly one.") + // } - let source = local_tracks[spec.track[0]].clone(); - let mut file = File::open(CONF.media_path.join(source.path)) - .await - .context("opening source")?; - file.seek(SeekFrom::Start(range.start as u64)) - .await - .context("seek source")?; + // let source = local_tracks[spec.track[0]].clone(); + // let mut file = File::open(CONF.media_path.join(source.path)) + // .await + // .context("opening source")?; + // file.seek(SeekFrom::Start(range.start as u64)) + // .await + // .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + // tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } @@ -157,3 +121,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> amount -= size; } } + +// TODO functions to test seekability, get live status and enumate segments +trait MediaSource { + /// Seeks to some position close to, but before, `time` ticks. + fn seek(&mut self, time: u64) -> Result<()>; + /// Retrieve headers (info and tracks) for some segment. + fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; + /// Returns the next block and the current segment index + fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>; +} +pub struct AbsBlock { + track: u64, + pts: u64, + keyframe: bool, + lacing: Option<LacingType>, + data: Vec<u8>, +} |