diff options
author | metamuffin <metamuffin@disroot.org> | 2025-03-02 13:43:59 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-03-02 13:43:59 +0100 |
commit | b9539ee3afbf1440b8628bf0609dc0e24aed116c (patch) | |
tree | 5fa67c5db5e3c266385da36116f7723bf1a72c96 | |
parent | f0dbf139d8708194d1ff7e887b1dff48ccc166fa (diff) | |
download | jellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar jellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar.bz2 jellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar.zst |
change things
-rw-r--r-- | common/src/stream.rs | 72 | ||||
-rw-r--r-- | server/src/routes/compat/jellyfin/mod.rs | 27 | ||||
-rw-r--r-- | stream/src/fragment.rs | 104 | ||||
-rw-r--r-- | stream/src/hls.rs | 42 | ||||
-rw-r--r-- | stream/src/jhls.rs | 34 | ||||
-rw-r--r-- | stream/src/lib.rs | 24 | ||||
-rw-r--r-- | stream/src/webvtt.rs | 97 |
7 files changed, 265 insertions, 135 deletions
diff --git a/common/src/stream.rs b/common/src/stream.rs index 46f6abc..0e8f810 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -4,22 +4,23 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ use serde::{Deserialize, Serialize}; +use std::fmt::Display; #[derive(Debug, Clone, Deserialize, Serialize)] pub enum StreamSpec { Whep { - track: u64, + track: usize, seek: u64, }, WhepControl { token: String, }, Remux { - track: Vec<u64>, + tracks: Vec<usize>, container: StreamContainer, }, Original { - track: u64, + track: usize, }, HlsSuperMultiVariant { container: StreamContainer, @@ -30,7 +31,7 @@ pub enum StreamSpec { }, HlsVariant { segment: u64, - track: u64, + track: usize, container: StreamContainer, format: usize, }, @@ -39,11 +40,11 @@ pub enum StreamSpec { }, FragmentIndex { segment: u64, - track: u64, + track: usize, }, Fragment { segment: u64, - track: u64, + track: usize, index: u64, container: StreamContainer, format: usize, @@ -58,3 +59,62 @@ pub enum StreamContainer { WebVTT, JVTT, } + +impl StreamSpec { + pub fn to_query(&self) -> String { + match self { + StreamSpec::Whep { track, seek } => format!("?whep&track={track}&seek={seek}"), + StreamSpec::WhepControl { token } => format!("?whepcontrol&token={token}"), + StreamSpec::Remux { tracks, container } => { + format!( + "?remux&tracks={}&container={container}", + tracks + .iter() + .map(|t| t.to_string()) + .collect::<Vec<String>>() + .join(",") + ) + } + StreamSpec::Original { track } => format!("?original&track={track}"), + StreamSpec::HlsSuperMultiVariant { container } => { + format!("?hlssupermultivariant&container={container}") + } + StreamSpec::HlsMultiVariant { segment, container } => { + format!("?hlsmultivariant&segment={segment}&container={container}") + } + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => format!( + "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}" + ), + StreamSpec::Info { + segment: Some(segment), + } => format!("?info&segment={segment}"), + StreamSpec::Info { segment: None } => format!("?info"), + StreamSpec::FragmentIndex { segment, track } => { + format!("?fragmentindex&segment={segment}&track={track}") + } + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => format!("?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"), + } + } +} + +impl Display for StreamContainer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + StreamContainer::WebM => "webm", + StreamContainer::Matroska => "matroska", + StreamContainer::WebVTT => "webvtt", + StreamContainer::JVTT => "jvtt", + }) + } +} diff --git a/server/src/routes/compat/jellyfin/mod.rs b/server/src/routes/compat/jellyfin/mod.rs index ab36a8c..6066760 100644 --- a/server/src/routes/compat/jellyfin/mod.rs +++ b/server/src/routes/compat/jellyfin/mod.rs @@ -21,7 +21,7 @@ use crate::routes::{ use anyhow::{anyhow, Context}; use jellybase::{database::Database, CONF}; use jellycommon::{ - stream::{StreamFormat, StreamSpec}, + stream::{StreamContainer, StreamSpec}, user::{NodeUserData, WatchedState}, MediaInfo, Node, NodeID, NodeKind, SourceTrack, SourceTrackKind, Visibility, }; @@ -446,16 +446,12 @@ pub fn r_jellyfin_video_stream( .get_node_slug(id)? .ok_or(anyhow!("node does not exist"))?; let media = node.media.as_ref().ok_or(anyhow!("node has no media"))?; - Ok(Redirect::temporary(rocket::uri!(r_stream( - id, - StreamSpec { - format: StreamFormat::Matroska, - webm: Some(true), - track: (0..media.tracks.len()).collect(), - index: None, - profile: None, - } - )))) + let params = StreamSpec::Remux { + tracks: (0..media.tracks.len()).collect(), + container: StreamContainer::WebM, + } + .to_query(); + Ok(Redirect::temporary(format!("/n/{id}/stream{params}"))) } #[derive(Deserialize)] @@ -498,9 +494,7 @@ pub fn r_jellyfin_playback_bitratetest(_session: Session, Size: usize) -> Vec<u8 } #[post("/Sessions/Capabilities/Full")] -pub fn r_jellyfin_sessions_capabilities_full(_session: Session) { - -} +pub fn r_jellyfin_sessions_capabilities_full(_session: Session) {} #[derive(Deserialize)] #[serde(rename_all = "PascalCase")] @@ -796,7 +790,10 @@ fn item_object(node: &Node, userdata: &NodeUserData) -> JellyfinItem { location_type: node.media.as_ref().map(|_| "FileSystem".to_owned()), play_access: node.media.as_ref().map(|_| "Full".to_owned()), container: node.media.as_ref().map(|_| "webm".to_owned()), - run_time_ticks: node.media.as_ref().map(|m| (m.duration * 10_000_000.) as i64), + run_time_ticks: node + .media + .as_ref() + .map(|m| (m.duration * 10_000_000.) as i64), media_sources: media_source.as_ref().map(|s| vec![s.clone()]), media_streams: media_source.as_ref().map(|s| s.media_streams.clone()), path: node diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e276d29..a34bb8d 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use jellybase::{ common::{ stream::StreamSpec, @@ -25,64 +25,62 @@ pub async fn fragment_stream( spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, + webm: bool, + track: u64, + segment: u64, + index: usize, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.track[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - let local_track = local_tracks .first() .ok_or(anyhow!("track missing"))? .to_owned(); - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; - let location = transcode( - &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &CONF.media_path, - &node, - &local_track, - track, - false, - n, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &CONF.media_path, - &node, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); - } - }); - } + // if let Some(profile) = None { + // perms.assert(&UserPermission::Transcode)?; + // let location = transcode( + // &format!("{track} {index} {:?}", node), // TODO maybe not use the entire source + // CONF.transcoding_profiles + // .get(profile) + // .ok_or(anyhow!("profile out of range"))?, + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &CONF.media_path, + // &node, + // &local_track, + // track as usize, + // false, + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + // let mut output = File::open(location.abs()).await?; + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + b, + &CONF.media_path, + &node, + &local_track, + track as usize, + webm, + index, + ) { + warn!("segment stream error: {err}"); + } + }); + // } Ok(()) } diff --git a/stream/src/hls.rs b/stream/src/hls.rs index dca1036..56edd2d 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Result}; use jellybase::{ common::{ - stream::{StreamFormat, StreamSpec}, + stream::{StreamContainer, StreamSpec}, LocalTrack, Node, SourceTrackKind, }, CONF, @@ -21,7 +21,8 @@ use tokio::{ pub async fn hls_master_stream( node: Arc<Node>, _local_tracks: Vec<LocalTrack>, - _spec: StreamSpec, + segment: u64, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let media = node.media.as_ref().ok_or(anyhow!("no media"))?; @@ -32,10 +33,11 @@ pub async fn hls_master_stream( for (i, t) in media.tracks.iter().enumerate() { let uri = format!( "stream?{}", - StreamSpec { - track: vec![i], - format: StreamFormat::HlsVariant, - ..Default::default() + StreamSpec::HlsVariant { + track: i, + segment, + container, + format: 0 } .to_query() ); @@ -54,14 +56,21 @@ pub async fn hls_master_stream( pub async fn hls_variant_stream( node: Arc<Node>, local_tracks: Vec<LocalTrack>, - mut spec: StreamSpec, + segment: u64, + track: usize, + format: usize, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let track_index = spec.track[0]; let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index) + jellyremuxer::fragment::fragment_index( + &CONF.media_path, + &node, + &local_track, + track as usize, + ) }) .await??; @@ -72,11 +81,20 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - spec.format = StreamFormat::Fragment; for (i, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; - spec.index = Some(i); - writeln!(out, "stream?{}", spec.to_query())?; + writeln!( + out, + "stream?{}", + StreamSpec::Fragment { + segment, + track, + index: i as u64, + container, + format, + } + .to_query() + )?; } writeln!(out, "#EXT-X-ENDLIST")?; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs index b222e39..2a2faec 100644 --- a/stream/src/jhls.rs +++ b/stream/src/jhls.rs @@ -24,24 +24,24 @@ pub async fn jhls_index( mut b: DuplexStream, perms: &PermissionSet, ) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + // let local_track = local_tracks + // .first() + // .ok_or(anyhow!("track missing"))? + // .to_owned(); - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - }) - .await??; + // let fragments = tokio::task::spawn_blocking(move || { + // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) + // }) + // .await??; - let out = serde_json::to_string(&JhlsTrackIndex { - extra_profiles: if perms.check(&UserPermission::Transcode) { - CONF.transcoding_profiles.clone() - } else { - vec![] - }, - fragments, - })?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + // let out = serde_json::to_string(&JhlsTrackIndex { + // extra_profiles: if perms.check(&UserPermission::Transcode) { + // CONF.transcoding_profiles.clone() + // } else { + // vec![] + // }, + // fragments, + // })?; + // tokio::spawn(async move { b.write_all(out.as_bytes()).await }); Ok(()) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 6f31e6b..68b7e44 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -60,6 +60,30 @@ pub async fn stream( ) -> Result<DuplexStream> { let (a, b) = duplex(4096); + match spec { + StreamSpec::Whep { track, seek } => todo!(), + StreamSpec::WhepControl { token } => todo!(), + StreamSpec::Remux { tracks, container } => todo!(), + StreamSpec::Original { track } => todo!(), + StreamSpec::HlsSuperMultiVariant { container } => todo!(), + StreamSpec::HlsMultiVariant { segment, container } => todo!(), + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => todo!(), + StreamSpec::Info { segment } => todo!(), + StreamSpec::FragmentIndex { segment, track } => todo!(), + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => todo!(), + } + Ok(a) } diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index f78ac2f..fbd6382 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -25,39 +25,72 @@ pub async fn vtt_stream( // TODO should use fragments too? big films take too long... - let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; - let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); - let track = &node.media.as_ref().unwrap().tracks[tracki]; - let cp = local_track.codec_private.clone(); + // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let subtitles = async_cache_memory( - &[ - "vtt", - &format!( - "{} {}", - local_track.path.to_str().unwrap(), - local_track.track - ), - ], - move || async move { - let blocks = tokio::task::spawn_blocking(move || { - extract_track(CONF.media_path.clone(), local_track) - }) - .await??; - let subtitles = parse_subtitles(&track.codec, cp, blocks)?; - Ok(subtitles) - }, - ) - .await?; + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // )spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let output = if json { - serde_json::to_string(subtitles.as_ref())? - } else { - write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) - .context("writing webvtt")? - }; - tokio::task::spawn(async move { - let _ = b.write_all(output.as_bytes()).await; - }); + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // ) + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); Ok(()) } |