diff options
Diffstat (limited to 'stream/src')
-rw-r--r-- | stream/src/fragment.rs | 104 | ||||
-rw-r--r-- | stream/src/hls.rs | 42 | ||||
-rw-r--r-- | stream/src/jhls.rs | 34 | ||||
-rw-r--r-- | stream/src/lib.rs | 24 | ||||
-rw-r--r-- | stream/src/webvtt.rs | 97 |
5 files changed, 187 insertions, 114 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e276d29..a34bb8d 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use jellybase::{ common::{ stream::StreamSpec, @@ -25,64 +25,62 @@ pub async fn fragment_stream( spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, + webm: bool, + track: u64, + segment: u64, + index: usize, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.track[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - let local_track = local_tracks .first() .ok_or(anyhow!("track missing"))? .to_owned(); - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; - let location = transcode( - &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &CONF.media_path, - &node, - &local_track, - track, - false, - n, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &CONF.media_path, - &node, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); - } - }); - } + // if let Some(profile) = None { + // perms.assert(&UserPermission::Transcode)?; + // let location = transcode( + // &format!("{track} {index} {:?}", node), // TODO maybe not use the entire source + // CONF.transcoding_profiles + // .get(profile) + // .ok_or(anyhow!("profile out of range"))?, + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &CONF.media_path, + // &node, + // &local_track, + // track as usize, + // false, + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + // let mut output = File::open(location.abs()).await?; + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + b, + &CONF.media_path, + &node, + &local_track, + track as usize, + webm, + index, + ) { + warn!("segment stream error: {err}"); + } + }); + // } Ok(()) } diff --git a/stream/src/hls.rs b/stream/src/hls.rs index dca1036..56edd2d 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Result}; use jellybase::{ common::{ - stream::{StreamFormat, StreamSpec}, + stream::{StreamContainer, StreamSpec}, LocalTrack, Node, SourceTrackKind, }, CONF, @@ -21,7 +21,8 @@ use tokio::{ pub async fn hls_master_stream( node: Arc<Node>, _local_tracks: Vec<LocalTrack>, - _spec: StreamSpec, + segment: u64, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let media = node.media.as_ref().ok_or(anyhow!("no media"))?; @@ -32,10 +33,11 @@ pub async fn hls_master_stream( for (i, t) in media.tracks.iter().enumerate() { let uri = format!( "stream?{}", - StreamSpec { - track: vec![i], - format: StreamFormat::HlsVariant, - ..Default::default() + StreamSpec::HlsVariant { + track: i, + segment, + container, + format: 0 } .to_query() ); @@ -54,14 +56,21 @@ pub async fn hls_master_stream( pub async fn hls_variant_stream( node: Arc<Node>, local_tracks: Vec<LocalTrack>, - mut spec: StreamSpec, + segment: u64, + track: usize, + format: usize, + container: StreamContainer, mut b: DuplexStream, ) -> Result<()> { let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let track_index = spec.track[0]; let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index) + jellyremuxer::fragment::fragment_index( + &CONF.media_path, + &node, + &local_track, + track as usize, + ) }) .await??; @@ -72,11 +81,20 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - spec.format = StreamFormat::Fragment; for (i, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; - spec.index = Some(i); - writeln!(out, "stream?{}", spec.to_query())?; + writeln!( + out, + "stream?{}", + StreamSpec::Fragment { + segment, + track, + index: i as u64, + container, + format, + } + .to_query() + )?; } writeln!(out, "#EXT-X-ENDLIST")?; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs index b222e39..2a2faec 100644 --- a/stream/src/jhls.rs +++ b/stream/src/jhls.rs @@ -24,24 +24,24 @@ pub async fn jhls_index( mut b: DuplexStream, perms: &PermissionSet, ) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + // let local_track = local_tracks + // .first() + // .ok_or(anyhow!("track missing"))? + // .to_owned(); - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - }) - .await??; + // let fragments = tokio::task::spawn_blocking(move || { + // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) + // }) + // .await??; - let out = serde_json::to_string(&JhlsTrackIndex { - extra_profiles: if perms.check(&UserPermission::Transcode) { - CONF.transcoding_profiles.clone() - } else { - vec![] - }, - fragments, - })?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + // let out = serde_json::to_string(&JhlsTrackIndex { + // extra_profiles: if perms.check(&UserPermission::Transcode) { + // CONF.transcoding_profiles.clone() + // } else { + // vec![] + // }, + // fragments, + // })?; + // tokio::spawn(async move { b.write_all(out.as_bytes()).await }); Ok(()) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 6f31e6b..68b7e44 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -60,6 +60,30 @@ pub async fn stream( ) -> Result<DuplexStream> { let (a, b) = duplex(4096); + match spec { + StreamSpec::Whep { track, seek } => todo!(), + StreamSpec::WhepControl { token } => todo!(), + StreamSpec::Remux { tracks, container } => todo!(), + StreamSpec::Original { track } => todo!(), + StreamSpec::HlsSuperMultiVariant { container } => todo!(), + StreamSpec::HlsMultiVariant { segment, container } => todo!(), + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => todo!(), + StreamSpec::Info { segment } => todo!(), + StreamSpec::FragmentIndex { segment, track } => todo!(), + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => todo!(), + } + Ok(a) } diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index f78ac2f..fbd6382 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -25,39 +25,72 @@ pub async fn vtt_stream( // TODO should use fragments too? big films take too long... - let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; - let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); - let track = &node.media.as_ref().unwrap().tracks[tracki]; - let cp = local_track.codec_private.clone(); + // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let subtitles = async_cache_memory( - &[ - "vtt", - &format!( - "{} {}", - local_track.path.to_str().unwrap(), - local_track.track - ), - ], - move || async move { - let blocks = tokio::task::spawn_blocking(move || { - extract_track(CONF.media_path.clone(), local_track) - }) - .await??; - let subtitles = parse_subtitles(&track.codec, cp, blocks)?; - Ok(subtitles) - }, - ) - .await?; + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // )spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let output = if json { - serde_json::to_string(subtitles.as_ref())? - } else { - write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) - .context("writing webvtt")? - }; - tokio::task::spawn(async move { - let _ = b.write_all(output.as_bytes()).await; - }); + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // ) + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); Ok(()) } |