aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/src/stream.rs20
-rw-r--r--remuxer/src/snippet.rs49
-rw-r--r--server/src/routes/stream.rs2
-rw-r--r--stream/src/hls.rs64
-rw-r--r--stream/src/lib.rs10
-rw-r--r--stream/src/segment.rs2
6 files changed, 123 insertions, 24 deletions
diff --git a/common/src/stream.rs b/common/src/stream.rs
index af19062..e6fe767 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -26,7 +26,8 @@ pub struct StreamSpec {
pub enum StreamFormat {
#[cfg_attr(feature = "rocket", field(value = "original"))] Original,
#[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska,
- #[cfg_attr(feature = "rocket", field(value = "hls"))] Hls,
+ #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster,
+ #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant,
#[cfg_attr(feature = "rocket", field(value = "jhls"))] Jhls,
#[cfg_attr(feature = "rocket", field(value = "hlsseg"))] Segment,
}
@@ -49,10 +50,10 @@ impl StreamSpec {
pub fn to_query(&self) -> String {
use std::fmt::Write;
let mut u = String::new();
- writeln!(u, "format={}", self.format.ident()).unwrap();
+ write!(u, "format={}", self.format.ident()).unwrap();
if !self.tracks.is_empty() {
- writeln!(
+ write!(
u,
"&tracks={}",
self.tracks
@@ -64,19 +65,19 @@ impl StreamSpec {
.unwrap();
}
if let Some(abr) = self.abr {
- writeln!(u, "&abr={abr}").unwrap();
+ write!(u, "&abr={abr}").unwrap();
}
if let Some(vbr) = self.vbr {
- writeln!(u, "&vbr={vbr}").unwrap();
+ write!(u, "&vbr={vbr}").unwrap();
}
if let Some(index) = self.index {
- writeln!(u, "&index={index}").unwrap();
+ write!(u, "&index={index}").unwrap();
}
if let Some(webm) = self.webm {
- writeln!(u, "&webm={webm}").unwrap();
+ write!(u, "&webm={webm}").unwrap();
}
if let Some(width) = self.width {
- writeln!(u, "&width={width}").unwrap();
+ write!(u, "&width={width}").unwrap();
}
u
}
@@ -87,7 +88,8 @@ impl StreamFormat {
match self {
StreamFormat::Original => "original",
StreamFormat::Matroska => "matroska",
- StreamFormat::Hls => "hls",
+ StreamFormat::HlsMaster => "hlsmaster",
+ StreamFormat::HlsVariant => "hlsvariant",
StreamFormat::Jhls => "jhls",
StreamFormat::Segment => "hlsseg",
}
diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs
index 186be19..f23e19e 100644
--- a/remuxer/src/snippet.rs
+++ b/remuxer/src/snippet.rs
@@ -4,18 +4,49 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::ebml_track_entry;
-use crate::segment_extractor::SegmentExtractIter;
-use crate::{ebml_header, ebml_segment_info};
-use anyhow::anyhow;
-use anyhow::Context;
+use crate::{
+ ebml_header, ebml_segment_info, ebml_track_entry, segment_extractor::SegmentExtractIter,
+};
+use anyhow::{anyhow, Context, Result};
use jellycommon::{seek_index::SeekIndex, LocalTrack, NodePublic};
use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag};
use log::{debug, info};
-use std::{fs::File, io::Write, path::PathBuf};
+use std::{fs::File, io::Write, ops::Range, path::PathBuf};
const SNIPPET_LENGTH: f64 = 2.;
+pub fn snippet_index(
+ path_base: PathBuf,
+ item: NodePublic,
+ track_sources: &Vec<LocalTrack>,
+ track: usize,
+) -> Result<Vec<Range<f64>>> {
+ let media_info = item.media.as_ref().unwrap();
+ let private = &track_sources[track];
+ let source_path = path_base.join(&private.path);
+ let mut index = File::open(source_path.with_extension(format!("si.{}", private.track)))
+ .context("opening seek index file")?;
+ let index =
+ bincode::decode_from_std_read::<SeekIndex, _, _>(&mut index, bincode::config::standard())?;
+ let average_kf_interval = media_info.duration / index.keyframes.len() as f64;
+ let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize;
+ debug!("average keyframe interval: {average_kf_interval}");
+ debug!(" => keyframes per snippet {kf_per_snip}");
+
+ let n_snips = index.keyframes.len().div_ceil(kf_per_snip);
+ Ok((0..n_snips)
+ .map(|i| {
+ let start = index.blocks[index.keyframes[i * kf_per_snip]].pts as f64 / 1000.;
+ let end = index
+ .keyframes
+ .get((i + 1) * kf_per_snip)
+ .map(|i| index.blocks[*i].pts as f64 / 1000.)
+ .unwrap_or(media_info.duration);
+ start..end
+ })
+ .collect())
+}
+
pub fn write_snippet_into(
writer: impl Write,
path_base: PathBuf,
@@ -50,8 +81,8 @@ pub fn write_snippet_into(
// TODO maybe refactor this to approx. contant time per snippet
let average_kf_interval = media_info.duration / index.keyframes.len() as f64;
let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize;
- info!("average keyframe interval: {average_kf_interval}");
- info!(" => keyframes per snippet {kf_per_snip}");
+ debug!("average keyframe interval: {average_kf_interval}");
+ debug!(" => keyframes per snippet {kf_per_snip}");
let start_block_index = *index
.keyframes
@@ -79,7 +110,7 @@ pub fn write_snippet_into(
let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64);
- let mut blocks = Vec::new();
+ let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)];
for i in start_block_index..end_block_index {
let index_block = &index.blocks[i];
let mut block = reader.next()?;
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 018fae5..09f788e 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -34,7 +34,7 @@ pub async fn r_stream_head(
}))
}
-#[get("/n/<id>/stream?<spec>")]
+#[get("/n/<id>/stream?<spec..>")]
pub async fn r_stream(
_sess: Session,
federation: &State<Federation>,
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 6ddc2a4..81d0c8c 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -3,3 +3,67 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
+
+use anyhow::{anyhow, Result};
+use jellybase::CONF;
+use jellycommon::{
+ stream::{StreamFormat, StreamSpec},
+ LocalTrack, Node,
+};
+use std::{fmt::Write, ops::Range};
+use tokio::io::{AsyncWriteExt, DuplexStream};
+
+pub async fn hls_master_stream(
+ _node: Node,
+ _track_sources: Vec<LocalTrack>,
+ spec: StreamSpec,
+ mut b: DuplexStream,
+) -> Result<()> {
+ let mut out = String::new();
+ writeln!(out, "#EXTM3U")?;
+ writeln!(out, "#EXT-X-VERSION:4")?;
+ writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?;
+ for t in spec.tracks {
+ let uri = format!(
+ "stream?{}",
+ StreamSpec {
+ tracks: vec![t],
+ format: StreamFormat::HlsVariant,
+ ..Default::default()
+ }
+ .to_query()
+ );
+ // writeln!(out,"#EXT-X-MEDIA:NAME=\"track {t}\", TYPE=AUDIO, GROUP-ID=\"track{t}\", DEFAULT=YES, AUTOSELECT=YES, URI=\"{uri}\"")?;
+ writeln!(out, "{uri}")?;
+ }
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
+
+pub async fn hls_variant_stream(
+ node: Node,
+ track_sources: Vec<LocalTrack>,
+ mut spec: StreamSpec,
+ mut b: DuplexStream,
+) -> Result<()> {
+ let track = *spec.tracks.get(0).ok_or(anyhow!("no track"))?;
+ let snips = jellyremuxer::snippet::snippet_index(
+ CONF.library_path.clone(),
+ node.public.clone(),
+ &track_sources,
+ track,
+ )?;
+ let mut out = String::new();
+ writeln!(out, "#EXTM3U")?;
+ writeln!(out, "#EXT-X-VERSION:4")?;
+
+ spec.format = StreamFormat::Segment;
+ for (i, Range { start, end }) in snips.iter().enumerate() {
+ writeln!(out, "#EXTINF:{},", end - start)?;
+ spec.index = Some(i);
+ writeln!(out, "stream?{}", spec.to_query())?;
+ }
+
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 726f1e8..be1ff32 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -7,12 +7,13 @@ pub mod hls;
pub mod segment;
use anyhow::{anyhow, bail, Context, Result};
+use hls::{hls_master_stream, hls_variant_stream};
use jellybase::CONF;
use jellycommon::{
stream::{StreamFormat, StreamSpec},
LocalTrack, MediaSource, Node,
};
-use segment::stream_segment;
+use segment::segment_stream;
use std::{io::SeekFrom, ops::Range};
use tokio::{
fs::File,
@@ -31,7 +32,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead {
match spec.format {
StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true },
StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
- StreamFormat::Hls => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
+ StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
StreamFormat::Jhls => StreamHead { content_type: "application/jellything-jhls+json", range_supported: false },
StreamFormat::Segment => StreamHead { content_type: webm_or_mkv, range_supported: false },
}
@@ -53,9 +54,10 @@ pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result
match spec.format {
StreamFormat::Original => original_stream(track_sources, spec, range, b).await?,
StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?,
- StreamFormat::Hls => bail!("unsupported"),
+ StreamFormat::HlsMaster => hls_master_stream(node, track_sources, spec, b).await?,
+ StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?,
StreamFormat::Jhls => bail!("unsupported"),
- StreamFormat::Segment => stream_segment(node, track_sources, spec, b).await?,
+ StreamFormat::Segment => segment_stream(node, track_sources, spec, b).await?,
}
Ok(a)
diff --git a/stream/src/segment.rs b/stream/src/segment.rs
index d09d357..77f1238 100644
--- a/stream/src/segment.rs
+++ b/stream/src/segment.rs
@@ -10,7 +10,7 @@ use log::warn;
use tokio::io::DuplexStream;
use tokio_util::io::SyncIoBridge;
-pub async fn stream_segment(
+pub async fn segment_stream(
node: Node,
track_sources: Vec<LocalTrack>,
spec: StreamSpec,