diff options
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r-- | stream/src/lib.rs | 167 |
1 files changed, 72 insertions, 95 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 00338c1..4df87ae 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -5,136 +5,113 @@ */ #![feature(iterator_try_collect)] pub mod fragment; +pub mod fragment_index; pub mod hls; -pub mod jhls; +pub mod stream_info; pub mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; -use hls::{hls_master_stream, hls_variant_stream}; -use jellybase::{ - assetfed::AssetInner, - common::{ - stream::{StreamFormat, StreamSpec}, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, TrackSource, - }, - permission::PermissionSetExt, - CONF, +use fragment_index::fragment_index_stream; +use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; +use jellybase::common::{ + stream::{StreamContainer, StreamSpec}, + Node, }; -use jhls::jhls_index; -use std::{io::SeekFrom, ops::Range, sync::Arc}; +use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; +use stream_info::{stream_info, write_stream_info}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; -use tokio_util::io::SyncIoBridge; -use webvtt::vtt_stream; + +#[derive(Debug)] +pub struct SMediaInfo { + pub info: Arc<Node>, + pub files: BTreeSet<PathBuf>, +} pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } -#[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; - match spec.format { - StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, - StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, - StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, - StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, - StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, + let cons = |ct: &'static str, rs: bool| StreamHead { + content_type: ct, + range_supported: rs, + }; + let container_ct = |x: StreamContainer| match x { + StreamContainer::WebM => "video/webm", + StreamContainer::Matroska => "video/x-matroska", + StreamContainer::WebVTT => "text/vtt", + StreamContainer::JVTT => "application/jellything-vtt+json", + StreamContainer::MPEG4 => "video/mp4", + }; + match spec { + StreamSpec::Whep { .. } => cons("application/x-todo", false), + StreamSpec::WhepControl { .. } => cons("application/x-todo", false), + StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), + StreamSpec::Original { .. } => cons("video/x-matroska", true), + StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), + StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), + StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( - node: Arc<Node>, + info: Arc<SMediaInfo>, spec: StreamSpec, range: Range<usize>, - perms: &PermissionSet, ) -> Result<DuplexStream> { - perms.assert(&UserPermission::StreamFormat(spec.format))?; - let (a, b) = duplex(4096); - // TODO remux of mixed remote and local tracks?! - let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?; - - let local_tracks = spec - .track - .iter() - .map(|i| { - anyhow::Ok( - match &track_sources - .tracks - .get(*i) - .ok_or(anyhow!("track does not exist"))? - .source - { - TrackSource::Local(t) => AssetInner::deser(&t.0)? - .as_local_track() - .ok_or(anyhow!("asset not a track"))?, - TrackSource::Remote(_) => bail!("track is not local"), - }, - ) - }) - .collect::<anyhow::Result<Vec<_>>>()? - .into_iter() - .collect::<Vec<_>>(); - - match spec.format { - StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, - StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, - StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, - StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, + match spec { + StreamSpec::Original { track } => original_stream(info, track, range, b).await?, + StreamSpec::HlsSuperMultiVariant { container } => { + hls_supermultivariant_stream(b, info, container).await?; + } + StreamSpec::HlsMultiVariant { segment, container } => { + hls_multivariant_stream(b, info, segment, container).await? + } + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => hls_variant_stream(b, info, segment, track, format, container).await?, + StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, + StreamSpec::FragmentIndex { segment, track } => { + fragment_index_stream(b, info, segment, track).await? + } + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => fragment_stream(b, info, track, segment, index, format, container).await?, + _ => bail!("todo"), } Ok(a) } -async fn remux_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, - range: Range<usize>, - b: DuplexStream, -) -> Result<()> { - let b = SyncIoBridge::new(b); - - tokio::task::spawn_blocking(move || { - jellyremuxer::remux_stream_into( - b, - range, - CONF.media_path.to_owned(), - &node, - local_tracks, - spec.track, - spec.webm.unwrap_or(false), - ) - }); - - Ok(()) -} - async fn original_stream( - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, + info: Arc<SMediaInfo>, + track: usize, range: Range<usize>, b: DuplexStream, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - } - - let source = local_tracks[spec.track[0]].clone(); - let mut file = File::open(CONF.media_path.join(source.path)) + let (iinfo, _info) = stream_info(info).await?; + let (file_index, _) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("unknown track"))?; + let mut file = File::open(&iinfo.paths[file_index]) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) |