diff options
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r-- | stream/src/lib.rs | 110 |
1 files changed, 40 insertions, 70 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 5b4e8ed..60c283c 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -4,30 +4,29 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ #![feature(iterator_try_collect)] +pub mod cues; mod fragment; mod fragment_index; mod hls; +pub mod metadata; mod stream_info; mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; -use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; +use hls::{hls_multivariant_stream, hls_variant_stream}; use jellystream_types::{StreamContainer, StreamSpec}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeSet, - io::SeekFrom, + fs::File, + io::{Read, Seek, SeekFrom}, ops::Range, path::PathBuf, sync::{Arc, LazyLock, Mutex}, }; use stream_info::{stream_info, write_stream_info}; -use tokio::{ - fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, -}; #[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] @@ -60,98 +59,69 @@ pub struct StreamHead { } pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let cons = |ct: &'static str, rs: bool| StreamHead { - content_type: ct, - range_supported: rs, - }; + use StreamContainer::*; + use StreamSpec::*; let container_ct = |x: StreamContainer| match x { - StreamContainer::WebM => "video/webm", - StreamContainer::Matroska => "video/x-matroska", - StreamContainer::WebVTT => "text/vtt", - StreamContainer::JVTT => "application/jellything-vtt+json", - StreamContainer::MPEG4 => "video/mp4", + WebM => "video/webm", + Matroska => "video/x-matroska", + WebVTT => "text/vtt", + JVTT => "application/jellything-vtt+json", + MPEG4 => "video/mp4", }; - match spec { - StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), - StreamSpec::Original { .. } => cons("video/x-matroska", true), - StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), - StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), - StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), + let range_supported = matches!(spec, Remux { .. } | Original { .. }); + let content_type = match spec { + Original { .. } => "video/x-matroska", + HlsMultiVariant { .. } => "application/vnd.apple.mpegurl", + HlsVariant { .. } => "application/vnd.apple.mpegurl", + Info { .. } => "application/jellything-stream-info+json", + FragmentIndex { .. } => "application/jellything-frag-index+json", + Fragment { container, .. } => container_ct(*container), + Remux { container, .. } => container_ct(*container), + }; + StreamHead { + content_type, + range_supported, } } -pub async fn stream( +pub fn stream( info: Arc<SMediaInfo>, spec: StreamSpec, - range: Range<usize>, -) -> Result<DuplexStream> { - let (a, b) = duplex(4096); - + range: Range<u64>, +) -> Result<Box<dyn Read + Send + Sync>> { match spec { - StreamSpec::Original { track } => original_stream(info, track, range, b).await?, - StreamSpec::HlsSuperMultiVariant { container } => { - hls_supermultivariant_stream(b, info, container).await?; - } - StreamSpec::HlsMultiVariant { segment, container } => { - hls_multivariant_stream(b, info, segment, container).await? - } + StreamSpec::Original { track } => original_stream(info, track, range), + StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(info, container), StreamSpec::HlsVariant { - segment, track, container, format, - } => hls_variant_stream(b, info, segment, track, format, container).await?, - StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, - StreamSpec::FragmentIndex { segment, track } => { - fragment_index_stream(b, info, segment, track).await? - } + } => hls_variant_stream(info, track, format, container), + StreamSpec::Info => write_stream_info(info), + StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track), StreamSpec::Fragment { - segment, track, index, container, format, - } => fragment_stream(b, info, track, segment, index, format, container).await?, + } => fragment_stream(info, track, index, format, container), _ => bail!("todo"), } - - Ok(a) } -async fn original_stream( +fn original_stream( info: Arc<SMediaInfo>, track: usize, - range: Range<usize>, - b: DuplexStream, -) -> Result<()> { - let (iinfo, _info) = stream_info(info).await?; + range: Range<u64>, +) -> Result<Box<dyn Read+ Send + Sync>> { + let (iinfo, _info) = stream_info(info)?; let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; - let mut file = File::open(&iinfo.paths[file_index]) - .await - .context("opening source")?; + let mut file = File::open(&iinfo.paths[file_index]).context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) - .await .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); - - Ok(()) -} - -async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { - let mut buf = [0u8; 4096]; - loop { - let size = inp.read(&mut buf[..amount.min(4096)]).await?; - if size == 0 { - break Ok(()); - } - out.write_all(&buf[..size]).await?; - amount -= size; - } + Ok(Box::new(file.take(range.end - range.start))) } |