aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--remuxer/src/muxers/matroska.rs71
-rw-r--r--remuxer/src/muxers/mod.rs38
-rw-r--r--remuxer/src/muxers/mpeg4.rs29
-rw-r--r--stream/src/dash.rs190
-rw-r--r--stream/src/fragment.rs184
-rw-r--r--stream/src/hls.rs9
-rw-r--r--stream/src/lib.rs31
-rw-r--r--stream/src/stream_info.rs3
-rw-r--r--stream/types/src/lib.rs71
-rw-r--r--transcoder/src/fragment.rs80
10 files changed, 524 insertions, 182 deletions
diff --git a/remuxer/src/muxers/matroska.rs b/remuxer/src/muxers/matroska.rs
index e5ce311..319ab21 100644
--- a/remuxer/src/muxers/matroska.rs
+++ b/remuxer/src/muxers/matroska.rs
@@ -7,10 +7,10 @@
use crate::muxers::FragmentMuxer;
use anyhow::Result;
use std::io::Write;
-use winter_ebml::{EbmlHeader, EbmlToVec};
+use winter_ebml::{Ebml, EbmlHeader, EbmlToVec, write_vint_slice};
use winter_matroska::{MatroskaFile, Segment};
-fn write_fragment_shared(out: &mut dyn Write, mut segment: Segment, webm: bool) -> Result<()> {
+fn write_init_shared(out: &mut dyn Write, mut segment: Segment, webm: bool) -> Result<()> {
segment.info.muxing_app =
concat!(env!("CARGO_PKG_NAME"), "-", env!("CARGO_PKG_VERSION")).to_string();
if webm && let Some(tracks) = &mut segment.tracks {
@@ -25,32 +25,65 @@ fn write_fragment_shared(out: &mut dyn Write, mut segment: Segment, webm: bool)
}
}
- let file = MatroskaFile {
- ebml_header: EbmlHeader {
- ebml_version: 1,
- ebml_read_version: 1,
- ebml_max_id_length: 4,
- ebml_max_size_length: 8,
- doc_type: if webm { "webm" } else { "matroska" }.to_string(),
- doc_type_version: 4,
- doc_type_read_version: 2,
- ..Default::default()
- },
- segment,
+ let header = EbmlHeader {
+ ebml_version: 1,
+ ebml_read_version: 1,
+ ebml_max_id_length: 4,
+ ebml_max_size_length: 8,
+ doc_type: if webm { "webm" } else { "matroska" }.to_string(),
+ doc_type_version: 4,
+ doc_type_read_version: 2,
+ ..Default::default()
};
- out.write_all(&file.to_vec())?;
+
+ let mut buf = Vec::new();
+ write_vint_slice(&mut buf, MatroskaFile::TAG_EBML_HEADER);
+ write_vint_slice(&mut buf, header.size() as u64);
+ header.write(&mut buf);
+
+ write_vint_slice(&mut buf, MatroskaFile::TAG_SEGMENT);
+ write_vint_slice(&mut buf, 0x00ff_ffff_ffff_ffff);
+
+ write_vint_slice(&mut buf, Segment::TAG_INFO);
+ write_vint_slice(&mut buf, segment.info.size() as u64);
+ segment.info.write(&mut buf);
+
+ if let Some(tracks) = segment.tracks {
+ write_vint_slice(&mut buf, Segment::TAG_TRACKS);
+ write_vint_slice(&mut buf, tracks.size() as u64);
+ tracks.write(&mut buf);
+ }
+
+ out.write_all(&buf)?;
+ Ok(())
+}
+fn write_frag_shared(out: &mut dyn Write, segment: Segment, _webm: bool) -> Result<()> {
+ for cluster in segment.clusters {
+ let cluster = cluster.to_vec();
+ let mut buf = Vec::new();
+ write_vint_slice(&mut buf, Segment::TAG_CLUSTERS);
+ write_vint_slice(&mut buf, cluster.len() as u64);
+ out.write_all(&buf)?;
+ out.write_all(&cluster)?;
+ }
Ok(())
}
pub struct MatroskaFragmentMuxer;
impl FragmentMuxer for MatroskaFragmentMuxer {
- fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
- write_fragment_shared(out, segment, false)
+ fn write_init(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_init_shared(out, segment, false)
+ }
+ fn write_frag(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_frag_shared(out, segment, false)
}
}
pub struct WebmFragmentMuxer;
impl FragmentMuxer for WebmFragmentMuxer {
- fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
- write_fragment_shared(out, segment, true)
+ fn write_init(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_init_shared(out, segment, true)
+ }
+ fn write_frag(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_frag_shared(out, segment, true)
}
}
diff --git a/remuxer/src/muxers/mod.rs b/remuxer/src/muxers/mod.rs
index d7e8e49..9349b77 100644
--- a/remuxer/src/muxers/mod.rs
+++ b/remuxer/src/muxers/mod.rs
@@ -19,18 +19,40 @@ use std::io::Write;
use winter_matroska::Segment;
pub trait FragmentMuxer {
- fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()>;
+ fn write_init(out: &mut dyn Write, segment: Segment) -> Result<()>;
+ fn write_frag(out: &mut dyn Write, segment: Segment) -> Result<()>;
}
-pub fn write_fragment(
+pub fn write_init(container: ContainerFormat, out: &mut dyn Write, segment: Segment) -> Result<()> {
+ match container {
+ ContainerFormat::Matroska => MatroskaFragmentMuxer::write_init(out, segment),
+ ContainerFormat::Webm => WebmFragmentMuxer::write_init(out, segment),
+ ContainerFormat::Mpeg4 => Mpeg4FragmentMuxer::write_init(out, segment),
+ _ => unimplemented!(),
+ }
+}
+pub fn write_frag(container: ContainerFormat, out: &mut dyn Write, segment: Segment) -> Result<()> {
+ match container {
+ ContainerFormat::Matroska => MatroskaFragmentMuxer::write_frag(out, segment),
+ ContainerFormat::Webm => WebmFragmentMuxer::write_frag(out, segment),
+ ContainerFormat::Mpeg4 => Mpeg4FragmentMuxer::write_frag(out, segment),
+ _ => unimplemented!(),
+ }
+}
+pub fn write_init_frag(
container: ContainerFormat,
out: &mut dyn Write,
segment: Segment,
) -> Result<()> {
- match container {
- ContainerFormat::Matroska => MatroskaFragmentMuxer::write_fragment(out, segment),
- ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, segment),
- ContainerFormat::Mpeg4 => Mpeg4FragmentMuxer::write_fragment(out, segment),
- _ => unimplemented!(),
- }
+ write_init(
+ container,
+ out,
+ Segment {
+ info: segment.info.clone(),
+ tracks: segment.tracks.clone(),
+ ..Default::default()
+ },
+ )?;
+ write_frag(container, out, segment)?;
+ Ok(())
}
diff --git a/remuxer/src/muxers/mpeg4.rs b/remuxer/src/muxers/mpeg4.rs
index 2c39c35..b4c8fa8 100644
--- a/remuxer/src/muxers/mpeg4.rs
+++ b/remuxer/src/muxers/mpeg4.rs
@@ -4,34 +4,17 @@
Copyright (C) 2026 metamuffin <metamuffin.org>
*/
-use crate::muxers::{FragmentMuxer, matroska::MatroskaFragmentMuxer};
+use crate::muxers::FragmentMuxer;
use anyhow::Result;
-use std::{
- io::{Cursor, Write, copy},
- process::{Command, Stdio},
- thread::spawn,
-};
+use std::io::Write;
use winter_matroska::Segment;
pub struct Mpeg4FragmentMuxer;
impl FragmentMuxer for Mpeg4FragmentMuxer {
- fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
- let mut mk_frag = Vec::new();
- MatroskaFragmentMuxer::write_fragment(&mut mk_frag, segment)?;
-
- let mut child = Command::new("ffmpeg")
- .args(
- "-hide_banner -f matroska -i pipe:0 -c copy -f mp4 -movflags frag_keyframe+empty_moov pipe:1"
- .split(" "),
- )
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .spawn()?;
-
- let mut stdin = child.stdin.take().unwrap();
- let mut stdout = child.stdout.take().unwrap();
- spawn(move || copy(&mut Cursor::new(mk_frag), &mut stdin));
- copy(&mut stdout, out)?;
+ fn write_init(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ Ok(())
+ }
+ fn write_frag(out: &mut dyn Write, segment: Segment) -> Result<()> {
Ok(())
}
}
diff --git a/stream/src/dash.rs b/stream/src/dash.rs
new file mode 100644
index 0000000..17fe43e
--- /dev/null
+++ b/stream/src/dash.rs
@@ -0,0 +1,190 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2026 metamuffin <metamuffin.org>
+*/
+
+use crate::{SMediaInfo, fragment_index::fragment_index, stream_info};
+use anyhow::Result;
+use jellystream_types::{StreamContainer, StreamFormatInfo, TrackKind};
+use std::{
+ fmt::{Display, Write},
+ io::{Cursor, Read},
+ ops::Range,
+};
+
+pub fn dash(sinfo: &SMediaInfo) -> Result<Box<dyn Read + Send + Sync>> {
+ let (_iinfo, info) = stream_info(&sinfo)?;
+
+ let mut out = String::new();
+
+ writeln!(
+ out,
+ "<?xml version=\"1.0\" encoding=\"utf-8\"?> \
+ <MPD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
+ xmlns=\"urn:mpeg:dash:schema:mpd:2011\" \
+ xmlns:xlink=\"http://www.w3.org/1999/xlink\" \
+ xsi:schemaLocation=\"urn:mpeg:DASH:schema:MPD:2011 http://standards.iso.org/ittf/PubliclyAvailableStandards/MPEG-DASH_schema_files/DASH-MPD.xsd\" \
+ profiles=\"urn:mpeg:dash:profile:isoff-live:2011\" \
+ type=\"static\" \
+ mediaPresentationDuration=\"{}\" \
+ maxSegmentDuration=\"PT5.0S\" \
+ minBufferTime=\"PT10.4S\">",
+ Time(info.duration)
+ )?;
+
+ writeln!(out, "<ProgramInformation></ProgramInformation>")?;
+ writeln!(out, "<ServiceDescription id=\"0\"></ServiceDescription>")?;
+ writeln!(out, r#"<Period id="0" start="PT0.0S">"#)?;
+ for (as_id, track) in info.tracks.iter().enumerate() {
+ let frags = fragment_index(&sinfo, as_id)?;
+ match track.kind {
+ TrackKind::Video => {
+ let max_width = track
+ .formats
+ .iter()
+ .flat_map(|f| f.width)
+ .max()
+ .unwrap_or_default();
+ let max_height = track
+ .formats
+ .iter()
+ .flat_map(|f| f.height)
+ .max()
+ .unwrap_or_default();
+ let framerate = "2997/1000";
+ let par = "16:9"; // TODO
+ writeln!(
+ out,
+ "<AdaptationSet \
+ id=\"{as_id}\" \
+ contentType=\"video\" \
+ startWithSAP=\"1\" \
+ segmentAlignment=\"true\" \
+ bitstreamSwitching=\"true\" \
+ frameRate=\"{framerate}\" \
+ maxWidth=\"{max_width}\" \
+ maxHeight=\"{max_height}\" \
+ par=\"{par}\" \
+ lang=\"eng\">"
+ )?;
+ for (repr_id, format) in track.formats.iter().enumerate() {
+ let StreamFormatInfo {
+ width: Some(width),
+ height: Some(height),
+ bitrate,
+ ..
+ } = format
+ else {
+ unreachable!()
+ };
+ let container = StreamContainer::WebM;
+ let container_mime = container.mime_type();
+ let codec_param = &format.codec_param;
+ writeln!(
+ out,
+ "<Representation \
+ id=\"{repr_id}\" \
+ mimeType=\"{container_mime}\" \
+ codecs=\"{codec_param}\" \
+ bandwidth=\"{bitrate}\" \
+ width=\"{width}\" \
+ height=\"{height}\" \
+ scanType=\"unknown\" \
+ sar=\"1:1\">"
+ )?;
+ write_segment_template(&mut out, as_id, container, &frags)?;
+ writeln!(out, "</Representation>")?;
+ }
+ writeln!(out, "</AdaptationSet>")?;
+ }
+ TrackKind::Audio => {
+ writeln!(
+ out,
+ "<AdaptationSet \
+ id=\"{as_id}\" \
+ contentType=\"audio\" \
+ startWithSAP=\"1\" \
+ segmentAlignment=\"true\" \
+ bitstreamSwitching=\"true\">"
+ )?;
+ for (repr_id, format) in track.formats.iter().enumerate() {
+ let StreamFormatInfo {
+ bitrate,
+ samplerate: Some(samplerate),
+ ..
+ } = format
+ else {
+ unreachable!()
+ };
+ let container = StreamContainer::WebM;
+ let container_mime = container.mime_type();
+ let codec_param = &format.codec_param;
+ writeln!(
+ out,
+ "<Representation \
+ id=\"{repr_id}\" \
+ mimeType=\"{container_mime}\" \
+ codecs=\"{codec_param}\" \
+ bandwidth=\"{bitrate}\" \
+ audioSamplingRate=\"{samplerate:.0}\">"
+ )?;
+ write_segment_template(&mut out, as_id, container, &frags)?;
+ writeln!(out, "</Representation>")?;
+ }
+ writeln!(out, "</AdaptationSet>")?;
+ }
+ TrackKind::Subtitle => (),
+ }
+ }
+ writeln!(out, r#"</Period>"#)?;
+
+ writeln!(out, r#"</MPD>"#)?;
+
+ Ok(Box::new(Cursor::new(out)))
+}
+
+fn write_segment_template(
+ out: &mut String,
+ as_id: usize,
+ container: StreamContainer,
+ frags: &[Range<f64>],
+) -> Result<()> {
+ writeln!(
+ out,
+ "<SegmentTemplate \
+ timescale=\"1000\" \
+ initialization=\"stream?fragmentinit&amp;t={as_id}&amp;c={container}&amp;f=$RepresentationID$\" \
+ media=\"stream?fragment&amp;t={as_id}&amp;c={container}&amp;f=$RepresentationID$&amp;i=$Number$\" \
+ startNumber=\"0\">"
+ )?;
+ writeln!(out, "{}", Timeline(&frags))?;
+ writeln!(out, "</SegmentTemplate>")?;
+ Ok(())
+}
+
+struct Time(f64);
+impl Display for Time {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "PT{:.01}S", self.0)
+ }
+}
+struct Timeline<'a>(&'a [Range<f64>]);
+impl Display for Timeline<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ writeln!(f, "<SegmentTimeline>")?;
+ let mut last_t = 0;
+ for (i, r) in self.0.iter().enumerate() {
+ let t = (r.start * 1000.) as i64;
+ let d = t - last_t;
+ last_t = t;
+ if i == 0 {
+ writeln!(f, r#"<S t="0" d="{d}" />"#)?;
+ } else {
+ writeln!(f, r#"<S d="{d}" />"#)?;
+ }
+ }
+ writeln!(f, "</SegmentTimeline>")?;
+ Ok(())
+ }
+}
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index 62bb0b6..e759c86 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,41 +3,35 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2026 metamuffin <metamuffin.org>
*/
-use crate::{
- SMediaInfo,
- cues::{GeneratedCue, generate_cues},
- stream_info,
-};
-use anyhow::{Result, anyhow};
+use crate::{SMediaInfo, cues::generate_cues, stream_info};
+use anyhow::{Ok, Result, anyhow};
use jellycache::HashKey;
use jellyremuxer::{
ContainerFormat,
demuxers::create_demuxer_autodetect,
- matroska::{self, Segment},
- muxers::write_fragment,
+ matroska::{self, Info, Segment},
+ muxers::{write_frag, write_init},
};
use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum};
use jellytranscoder::fragment::transcode;
use std::{
fs::File,
io::{Cursor, Read},
+ path::Path,
sync::Arc,
};
-pub fn fragment_stream(
- sinfo: Arc<SMediaInfo>,
+pub fn fragment_init(
+ sinfo: &SMediaInfo,
track: TrackNum,
- index: IndexNum,
format_num: FormatNum,
- container: StreamContainer,
-) -> Result<Box<dyn Read + Send + Sync>> {
+) -> Result<Segment> {
let (iinfo, info) = stream_info(&sinfo)?;
let (file_index, track_num) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("track not found"))?;
- let media_path = iinfo.paths[file_index].clone();
let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?;
let format = track
.formats
@@ -52,33 +46,49 @@ pub fn fragment_stream(
.iter()
.find(|t| t.track_number == track_num)
.unwrap();
-
let timestamp_scale = iinfo.metadata[file_index].info.timestamp_scale;
- let total_duration = iinfo.metadata[file_index].info.duration;
+ let duration = iinfo.metadata[file_index].info.duration;
+
+ if !format.remux {}
+
+ Ok(Segment {
+ tracks: Some(matroska::Tracks {
+ entries: vec![mk_track.to_owned()],
+ }),
+ info: Info {
+ duration,
+ timestamp_scale,
+ ..Default::default()
+ },
+ ..Default::default()
+ })
+}
+
+pub fn fragment_init_stream(
+ sinfo: Arc<SMediaInfo>,
+ track: TrackNum,
+ format_num: FormatNum,
+ container: StreamContainer,
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let init = fragment_init(&sinfo, track, format_num)?;
+ let mut buf = Vec::new();
+ write_init(map_container(container), &mut buf, init)?;
+ Ok(Box::new(Cursor::new(buf)))
+}
+
+pub fn fragment_remux(
+ sinfo: &SMediaInfo,
+ media_path: &Path,
+ file_track_num: u64,
+ index: IndexNum,
+ next_kf: bool,
+) -> Result<Segment> {
let cue_stat = generate_cues(&sinfo.cache, &media_path)?;
let start_cue = cue_stat
.cues
.get(index)
.ok_or(anyhow!("fragment index out of range"))?;
- let end_cue = cue_stat
- .cues
- .get(index + 1)
- .copied()
- .unwrap_or(GeneratedCue {
- position: 0,
- time: total_duration.unwrap_or_default() as u64 * timestamp_scale, // TODO rounding?
- });
let cluster_offset = start_cue.position;
- let duration = (end_cue.time - start_cue.time) as f64 / timestamp_scale as f64;
-
- let mk_info = matroska::Info {
- duration: Some(duration),
- timestamp_scale,
- ..Default::default()
- };
- let mk_tracks = matroska::Tracks {
- entries: vec![mk_track.to_owned()],
- };
let (mut cluster, next_cluster) = {
let media_file = File::open(&media_path)?;
@@ -90,49 +100,93 @@ pub fn fragment_stream(
.read_cluster()?
.ok_or(anyhow!("cluster unexpectedly missing"))?
.1,
- media.read_cluster()?.map(|(_, x)| x),
+ next_kf
+ .then(|| Ok(media.read_cluster()?.map(|(_, x)| x)))
+ .transpose()?
+ .flatten(),
)
};
- cluster.simple_blocks.retain(|b| b.track == track_num);
- cluster.block_groups.retain(|b| b.block.track == track_num);
- let next_kf = next_cluster.and_then(|x| {
- x.simple_blocks
- .iter()
- .find(|b| b.track == track_num)
- .cloned()
- });
+ cluster.simple_blocks.retain(|b| b.track == file_track_num);
+ cluster
+ .block_groups
+ .retain(|b| b.block.track == file_track_num);
- let jr_container = match container {
- StreamContainer::WebM => ContainerFormat::Webm,
- StreamContainer::Matroska => ContainerFormat::Matroska,
- StreamContainer::WebVTT => todo!(),
- StreamContainer::MPEG4 => ContainerFormat::Mpeg4,
- StreamContainer::JVTT => todo!(),
- };
+ let mut clusters = vec![cluster];
+ if let Some(next_cluster) = next_cluster {
+ clusters.push(next_cluster);
+ }
- let mut segment = Segment {
- info: mk_info,
- tracks: Some(mk_tracks),
- clusters: vec![cluster],
+ Ok(Segment {
+ clusters,
..Default::default()
- };
- segment.info.writing_app =
- concat!(env!("CARGO_PKG_NAME"), "-", env!("CARGO_PKG_VERSION")).to_string();
+ })
+}
- if !format.remux {
- segment = transcode(
+pub fn fragment_stream(
+ sinfo: Arc<SMediaInfo>,
+ track_num: TrackNum,
+ index: IndexNum,
+ format_num: FormatNum,
+ container: StreamContainer,
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let (iinfo, info) = stream_info(&sinfo)?;
+
+ let (file_index, file_track_num) = *iinfo
+ .track_to_file
+ .get(track_num)
+ .ok_or(anyhow!("track not found"))?;
+ let media_path = iinfo.paths[file_index].clone();
+ let track = info
+ .tracks
+ .get(track_num)
+ .ok_or(anyhow!("track not found"))?;
+ let format = track
+ .formats
+ .get(format_num)
+ .ok_or(anyhow!("format not found"))?;
+
+ let segment = if format.remux {
+ fragment_remux(&sinfo, &media_path, file_track_num, index, false)?
+ } else {
+ let mk_track = iinfo.metadata[file_index]
+ .tracks
+ .as_ref()
+ .unwrap()
+ .entries
+ .iter()
+ .find(|t| t.track_number == file_track_num)
+ .unwrap();
+
+ transcode(
&sinfo.cache,
&sinfo.config.transcoder,
track.kind,
- &format!("{}-T{track_num}-I{index}", HashKey(media_path)),
+ &format!("{}-T{file_track_num}-I{index}", HashKey(&media_path)),
format,
- segment,
- next_kf,
- )?;
- }
+ mk_track,
+ || {
+ let init = fragment_init(&sinfo, track_num, format_num)?;
+ let seg = fragment_remux(&sinfo, &media_path, file_track_num, index, true)?;
+ Ok(Segment {
+ clusters: seg.clusters,
+ ..init
+ })
+ },
+ )?
+ };
let mut out = Vec::new();
- write_fragment(jr_container, &mut out, segment)?;
+ write_frag(map_container(container), &mut out, segment)?;
Ok(Box::new(Cursor::new(out)))
}
+
+fn map_container(container: StreamContainer) -> ContainerFormat {
+ match container {
+ StreamContainer::WebM => ContainerFormat::Webm,
+ StreamContainer::Matroska => ContainerFormat::Matroska,
+ StreamContainer::WebVTT => todo!(),
+ StreamContainer::MPEG4 => ContainerFormat::Mpeg4,
+ StreamContainer::JVTT => todo!(),
+ }
+}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 9414877..571d2b3 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -13,10 +13,7 @@ use std::{
ops::Range,
};
-pub fn hls_multivariant_stream(
- info: &SMediaInfo,
- container: StreamContainer,
-) -> Result<Box<dyn Read + Send + Sync>> {
+pub fn hls_multivariant_stream(info: &SMediaInfo) -> Result<Box<dyn Read + Send + Sync>> {
let (_iinfo, info) = stream_info(&info)?;
let mut out = String::new();
@@ -29,7 +26,6 @@ pub fn hls_multivariant_stream(
"stream{}",
StreamSpec::HlsVariant {
track: i,
- container,
format: j
}
.to_query()
@@ -55,7 +51,6 @@ pub fn hls_variant_stream(
info: &SMediaInfo,
track: TrackNum,
format: FormatNum,
- container: StreamContainer,
) -> Result<Box<dyn Read + Send + Sync>> {
let frags = fragment_index(&info, track)?;
let (_, info) = stream_info(&info)?;
@@ -75,7 +70,7 @@ pub fn hls_variant_stream(
StreamSpec::Fragment {
track,
index,
- container,
+ container: StreamContainer::MPEG4,
format,
}
.to_query()
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index a94f4c7..3f6e93f 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -5,6 +5,7 @@
*/
#![feature(iterator_try_collect)]
pub mod cues;
+pub mod dash;
mod fragment;
mod fragment_index;
mod hls;
@@ -17,7 +18,7 @@ use fragment::fragment_stream;
use fragment_index::fragment_index_stream;
use hls::{hls_multivariant_stream, hls_variant_stream};
use jellycache::Cache;
-use jellystream_types::{StreamContainer, StreamSpec};
+use jellystream_types::StreamSpec;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
@@ -29,6 +30,8 @@ use std::{
};
use stream_info::{stream_info, write_stream_info};
+use crate::{dash::dash, fragment::fragment_init_stream};
+
#[rustfmt::skip]
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct Config {
@@ -53,24 +56,18 @@ pub struct StreamHead {
}
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- use StreamContainer::*;
use StreamSpec::*;
- let container_ct = |x: StreamContainer| match x {
- WebM => "video/webm",
- Matroska => "video/x-matroska",
- WebVTT => "text/vtt",
- JVTT => "application/jellything-vtt+json",
- MPEG4 => "video/mp4",
- };
let range_supported = matches!(spec, Remux { .. } | Original { .. });
let content_type = match spec {
Original { .. } => "video/x-matroska",
HlsMultiVariant { .. } => "application/vnd.apple.mpegurl",
HlsVariant { .. } => "application/vnd.apple.mpegurl",
Info => "application/jellything-stream-info+json",
+ Dash => "application/dash+xml",
FragmentIndex { .. } => "application/jellything-frag-index+json",
- Fragment { container, .. } => container_ct(*container),
- Remux { container, .. } => container_ct(*container),
+ FragmentInit { container, .. } => container.mime_type(),
+ Fragment { container, .. } => container.mime_type(),
+ Remux { container, .. } => container.mime_type(),
};
StreamHead {
content_type,
@@ -85,14 +82,16 @@ pub fn stream(
) -> Result<Box<dyn Read + Send + Sync>> {
match spec {
StreamSpec::Original { track } => original_stream(info, track, range),
- StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(&info, container),
- StreamSpec::HlsVariant {
+ StreamSpec::HlsMultiVariant => hls_multivariant_stream(&info),
+ StreamSpec::HlsVariant { track, format } => hls_variant_stream(&info, track, format),
+ StreamSpec::Info => write_stream_info(&info),
+ StreamSpec::Dash => dash(&info),
+ StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track),
+ StreamSpec::FragmentInit {
track,
container,
format,
- } => hls_variant_stream(&info, track, format, container),
- StreamSpec::Info => write_stream_info(&info),
- StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track),
+ } => fragment_init_stream(info, track, format, container),
StreamSpec::Fragment {
track,
index,
diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs
index 3312c71..4a2896f 100644
--- a/stream/src/stream_info.rs
+++ b/stream/src/stream_info.rs
@@ -75,6 +75,7 @@ fn stream_formats(config: &Config, t: &TrackEntry, remux_bitrate: f64) -> Vec<St
let mut formats = Vec::new();
formats.push(StreamFormatInfo {
codec: t.codec_id.to_string(),
+ codec_param: String::new(), // TODO
remux: true,
bitrate: remux_bitrate,
containers: containers_by_codec(&t.codec_id),
@@ -112,6 +113,7 @@ fn stream_formats(config: &Config, t: &TrackEntry, remux_bitrate: f64) -> Vec<St
if enable {
formats.push(StreamFormatInfo {
codec: cid.to_string(),
+ codec_param: String::new(), // TODO
bitrate: (remux_bitrate * 3.).min(br),
remux: false,
containers: containers_by_codec(cid),
@@ -129,6 +131,7 @@ fn stream_formats(config: &Config, t: &TrackEntry, remux_bitrate: f64) -> Vec<St
for br in [256e3, 128e3, 64e3] {
formats.push(StreamFormatInfo {
codec: "A_OPUS".to_string(),
+ codec_param: "opus".to_string(),
bitrate: br,
remux: false,
containers: containers_by_codec("A_OPUS"),
diff --git a/stream/types/src/lib.rs b/stream/types/src/lib.rs
index edafe00..d1583e9 100644
--- a/stream/types/src/lib.rs
+++ b/stream/types/src/lib.rs
@@ -26,18 +26,21 @@ pub enum StreamSpec {
Original {
track: TrackNum,
},
- HlsMultiVariant {
- container: StreamContainer,
- },
+ HlsMultiVariant,
HlsVariant {
track: TrackNum,
- container: StreamContainer,
format: FormatNum,
},
+ Dash,
Info,
FragmentIndex {
track: TrackNum,
},
+ FragmentInit {
+ track: TrackNum,
+ container: StreamContainer,
+ format: FormatNum,
+ },
Fragment {
track: TrackNum,
index: IndexNum,
@@ -77,6 +80,7 @@ pub enum TrackKind {
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct StreamFormatInfo {
pub codec: String,
+ pub codec_param: String,
pub bitrate: f64,
pub remux: bool,
pub containers: Vec<StreamContainer>,
@@ -98,6 +102,18 @@ pub enum StreamContainer {
JVTT,
}
+impl StreamContainer {
+ pub fn mime_type(&self) -> &'static str {
+ match self {
+ Self::WebM => "video/webm",
+ Self::Matroska => "video/x-matroska",
+ Self::WebVTT => "text/vtt",
+ Self::JVTT => "application/jellything-vtt+json",
+ Self::MPEG4 => "video/mp4",
+ }
+ }
+}
+
impl Display for TrackKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
@@ -122,18 +138,22 @@ impl StreamSpec {
)
}
StreamSpec::Original { track } => format!("?original&track={track}"),
- StreamSpec::HlsMultiVariant { container } => {
- format!("?hlsmultivariant&container={container}")
+ StreamSpec::HlsMultiVariant => {
+ format!("?hlsmultivariant")
+ }
+ StreamSpec::HlsVariant { track, format } => {
+ format!("?hlsvariant&track={track}&format={format}")
}
- StreamSpec::HlsVariant {
- track,
- container,
- format,
- } => format!("?hlsvariant&track={track}&container={container}&format={format}"),
StreamSpec::Info => "?info".to_string(),
+ StreamSpec::Dash => "?info".to_string(),
StreamSpec::FragmentIndex { track } => {
format!("?fragmentindex&track={track}")
}
+ StreamSpec::FragmentInit {
+ track,
+ container,
+ format,
+ } => format!("?fragmentinit&track={track}&container={container}&format={format}"),
StreamSpec::Fragment {
track,
index,
@@ -157,18 +177,22 @@ impl StreamSpec {
)
}
StreamSpec::Original { track } => format!("?original&t={track}"),
- StreamSpec::HlsMultiVariant { container } => {
- format!("?hlsmultivariant&c={container}")
+ StreamSpec::HlsMultiVariant => {
+ format!("?hlsmultivariant")
+ }
+ StreamSpec::HlsVariant { track, format } => {
+ format!("?hlsvariant&t={track}&f={format}")
}
- StreamSpec::HlsVariant {
- track,
- container,
- format,
- } => format!("?hlsvariant&t={track}&c={container}&f={format}"),
StreamSpec::Info => "?info".to_string(),
+ StreamSpec::Dash => "?dash".to_string(),
StreamSpec::FragmentIndex { track } => {
format!("?fragmentindex&t={track}")
}
+ StreamSpec::FragmentInit {
+ track,
+ container,
+ format,
+ } => format!("?fragmentinit&t={track}&c={container}&f={format}"),
StreamSpec::Fragment {
track,
index,
@@ -194,14 +218,19 @@ impl StreamSpec {
};
if query.contains_key("info") {
Ok(Self::Info)
+ } else if query.contains_key("dash") {
+ Ok(Self::Dash)
} else if query.contains_key("hlsmultivariant") {
- Ok(Self::HlsMultiVariant {
- container: get_container()?,
- })
+ Ok(Self::HlsMultiVariant)
} else if query.contains_key("hlsvariant") {
Ok(Self::HlsVariant {
track: get_num("track", "t")? as TrackNum,
format: get_num("format", "f")? as FormatNum,
+ })
+ } else if query.contains_key("fragmentinit") {
+ Ok(Self::FragmentInit {
+ track: get_num("track", "t")? as TrackNum,
+ format: get_num("format", "f")? as FormatNum,
container: get_container()?,
})
} else if query.contains_key("fragment") {
diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs
index 85abbd5..b6c3b2d 100644
--- a/transcoder/src/fragment.rs
+++ b/transcoder/src/fragment.rs
@@ -6,7 +6,7 @@
use crate::{Config, LOCAL_VIDEO_TRANSCODING_TASKS};
use anyhow::Result;
use jellycache::{Cache, HashKey};
-use jellyremuxer::{ContainerFormat, demuxers::create_demuxer, muxers::write_fragment};
+use jellyremuxer::{ContainerFormat, demuxers::create_demuxer, muxers::write_init_frag};
use jellystream_types::{StreamFormatInfo, TrackKind};
use log::info;
use std::{
@@ -15,30 +15,61 @@ use std::{
process::{Command, Stdio},
thread::spawn,
};
-use winter_matroska::{Cluster, Segment, TrackEntry as MatroskaTrackEntry, block::Block};
+use winter_matroska::{Segment, TrackEntry as MatroskaTrackEntry};
// TODO odd video resolutions can cause errors when transcoding to YUV42{0,2}
// TODO with an implementation that cant handle it (SVT-AV1 is such an impl).
-pub fn transcode(
+pub fn transcode_init(
cache: &Cache,
config: &Config,
kind: TrackKind,
input_key: &str,
output_format: &StreamFormatInfo,
- mut input: Segment,
- next_kf: Option<Block>,
) -> Result<Segment> {
let command = transcode_command(
kind,
- &input.tracks.as_ref().unwrap().entries[0],
+ &MatroskaTrackEntry::default(),
output_format,
+ true,
config,
- )
- .unwrap();
+ )?;
+ let output = cache.cache(
+ &format!(
+ "transcode/media-fragment/{input_key}-{}.mkv",
+ HashKey(&command)
+ ),
+ || {
+ info!("init encode with {command:?}");
+ let mut args = command.split(" ");
+ let proc = Command::new(args.next().unwrap())
+ .stdout(Stdio::piped())
+ .args(args)
+ .spawn()?;
+ Ok(proc.wait_with_output()?.exit_ok()?.stdout)
+ },
+ )?;
- let input_duration = input.info.duration;
- let had_next_kf = next_kf.is_some();
+ let mut demuxer = create_demuxer(ContainerFormat::Matroska, Box::new(Cursor::new(output)));
+ let info = demuxer.info()?;
+ let tracks = demuxer.tracks()?;
+ Ok(Segment {
+ info,
+ tracks,
+ ..Default::default()
+ })
+}
+
+pub fn transcode(
+ cache: &Cache,
+ config: &Config,
+ kind: TrackKind,
+ input_key: &str,
+ output_format: &StreamFormatInfo,
+ input_track: &MatroskaTrackEntry,
+ segment: impl FnOnce() -> Result<Segment>,
+) -> Result<Segment> {
+ let command = transcode_command(kind, &input_track, output_format, false, config).unwrap();
let output = cache.cache(
&format!(
@@ -47,6 +78,7 @@ pub fn transcode(
),
|| {
let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.lock().unwrap();
+ let input = segment()?;
info!("encoding with {command:?}");
let mut args = command.split(" ");
let mut proc = Command::new(args.next().unwrap())
@@ -59,11 +91,7 @@ pub fn transcode(
let mut stdout = proc.stdout.take().unwrap();
spawn(move || {
- input.clusters.extend(next_kf.map(|kf| Cluster {
- simple_blocks: vec![kf],
- ..Default::default()
- }));
- write_fragment(ContainerFormat::Matroska, &mut stdin, input).unwrap(); // TODO
+ write_init_frag(ContainerFormat::Matroska, &mut stdin, input).unwrap();
stdin.flush().unwrap();
drop(stdin);
});
@@ -78,18 +106,19 @@ pub fn transcode(
let mut demuxer = create_demuxer(ContainerFormat::Matroska, Box::new(Cursor::new(output)));
- let mut info = demuxer.info()?;
- info.duration = input_duration;
+ let info = demuxer.info()?;
let tracks = demuxer.tracks()?;
let mut clusters = Vec::new();
while let Some((_, cluster)) = demuxer.read_cluster()? {
clusters.push(cluster);
}
- if had_next_kf {
- if let Some(c) = clusters.last_mut() {
- c.simple_blocks.pop().expect("empty last cluster");
- }
- clusters.retain(|c| !c.simple_blocks.is_empty() || !c.block_groups.is_empty());
+
+ //? Remove extra kf hack
+ if clusters
+ .last()
+ .map_or(false, |c| c.simple_blocks.len() == 1)
+ {
+ clusters.pop();
}
Ok(Segment {
@@ -104,6 +133,7 @@ fn transcode_command(
kind: TrackKind,
orig_metadata: &MatroskaTrackEntry,
format: &StreamFormatInfo,
+ dummy: bool,
config: &Config,
) -> Result<String> {
let br = format.bitrate as u64;
@@ -120,7 +150,11 @@ fn transcode_command(
write!(o, "-afbc rga ")?;
}
- write!(o, "-f matroska -i pipe:0 -copyts ")?;
+ if dummy {
+ write!(o, "-f lavfi -i testsrc2 -to 1 ")?;
+ } else {
+ write!(o, "-f matroska -i pipe:0 -copyts ")?;
+ }
if config.enable_rkrga {
write!(o, "-vf scale_rkrga=w={w}:h={h}:format=nv12:afbc=1 ")?;