diff options
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r-- | stream/src/lib.rs | 157 |
1 files changed, 69 insertions, 88 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 00338c1..6f31e6b 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -9,93 +9,57 @@ pub mod hls; pub mod jhls; pub mod webvtt; -use anyhow::{anyhow, bail, Context, Result}; -use fragment::fragment_stream; -use hls::{hls_master_stream, hls_variant_stream}; -use jellybase::{ - assetfed::AssetInner, - common::{ - stream::{StreamFormat, StreamSpec}, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, TrackSource, - }, - permission::PermissionSetExt, - CONF, +use anyhow::Result; +use ebml_struct::matroska::{Info, Tracks}; +use jellybase::common::{ + stream::{StreamContainer, StreamSpec}, + LocalTrack, MediaInfo, Node, }; -use jhls::jhls_index; -use std::{io::SeekFrom, ops::Range, sync::Arc}; +use jellymatroska::block::LacingType; +use std::{ops::Range, sync::Arc}; use tokio::{ fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, + io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; -use webvtt::vtt_stream; pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } -#[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; - match spec.format { - StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, - StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, - StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, - StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, - StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, + let cons = |ct: &'static str, rs: bool| StreamHead { + content_type: ct, + range_supported: rs, + }; + let container_ct = |x: StreamContainer| match x { + StreamContainer::WebM => "video/webm", + StreamContainer::Matroska => "video/x-matroska", + StreamContainer::WebVTT => "text/vtt", + StreamContainer::JVTT => "application/jellything-vtt+json", + }; + match spec { + StreamSpec::Whep { .. } => cons("application/x-todo", false), + StreamSpec::WhepControl { .. } => cons("application/x-todo", false), + StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), + StreamSpec::Original { .. } => cons("video/x-matroska", true), + StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), + StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), + StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( - node: Arc<Node>, + info: Arc<MediaInfo>, spec: StreamSpec, range: Range<usize>, - perms: &PermissionSet, ) -> Result<DuplexStream> { - perms.assert(&UserPermission::StreamFormat(spec.format))?; - let (a, b) = duplex(4096); - // TODO remux of mixed remote and local tracks?! - let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?; - - let local_tracks = spec - .track - .iter() - .map(|i| { - anyhow::Ok( - match &track_sources - .tracks - .get(*i) - .ok_or(anyhow!("track does not exist"))? - .source - { - TrackSource::Local(t) => AssetInner::deser(&t.0)? - .as_local_track() - .ok_or(anyhow!("asset not a track"))?, - TrackSource::Remote(_) => bail!("track is not local"), - }, - ) - }) - .collect::<anyhow::Result<Vec<_>>>()? - .into_iter() - .collect::<Vec<_>>(); - - match spec.format { - StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, - StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, - StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, - StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, - } - Ok(a) } @@ -108,17 +72,17 @@ async fn remux_stream( ) -> Result<()> { let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - jellyremuxer::remux_stream_into( - b, - range, - CONF.media_path.to_owned(), - &node, - local_tracks, - spec.track, - spec.webm.unwrap_or(false), - ) - }); + // tokio::task::spawn_blocking(move || { + // jellyremuxer::remux_stream_into( + // b, + // range, + // CONF.media_path.to_owned(), + // &node, + // local_tracks, + // spec.track, + // spec.webm.unwrap_or(false), + // ) + // }); Ok(()) } @@ -129,19 +93,19 @@ async fn original_stream( range: Range<usize>, b: DuplexStream, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - } + // if spec.track.len() != 1 { + // bail!("invalid amout of source \"tracks\". original only allows for exactly one.") + // } - let source = local_tracks[spec.track[0]].clone(); - let mut file = File::open(CONF.media_path.join(source.path)) - .await - .context("opening source")?; - file.seek(SeekFrom::Start(range.start as u64)) - .await - .context("seek source")?; + // let source = local_tracks[spec.track[0]].clone(); + // let mut file = File::open(CONF.media_path.join(source.path)) + // .await + // .context("opening source")?; + // file.seek(SeekFrom::Start(range.start as u64)) + // .await + // .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + // tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } @@ -157,3 +121,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> amount -= size; } } + +// TODO functions to test seekability, get live status and enumate segments +trait MediaSource { + /// Seeks to some position close to, but before, `time` ticks. + fn seek(&mut self, time: u64) -> Result<()>; + /// Retrieve headers (info and tracks) for some segment. + fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; + /// Returns the next block and the current segment index + fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>; +} +pub struct AbsBlock { + track: u64, + pts: u64, + keyframe: bool, + lacing: Option<LacingType>, + data: Vec<u8>, +} |