diff options
-rw-r--r-- | common/src/stream.rs | 20 | ||||
-rw-r--r-- | remuxer/src/snippet.rs | 49 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 2 | ||||
-rw-r--r-- | stream/src/hls.rs | 64 | ||||
-rw-r--r-- | stream/src/lib.rs | 10 | ||||
-rw-r--r-- | stream/src/segment.rs | 2 |
6 files changed, 123 insertions, 24 deletions
diff --git a/common/src/stream.rs b/common/src/stream.rs index af19062..e6fe767 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -26,7 +26,8 @@ pub struct StreamSpec { pub enum StreamFormat { #[cfg_attr(feature = "rocket", field(value = "original"))] Original, #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska, - #[cfg_attr(feature = "rocket", field(value = "hls"))] Hls, + #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster, + #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant, #[cfg_attr(feature = "rocket", field(value = "jhls"))] Jhls, #[cfg_attr(feature = "rocket", field(value = "hlsseg"))] Segment, } @@ -49,10 +50,10 @@ impl StreamSpec { pub fn to_query(&self) -> String { use std::fmt::Write; let mut u = String::new(); - writeln!(u, "format={}", self.format.ident()).unwrap(); + write!(u, "format={}", self.format.ident()).unwrap(); if !self.tracks.is_empty() { - writeln!( + write!( u, "&tracks={}", self.tracks @@ -64,19 +65,19 @@ impl StreamSpec { .unwrap(); } if let Some(abr) = self.abr { - writeln!(u, "&abr={abr}").unwrap(); + write!(u, "&abr={abr}").unwrap(); } if let Some(vbr) = self.vbr { - writeln!(u, "&vbr={vbr}").unwrap(); + write!(u, "&vbr={vbr}").unwrap(); } if let Some(index) = self.index { - writeln!(u, "&index={index}").unwrap(); + write!(u, "&index={index}").unwrap(); } if let Some(webm) = self.webm { - writeln!(u, "&webm={webm}").unwrap(); + write!(u, "&webm={webm}").unwrap(); } if let Some(width) = self.width { - writeln!(u, "&width={width}").unwrap(); + write!(u, "&width={width}").unwrap(); } u } @@ -87,7 +88,8 @@ impl StreamFormat { match self { StreamFormat::Original => "original", StreamFormat::Matroska => "matroska", - StreamFormat::Hls => "hls", + StreamFormat::HlsMaster => "hlsmaster", + StreamFormat::HlsVariant => "hlsvariant", StreamFormat::Jhls => "jhls", StreamFormat::Segment => "hlsseg", } diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs index 186be19..f23e19e 100644 --- a/remuxer/src/snippet.rs +++ b/remuxer/src/snippet.rs @@ -4,18 +4,49 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use crate::ebml_track_entry; -use crate::segment_extractor::SegmentExtractIter; -use crate::{ebml_header, ebml_segment_info}; -use anyhow::anyhow; -use anyhow::Context; +use crate::{ + ebml_header, ebml_segment_info, ebml_track_entry, segment_extractor::SegmentExtractIter, +}; +use anyhow::{anyhow, Context, Result}; use jellycommon::{seek_index::SeekIndex, LocalTrack, NodePublic}; use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; use log::{debug, info}; -use std::{fs::File, io::Write, path::PathBuf}; +use std::{fs::File, io::Write, ops::Range, path::PathBuf}; const SNIPPET_LENGTH: f64 = 2.; +pub fn snippet_index( + path_base: PathBuf, + item: NodePublic, + track_sources: &Vec<LocalTrack>, + track: usize, +) -> Result<Vec<Range<f64>>> { + let media_info = item.media.as_ref().unwrap(); + let private = &track_sources[track]; + let source_path = path_base.join(&private.path); + let mut index = File::open(source_path.with_extension(format!("si.{}", private.track))) + .context("opening seek index file")?; + let index = + bincode::decode_from_std_read::<SeekIndex, _, _>(&mut index, bincode::config::standard())?; + let average_kf_interval = media_info.duration / index.keyframes.len() as f64; + let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize; + debug!("average keyframe interval: {average_kf_interval}"); + debug!(" => keyframes per snippet {kf_per_snip}"); + + let n_snips = index.keyframes.len().div_ceil(kf_per_snip); + Ok((0..n_snips) + .map(|i| { + let start = index.blocks[index.keyframes[i * kf_per_snip]].pts as f64 / 1000.; + let end = index + .keyframes + .get((i + 1) * kf_per_snip) + .map(|i| index.blocks[*i].pts as f64 / 1000.) + .unwrap_or(media_info.duration); + start..end + }) + .collect()) +} + pub fn write_snippet_into( writer: impl Write, path_base: PathBuf, @@ -50,8 +81,8 @@ pub fn write_snippet_into( // TODO maybe refactor this to approx. contant time per snippet let average_kf_interval = media_info.duration / index.keyframes.len() as f64; let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize; - info!("average keyframe interval: {average_kf_interval}"); - info!(" => keyframes per snippet {kf_per_snip}"); + debug!("average keyframe interval: {average_kf_interval}"); + debug!(" => keyframes per snippet {kf_per_snip}"); let start_block_index = *index .keyframes @@ -79,7 +110,7 @@ pub fn write_snippet_into( let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64); - let mut blocks = Vec::new(); + let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)]; for i in start_block_index..end_block_index { let index_block = &index.blocks[i]; let mut block = reader.next()?; diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 018fae5..09f788e 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -34,7 +34,7 @@ pub async fn r_stream_head( })) } -#[get("/n/<id>/stream?<spec>")] +#[get("/n/<id>/stream?<spec..>")] pub async fn r_stream( _sess: Session, federation: &State<Federation>, diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 6ddc2a4..81d0c8c 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -3,3 +3,67 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ + +use anyhow::{anyhow, Result}; +use jellybase::CONF; +use jellycommon::{ + stream::{StreamFormat, StreamSpec}, + LocalTrack, Node, +}; +use std::{fmt::Write, ops::Range}; +use tokio::io::{AsyncWriteExt, DuplexStream}; + +pub async fn hls_master_stream( + _node: Node, + _track_sources: Vec<LocalTrack>, + spec: StreamSpec, + mut b: DuplexStream, +) -> Result<()> { + let mut out = String::new(); + writeln!(out, "#EXTM3U")?; + writeln!(out, "#EXT-X-VERSION:4")?; + writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; + for t in spec.tracks { + let uri = format!( + "stream?{}", + StreamSpec { + tracks: vec![t], + format: StreamFormat::HlsVariant, + ..Default::default() + } + .to_query() + ); + // writeln!(out,"#EXT-X-MEDIA:NAME=\"track {t}\", TYPE=AUDIO, GROUP-ID=\"track{t}\", DEFAULT=YES, AUTOSELECT=YES, URI=\"{uri}\"")?; + writeln!(out, "{uri}")?; + } + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} + +pub async fn hls_variant_stream( + node: Node, + track_sources: Vec<LocalTrack>, + mut spec: StreamSpec, + mut b: DuplexStream, +) -> Result<()> { + let track = *spec.tracks.get(0).ok_or(anyhow!("no track"))?; + let snips = jellyremuxer::snippet::snippet_index( + CONF.library_path.clone(), + node.public.clone(), + &track_sources, + track, + )?; + let mut out = String::new(); + writeln!(out, "#EXTM3U")?; + writeln!(out, "#EXT-X-VERSION:4")?; + + spec.format = StreamFormat::Segment; + for (i, Range { start, end }) in snips.iter().enumerate() { + writeln!(out, "#EXTINF:{},", end - start)?; + spec.index = Some(i); + writeln!(out, "stream?{}", spec.to_query())?; + } + + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 726f1e8..be1ff32 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -7,12 +7,13 @@ pub mod hls; pub mod segment; use anyhow::{anyhow, bail, Context, Result}; +use hls::{hls_master_stream, hls_variant_stream}; use jellybase::CONF; use jellycommon::{ stream::{StreamFormat, StreamSpec}, LocalTrack, MediaSource, Node, }; -use segment::stream_segment; +use segment::segment_stream; use std::{io::SeekFrom, ops::Range}; use tokio::{ fs::File, @@ -31,7 +32,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { match spec.format { StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::Hls => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, + StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, StreamFormat::Jhls => StreamHead { content_type: "application/jellything-jhls+json", range_supported: false }, StreamFormat::Segment => StreamHead { content_type: webm_or_mkv, range_supported: false }, } @@ -53,9 +54,10 @@ pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result match spec.format { StreamFormat::Original => original_stream(track_sources, spec, range, b).await?, StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?, - StreamFormat::Hls => bail!("unsupported"), + StreamFormat::HlsMaster => hls_master_stream(node, track_sources, spec, b).await?, + StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?, StreamFormat::Jhls => bail!("unsupported"), - StreamFormat::Segment => stream_segment(node, track_sources, spec, b).await?, + StreamFormat::Segment => segment_stream(node, track_sources, spec, b).await?, } Ok(a) diff --git a/stream/src/segment.rs b/stream/src/segment.rs index d09d357..77f1238 100644 --- a/stream/src/segment.rs +++ b/stream/src/segment.rs @@ -10,7 +10,7 @@ use log::warn; use tokio::io::DuplexStream; use tokio_util::io::SyncIoBridge; -pub async fn stream_segment( +pub async fn segment_stream( node: Node, track_sources: Vec<LocalTrack>, spec: StreamSpec, |