aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-09-26 14:35:23 +0200
committermetamuffin <metamuffin@disroot.org>2025-09-26 14:35:23 +0200
commitf7e3841426ed4661ede6ccfff9c306141735d465 (patch)
tree793727cc80386bc1715869a3f54986e42897e656
parent81bb9dee3ed8a029bec831e7c3f204cd0ed41472 (diff)
downloadjellything-f7e3841426ed4661ede6ccfff9c306141735d465.tar
jellything-f7e3841426ed4661ede6ccfff9c306141735d465.tar.bz2
jellything-f7e3841426ed4661ede6ccfff9c306141735d465.tar.zst
refactor remuxer trait; add webm support; add back transcodingHEADmaster
-rw-r--r--cache/src/lib.rs4
-rw-r--r--remuxer/src/demuxers/mod.rs1
-rw-r--r--remuxer/src/lib.rs1
-rw-r--r--remuxer/src/muxers/matroska.rs51
-rw-r--r--remuxer/src/muxers/mod.rs26
-rw-r--r--remuxer/src/muxers/mpeg4.rs37
-rw-r--r--server/src/logic/stream.rs13
-rw-r--r--server/src/ui/error.rs9
-rw-r--r--stream/src/cues.rs2
-rw-r--r--stream/src/fragment.rs160
-rw-r--r--transcoder/src/fragment.rs69
-rw-r--r--transcoder/src/lib.rs2
12 files changed, 191 insertions, 184 deletions
diff --git a/cache/src/lib.rs b/cache/src/lib.rs
index 52245d6..115741c 100644
--- a/cache/src/lib.rs
+++ b/cache/src/lib.rs
@@ -127,10 +127,10 @@ thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::n
pub fn cache_file<Fun>(
kind: &str,
key: impl Hash,
- mut generate: Fun,
+ generate: Fun,
) -> Result<CachePath, anyhow::Error>
where
- Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
+ Fun: FnOnce(std::fs::File) -> Result<(), anyhow::Error>,
{
let (bucket, location) = cache_location(kind, key);
let loc_abs = location.abs();
diff --git a/remuxer/src/demuxers/mod.rs b/remuxer/src/demuxers/mod.rs
index f001250..597bf4a 100644
--- a/remuxer/src/demuxers/mod.rs
+++ b/remuxer/src/demuxers/mod.rs
@@ -40,6 +40,7 @@ pub fn create_demuxer(container: ContainerFormat, reader: Box<dyn ReadSeek>) ->
match container {
ContainerFormat::Matroska | ContainerFormat::Webm => Box::new(MatroskaDemuxer::new(reader)),
ContainerFormat::Flac => Box::new(FlacDemuxer::new(reader)),
+ ContainerFormat::Mpeg4 => todo!(),
}
}
pub fn create_demuxer_autodetect(
diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs
index 13ae06f..306121b 100644
--- a/remuxer/src/lib.rs
+++ b/remuxer/src/lib.rs
@@ -15,4 +15,5 @@ pub enum ContainerFormat {
Matroska,
Webm,
Flac,
+ Mpeg4,
}
diff --git a/remuxer/src/muxers/matroska.rs b/remuxer/src/muxers/matroska.rs
index 47210c9..c2f22e7 100644
--- a/remuxer/src/muxers/matroska.rs
+++ b/remuxer/src/muxers/matroska.rs
@@ -8,15 +8,25 @@ use crate::muxers::FragmentMuxer;
use anyhow::Result;
use std::io::Write;
use winter_ebml::{EbmlHeader, EbmlToVec};
-use winter_matroska::{Cluster, Info, MatroskaFile, Segment, Tracks};
+use winter_matroska::{MatroskaFile, Segment};
+
+fn write_fragment_shared(out: &mut dyn Write, mut segment: Segment, webm: bool) -> Result<()> {
+ segment.info.muxing_app =
+ concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string();
+ if webm {
+ if let Some(tracks) = &mut segment.tracks {
+ for track in &mut tracks.entries {
+ if let Some(video) = &mut track.video {
+ video.colour = None;
+ video.projection = None;
+ video.display_unit = 0; // pixels
+ video.display_width = Some(video.pixel_width);
+ video.display_height = Some(video.pixel_height);
+ }
+ }
+ }
+ }
-fn write_fragment_shared(
- out: &mut dyn Write,
- info: Info,
- tracks: Tracks,
- cluster: Cluster,
- webm: bool,
-) -> Result<()> {
let file = MatroskaFile {
ebml_header: EbmlHeader {
ebml_version: 1,
@@ -28,12 +38,7 @@ fn write_fragment_shared(
doc_type_read_version: 2,
..Default::default()
},
- segment: Segment {
- info,
- tracks: Some(tracks),
- clusters: vec![cluster],
- ..Default::default()
- },
+ segment,
};
out.write_all(&file.to_vec())?;
Ok(())
@@ -41,23 +46,13 @@ fn write_fragment_shared(
pub struct MatroskaFragmentMuxer;
impl FragmentMuxer for MatroskaFragmentMuxer {
- fn write_fragment(
- out: &mut dyn Write,
- info: Info,
- tracks: Tracks,
- cluster: Cluster,
- ) -> Result<()> {
- write_fragment_shared(out, info, tracks, cluster, false)
+ fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_fragment_shared(out, segment, false)
}
}
pub struct WebmFragmentMuxer;
impl FragmentMuxer for WebmFragmentMuxer {
- fn write_fragment(
- out: &mut dyn Write,
- info: Info,
- tracks: Tracks,
- cluster: Cluster,
- ) -> Result<()> {
- write_fragment_shared(out, info, tracks, cluster, true)
+ fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ write_fragment_shared(out, segment, true)
}
}
diff --git a/remuxer/src/muxers/mod.rs b/remuxer/src/muxers/mod.rs
index 8752373..ae544eb 100644
--- a/remuxer/src/muxers/mod.rs
+++ b/remuxer/src/muxers/mod.rs
@@ -5,36 +5,32 @@
*/
pub mod matroska;
+pub mod mpeg4;
use crate::{
ContainerFormat,
- muxers::matroska::{MatroskaFragmentMuxer, WebmFragmentMuxer},
+ muxers::{
+ matroska::{MatroskaFragmentMuxer, WebmFragmentMuxer},
+ mpeg4::Mpeg4FragmentMuxer,
+ },
};
use anyhow::Result;
use std::io::Write;
-use winter_matroska::{Cluster, Info, Tracks};
+use winter_matroska::Segment;
pub trait FragmentMuxer {
- fn write_fragment(
- out: &mut dyn Write,
- info: Info,
- tracks: Tracks,
- cluster: Cluster,
- ) -> Result<()>;
+ fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()>;
}
pub fn write_fragment(
container: ContainerFormat,
out: &mut dyn Write,
- info: Info,
- tracks: Tracks,
- cluster: Cluster,
+ segment: Segment,
) -> Result<()> {
match container {
- ContainerFormat::Matroska => {
- MatroskaFragmentMuxer::write_fragment(out, info, tracks, cluster)
- }
- ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, info, tracks, cluster),
+ ContainerFormat::Matroska => MatroskaFragmentMuxer::write_fragment(out, segment),
+ ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, segment),
+ ContainerFormat::Mpeg4 => Mpeg4FragmentMuxer::write_fragment(out, segment),
_ => unimplemented!(),
}
}
diff --git a/remuxer/src/muxers/mpeg4.rs b/remuxer/src/muxers/mpeg4.rs
new file mode 100644
index 0000000..437f117
--- /dev/null
+++ b/remuxer/src/muxers/mpeg4.rs
@@ -0,0 +1,37 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+
+use crate::muxers::{FragmentMuxer, matroska::MatroskaFragmentMuxer};
+use anyhow::Result;
+use std::{
+ io::{Cursor, Write, copy},
+ process::{Command, Stdio},
+ thread::spawn,
+};
+use winter_matroska::Segment;
+
+pub struct Mpeg4FragmentMuxer;
+impl FragmentMuxer for Mpeg4FragmentMuxer {
+ fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> {
+ let mut mk_frag = Vec::new();
+ MatroskaFragmentMuxer::write_fragment(&mut mk_frag, segment)?;
+
+ let mut child = Command::new("ffmpeg")
+ .args(
+ "-hide_banner -f matroska -i pipe:0 -c copy -f mp4 -movflags frag_keyframe+empty_moov pipe:1"
+ .split(" "),
+ )
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn()?;
+
+ let mut stdin = child.stdin.take().unwrap();
+ let mut stdout = child.stdout.take().unwrap();
+ spawn(move || copy(&mut Cursor::new(mk_frag), &mut stdin));
+ copy(&mut stdout, out)?;
+ Ok(())
+ }
+}
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
index dfe2f86..d239c92 100644
--- a/server/src/logic/stream.rs
+++ b/server/src/logic/stream.rs
@@ -9,7 +9,7 @@ use jellycommon::{api::NodeFilterSort, stream::StreamSpec, NodeID, TrackSource};
use jellyimport::asset_token::AssetInner;
use jellylogic::{node::get_node, session::Session};
use jellystream::SMediaInfo;
-use log::info;
+use log::{info, warn};
use rocket::{
get, head,
http::{Header, Status},
@@ -148,9 +148,16 @@ pub async fn r_stream(
});
// TODO cleaner solution needed
- let mut reader = spawn_blocking(move || jellystream::stream(media, spec, urange))
+ let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange))
.await
- .unwrap()?;
+ .unwrap()
+ {
+ Ok(o) => o,
+ Err(e) => {
+ warn!("stream error: {e:?}");
+ Err(e)?
+ }
+ };
let (stream_write, stream_read) = duplex(4096);
spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
diff --git a/server/src/ui/error.rs b/server/src/ui/error.rs
index 6fc3284..f1c9d3a 100644
--- a/server/src/ui/error.rs
+++ b/server/src/ui/error.rs
@@ -26,10 +26,11 @@ static ERROR_IMAGE: LazyLock<Vec<u8>> = LazyLock::new(|| {
#[catch(default)]
pub fn r_catch<'a>(status: Status, _request: &Request) -> RawHtml<String> {
+ catch_with_message(format!("{status}"))
+}
+fn catch_with_message(message: String) -> RawHtml<String> {
RawHtml(render_page(
- &ErrorPage {
- status: format!("{status}"),
- },
+ &ErrorPage { status: message },
RenderInfo {
importing: false,
session: None,
@@ -56,7 +57,7 @@ impl<'r> Responder<'r, 'static> for MyError {
Some(x) if x.is_avif() || x.is_png() || x.is_jpeg() => {
(ContentType::AVIF, ERROR_IMAGE.as_slice()).respond_to(req)
}
- _ => r_catch(Status::InternalServerError, req).respond_to(req),
+ _ => catch_with_message(format!("{:#}", self.0)).respond_to(req),
}
}
}
diff --git a/stream/src/cues.rs b/stream/src/cues.rs
index b486a6f..646db5b 100644
--- a/stream/src/cues.rs
+++ b/stream/src/cues.rs
@@ -16,7 +16,7 @@ pub struct TrackStat {
pub total_size: u64,
}
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct GeneratedCue {
pub position: u64,
pub time: u64,
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index a14732b..4cdc55e 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,12 +3,19 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{cues::generate_cues, stream_info, SMediaInfo};
+use crate::{
+ cues::{generate_cues, GeneratedCue},
+ stream_info, SMediaInfo,
+};
use anyhow::{anyhow, Result};
use jellyremuxer::{
- demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat,
+ demuxers::create_demuxer_autodetect,
+ matroska::{self, Segment},
+ muxers::write_fragment,
+ ContainerFormat,
};
use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum};
+use jellytranscoder::fragment::transcode;
use std::{
fs::File,
io::{Cursor, Read},
@@ -44,24 +51,35 @@ pub fn fragment_stream(
.find(|t| t.track_number == track_num)
.unwrap();
+ let timestamp_scale = iinfo.metadata[file_index].info.timestamp_scale;
+ let total_duration = iinfo.metadata[file_index].info.duration;
+ let cue_stat = generate_cues(&media_path)?;
+ let start_cue = cue_stat
+ .cues
+ .get(index)
+ .ok_or(anyhow!("fragment index out of range"))?;
+ let end_cue = cue_stat
+ .cues
+ .get(index + 1)
+ .copied()
+ .unwrap_or(GeneratedCue {
+ position: 0,
+ time: total_duration.unwrap_or_default() as u64 * timestamp_scale, // TODO rounding?
+ });
+ let cluster_offset = start_cue.position;
+ let duration = (end_cue.time - start_cue.time) as f64 / timestamp_scale as f64;
+
let mk_info = matroska::Info {
- duration: Some(info.duration),
- timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale,
+ duration: Some(duration),
+ timestamp_scale,
..Default::default()
};
let mk_tracks = matroska::Tracks {
entries: vec![mk_track.to_owned()],
};
- let cue_stat = generate_cues(&media_path)?;
- let cluster_offset = cue_stat
- .cues
- .get(index)
- .ok_or(anyhow!("fragment index out of range"))?
- .position;
-
let mut cluster = {
- let media_file = File::open(media_path)?;
+ let media_file = File::open(&media_path)?;
let mut media = create_demuxer_autodetect(Box::new(media_file))?
.ok_or(anyhow!("media container unknown"))?;
media.seek_cluster(Some(cluster_offset))?;
@@ -78,107 +96,29 @@ pub fn fragment_stream(
StreamContainer::WebM => ContainerFormat::Webm,
StreamContainer::Matroska => ContainerFormat::Matroska,
StreamContainer::WebVTT => todo!(),
- StreamContainer::MPEG4 => todo!(),
+ StreamContainer::MPEG4 => ContainerFormat::Mpeg4,
StreamContainer::JVTT => todo!(),
};
- if format.remux {
- let mut out = Vec::new();
- write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?;
- Ok(Box::new(Cursor::new(out)))
- } else {
- todo!()
- }
+ let mut segment = Segment {
+ info: mk_info,
+ tracks: Some(mk_tracks),
+ clusters: vec![cluster],
+ ..Default::default()
+ };
+ segment.info.writing_app =
+ concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string();
- // if format.remux {
- // match container {
- // StreamContainer::WebM | StreamContainer::Matroska => {
- // tokio::task::spawn_blocking(move || {
- // if let Err(err) = jellyremuxer::write_fragment_into(
- // SyncIoBridge::new(b),
- // &path,
- // track_num,
- // container == StreamContainer::WebM,
- // &info.name.unwrap_or_default(),
- // index,
- // ) {
- // warn!("segment stream error: {err}");
- // }
- // });
- // }
- // StreamContainer::MPEG4 => {
- // tokio::task::spawn_blocking(move || {
- // let mut buf = Cursor::new(Vec::new());
- // if let Err(err) = jellyremuxer::write_fragment_into(
- // &mut buf,
- // &path,
- // track_num,
- // false,
- // &info.name.unwrap_or_default(),
- // index,
- // ) {
- // warn!("segment stream error: {err}");
- // }
- // buf.seek(SeekFrom::Start(0)).unwrap();
- // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) {
- // warn!("mpeg4 transmux failed: {err}");
- // }
- // });
- // }
- // StreamContainer::JVTT => {}
- // _ => bail!("not yet supported"),
- // }
- // } else {
- // let location = transcode(
- // track.kind,
- // orig_track,
- // format,
- // &format!("{path:?} {track_num} {index}"),
- // move |b| {
- // tokio::task::spawn_blocking(move || {
- // if let Err(err) = jellyremuxer::write_fragment_into(
- // SyncIoBridge::new(b),
- // &path,
- // track_num,
- // false,
- // &info.name.unwrap_or_default(),
- // index,
- // ) {
- // warn!("segment stream error: {err}");
- // }
- // });
- // },
- // )
- // .await?;
+ if !format.remux {
+ segment = transcode(
+ track.kind,
+ &format!("{media_path:?} {track_num} {index}"),
+ format,
+ segment,
+ )?;
+ }
- // let mut frag = File::open(location.abs()).await?;
- // match container {
- // StreamContainer::WebM => {
- // tokio::task::spawn_blocking(move || {
- // if let Err(err) =
- // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- // {
- // warn!("webm transmux failed: {err}");
- // }
- // });
- // }
- // StreamContainer::Matroska => {
- // tokio::task::spawn(async move {
- // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
- // warn!("cannot write stream: {err}")
- // }
- // });
- // }
- // StreamContainer::MPEG4 => {
- // tokio::task::spawn_blocking(move || {
- // if let Err(err) =
- // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- // {
- // warn!("mpeg4 transmux failed: {err}");
- // }
- // });
- // }
- // _ => bail!("unsupported"),
- // }
- // }
+ let mut out = Vec::new();
+ write_fragment(jr_container, &mut out, segment)?;
+ Ok(Box::new(Cursor::new(out)))
}
diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs
index 4cd8b6f..c94b877 100644
--- a/transcoder/src/fragment.rs
+++ b/transcoder/src/fragment.rs
@@ -5,30 +5,36 @@
*/
use crate::{Config, CONF, LOCAL_VIDEO_TRANSCODING_TASKS};
use anyhow::Result;
-use jellycache::{async_cache_file, CachePath};
+use jellycache::cache_file;
+use jellyremuxer::{demuxers::create_demuxer, muxers::write_fragment, ContainerFormat};
use jellystream_types::{StreamFormatInfo, TrackKind};
use log::info;
use std::fmt::Write;
-use std::process::Stdio;
-use tokio::{
- io::copy,
- process::{ChildStdin, Command},
-};
-use winter_matroska::TrackEntry as MatroskaTrackEntry;
+use std::fs::File;
+use std::io::{copy, Write as W2};
+use std::process::{Command, Stdio};
+use std::thread::spawn;
+use winter_matroska::{Segment, TrackEntry as MatroskaTrackEntry};
// TODO odd video resolutions can cause errors when transcoding to YUV42{0,2}
// TODO with an implementation that cant handle it (SVT-AV1 is such an impl).
-pub async fn transcode(
+pub fn transcode(
kind: TrackKind,
- orig_metadata: &MatroskaTrackEntry,
- format: &StreamFormatInfo,
input_key: &str,
- input: impl FnOnce(ChildStdin),
-) -> anyhow::Result<CachePath> {
- let command = transcode_command(kind, orig_metadata, format, &*CONF).unwrap();
- async_cache_file("frag-tc", (input_key, &command), async |mut output| {
- let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?;
+ output_format: &StreamFormatInfo,
+ input: Segment,
+) -> Result<Segment> {
+ let command = transcode_command(
+ kind,
+ &input.tracks.as_ref().unwrap().entries[0],
+ output_format,
+ &*CONF,
+ )
+ .unwrap();
+
+ let output = cache_file("frag-tc", (input_key, &command), |mut output| {
+ let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.lock().unwrap();
info!("encoding with {command:?}");
let mut args = command.split(" ");
let mut proc = Command::new(args.next().unwrap())
@@ -37,17 +43,40 @@ pub async fn transcode(
.args(args)
.spawn()?;
- let stdin = proc.stdin.take().unwrap();
+ let mut stdin = proc.stdin.take().unwrap();
let mut stdout = proc.stdout.take().unwrap();
- input(stdin);
- copy(&mut stdout, &mut output).await?;
+ spawn(move || {
+ copy(&mut stdout, &mut output).unwrap();
+ });
+
+ write_fragment(ContainerFormat::Matroska, &mut stdin, input)?;
+ stdin.flush()?;
+ drop(stdin);
- proc.wait().await.unwrap().exit_ok()?;
+ proc.wait().unwrap().exit_ok()?;
info!("done");
Ok(())
+ })?;
+
+ let mut demuxer = create_demuxer(
+ ContainerFormat::Matroska,
+ Box::new(File::open(output.abs())?),
+ );
+
+ let info = demuxer.info()?;
+ let tracks = demuxer.tracks()?;
+ let mut clusters = Vec::new();
+ while let Some((_, cluster)) = demuxer.read_cluster()? {
+ clusters.push(cluster);
+ }
+
+ Ok(Segment {
+ info,
+ tracks,
+ clusters,
+ ..Default::default()
})
- .await
}
fn transcode_command(
diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs
index 665f470..1eac15b 100644
--- a/transcoder/src/lib.rs
+++ b/transcoder/src/lib.rs
@@ -37,4 +37,4 @@ static CONF: LazyLock<Config> = LazyLock::new(|| {
});
static LOCAL_IMAGE_TRANSCODING_TASKS: Semaphore = Semaphore::const_new(8);
-static LOCAL_VIDEO_TRANSCODING_TASKS: Semaphore = Semaphore::const_new(2);
+static LOCAL_VIDEO_TRANSCODING_TASKS: Mutex<()> = Mutex::new(());