diff options
-rw-r--r-- | cache/src/lib.rs | 4 | ||||
-rw-r--r-- | remuxer/src/demuxers/mod.rs | 1 | ||||
-rw-r--r-- | remuxer/src/lib.rs | 1 | ||||
-rw-r--r-- | remuxer/src/muxers/matroska.rs | 51 | ||||
-rw-r--r-- | remuxer/src/muxers/mod.rs | 26 | ||||
-rw-r--r-- | remuxer/src/muxers/mpeg4.rs | 37 | ||||
-rw-r--r-- | server/src/logic/stream.rs | 13 | ||||
-rw-r--r-- | server/src/ui/error.rs | 9 | ||||
-rw-r--r-- | stream/src/cues.rs | 2 | ||||
-rw-r--r-- | stream/src/fragment.rs | 160 | ||||
-rw-r--r-- | transcoder/src/fragment.rs | 69 | ||||
-rw-r--r-- | transcoder/src/lib.rs | 2 |
12 files changed, 191 insertions, 184 deletions
diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 52245d6..115741c 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -127,10 +127,10 @@ thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::n pub fn cache_file<Fun>( kind: &str, key: impl Hash, - mut generate: Fun, + generate: Fun, ) -> Result<CachePath, anyhow::Error> where - Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, + Fun: FnOnce(std::fs::File) -> Result<(), anyhow::Error>, { let (bucket, location) = cache_location(kind, key); let loc_abs = location.abs(); diff --git a/remuxer/src/demuxers/mod.rs b/remuxer/src/demuxers/mod.rs index f001250..597bf4a 100644 --- a/remuxer/src/demuxers/mod.rs +++ b/remuxer/src/demuxers/mod.rs @@ -40,6 +40,7 @@ pub fn create_demuxer(container: ContainerFormat, reader: Box<dyn ReadSeek>) -> match container { ContainerFormat::Matroska | ContainerFormat::Webm => Box::new(MatroskaDemuxer::new(reader)), ContainerFormat::Flac => Box::new(FlacDemuxer::new(reader)), + ContainerFormat::Mpeg4 => todo!(), } } pub fn create_demuxer_autodetect( diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index 13ae06f..306121b 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -15,4 +15,5 @@ pub enum ContainerFormat { Matroska, Webm, Flac, + Mpeg4, } diff --git a/remuxer/src/muxers/matroska.rs b/remuxer/src/muxers/matroska.rs index 47210c9..c2f22e7 100644 --- a/remuxer/src/muxers/matroska.rs +++ b/remuxer/src/muxers/matroska.rs @@ -8,15 +8,25 @@ use crate::muxers::FragmentMuxer; use anyhow::Result; use std::io::Write; use winter_ebml::{EbmlHeader, EbmlToVec}; -use winter_matroska::{Cluster, Info, MatroskaFile, Segment, Tracks}; +use winter_matroska::{MatroskaFile, Segment}; + +fn write_fragment_shared(out: &mut dyn Write, mut segment: Segment, webm: bool) -> Result<()> { + segment.info.muxing_app = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string(); + if webm { + if let Some(tracks) = &mut segment.tracks { + for track in &mut tracks.entries { + if let Some(video) = &mut track.video { + video.colour = None; + video.projection = None; + video.display_unit = 0; // pixels + video.display_width = Some(video.pixel_width); + video.display_height = Some(video.pixel_height); + } + } + } + } -fn write_fragment_shared( - out: &mut dyn Write, - info: Info, - tracks: Tracks, - cluster: Cluster, - webm: bool, -) -> Result<()> { let file = MatroskaFile { ebml_header: EbmlHeader { ebml_version: 1, @@ -28,12 +38,7 @@ fn write_fragment_shared( doc_type_read_version: 2, ..Default::default() }, - segment: Segment { - info, - tracks: Some(tracks), - clusters: vec![cluster], - ..Default::default() - }, + segment, }; out.write_all(&file.to_vec())?; Ok(()) @@ -41,23 +46,13 @@ fn write_fragment_shared( pub struct MatroskaFragmentMuxer; impl FragmentMuxer for MatroskaFragmentMuxer { - fn write_fragment( - out: &mut dyn Write, - info: Info, - tracks: Tracks, - cluster: Cluster, - ) -> Result<()> { - write_fragment_shared(out, info, tracks, cluster, false) + fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> { + write_fragment_shared(out, segment, false) } } pub struct WebmFragmentMuxer; impl FragmentMuxer for WebmFragmentMuxer { - fn write_fragment( - out: &mut dyn Write, - info: Info, - tracks: Tracks, - cluster: Cluster, - ) -> Result<()> { - write_fragment_shared(out, info, tracks, cluster, true) + fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> { + write_fragment_shared(out, segment, true) } } diff --git a/remuxer/src/muxers/mod.rs b/remuxer/src/muxers/mod.rs index 8752373..ae544eb 100644 --- a/remuxer/src/muxers/mod.rs +++ b/remuxer/src/muxers/mod.rs @@ -5,36 +5,32 @@ */ pub mod matroska; +pub mod mpeg4; use crate::{ ContainerFormat, - muxers::matroska::{MatroskaFragmentMuxer, WebmFragmentMuxer}, + muxers::{ + matroska::{MatroskaFragmentMuxer, WebmFragmentMuxer}, + mpeg4::Mpeg4FragmentMuxer, + }, }; use anyhow::Result; use std::io::Write; -use winter_matroska::{Cluster, Info, Tracks}; +use winter_matroska::Segment; pub trait FragmentMuxer { - fn write_fragment( - out: &mut dyn Write, - info: Info, - tracks: Tracks, - cluster: Cluster, - ) -> Result<()>; + fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()>; } pub fn write_fragment( container: ContainerFormat, out: &mut dyn Write, - info: Info, - tracks: Tracks, - cluster: Cluster, + segment: Segment, ) -> Result<()> { match container { - ContainerFormat::Matroska => { - MatroskaFragmentMuxer::write_fragment(out, info, tracks, cluster) - } - ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, info, tracks, cluster), + ContainerFormat::Matroska => MatroskaFragmentMuxer::write_fragment(out, segment), + ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, segment), + ContainerFormat::Mpeg4 => Mpeg4FragmentMuxer::write_fragment(out, segment), _ => unimplemented!(), } } diff --git a/remuxer/src/muxers/mpeg4.rs b/remuxer/src/muxers/mpeg4.rs new file mode 100644 index 0000000..437f117 --- /dev/null +++ b/remuxer/src/muxers/mpeg4.rs @@ -0,0 +1,37 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ + +use crate::muxers::{FragmentMuxer, matroska::MatroskaFragmentMuxer}; +use anyhow::Result; +use std::{ + io::{Cursor, Write, copy}, + process::{Command, Stdio}, + thread::spawn, +}; +use winter_matroska::Segment; + +pub struct Mpeg4FragmentMuxer; +impl FragmentMuxer for Mpeg4FragmentMuxer { + fn write_fragment(out: &mut dyn Write, segment: Segment) -> Result<()> { + let mut mk_frag = Vec::new(); + MatroskaFragmentMuxer::write_fragment(&mut mk_frag, segment)?; + + let mut child = Command::new("ffmpeg") + .args( + "-hide_banner -f matroska -i pipe:0 -c copy -f mp4 -movflags frag_keyframe+empty_moov pipe:1" + .split(" "), + ) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let mut stdin = child.stdin.take().unwrap(); + let mut stdout = child.stdout.take().unwrap(); + spawn(move || copy(&mut Cursor::new(mk_frag), &mut stdin)); + copy(&mut stdout, out)?; + Ok(()) + } +} diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs index dfe2f86..d239c92 100644 --- a/server/src/logic/stream.rs +++ b/server/src/logic/stream.rs @@ -9,7 +9,7 @@ use jellycommon::{api::NodeFilterSort, stream::StreamSpec, NodeID, TrackSource}; use jellyimport::asset_token::AssetInner; use jellylogic::{node::get_node, session::Session}; use jellystream::SMediaInfo; -use log::info; +use log::{info, warn}; use rocket::{ get, head, http::{Header, Status}, @@ -148,9 +148,16 @@ pub async fn r_stream( }); // TODO cleaner solution needed - let mut reader = spawn_blocking(move || jellystream::stream(media, spec, urange)) + let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) .await - .unwrap()?; + .unwrap() + { + Ok(o) => o, + Err(e) => { + warn!("stream error: {e:?}"); + Err(e)? + } + }; let (stream_write, stream_read) = duplex(4096); spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); diff --git a/server/src/ui/error.rs b/server/src/ui/error.rs index 6fc3284..f1c9d3a 100644 --- a/server/src/ui/error.rs +++ b/server/src/ui/error.rs @@ -26,10 +26,11 @@ static ERROR_IMAGE: LazyLock<Vec<u8>> = LazyLock::new(|| { #[catch(default)] pub fn r_catch<'a>(status: Status, _request: &Request) -> RawHtml<String> { + catch_with_message(format!("{status}")) +} +fn catch_with_message(message: String) -> RawHtml<String> { RawHtml(render_page( - &ErrorPage { - status: format!("{status}"), - }, + &ErrorPage { status: message }, RenderInfo { importing: false, session: None, @@ -56,7 +57,7 @@ impl<'r> Responder<'r, 'static> for MyError { Some(x) if x.is_avif() || x.is_png() || x.is_jpeg() => { (ContentType::AVIF, ERROR_IMAGE.as_slice()).respond_to(req) } - _ => r_catch(Status::InternalServerError, req).respond_to(req), + _ => catch_with_message(format!("{:#}", self.0)).respond_to(req), } } } diff --git a/stream/src/cues.rs b/stream/src/cues.rs index b486a6f..646db5b 100644 --- a/stream/src/cues.rs +++ b/stream/src/cues.rs @@ -16,7 +16,7 @@ pub struct TrackStat { pub total_size: u64, } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Copy, Serialize, Deserialize)] pub struct GeneratedCue { pub position: u64, pub time: u64, diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index a14732b..4cdc55e 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,12 +3,19 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{cues::generate_cues, stream_info, SMediaInfo}; +use crate::{ + cues::{generate_cues, GeneratedCue}, + stream_info, SMediaInfo, +}; use anyhow::{anyhow, Result}; use jellyremuxer::{ - demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat, + demuxers::create_demuxer_autodetect, + matroska::{self, Segment}, + muxers::write_fragment, + ContainerFormat, }; use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum}; +use jellytranscoder::fragment::transcode; use std::{ fs::File, io::{Cursor, Read}, @@ -44,24 +51,35 @@ pub fn fragment_stream( .find(|t| t.track_number == track_num) .unwrap(); + let timestamp_scale = iinfo.metadata[file_index].info.timestamp_scale; + let total_duration = iinfo.metadata[file_index].info.duration; + let cue_stat = generate_cues(&media_path)?; + let start_cue = cue_stat + .cues + .get(index) + .ok_or(anyhow!("fragment index out of range"))?; + let end_cue = cue_stat + .cues + .get(index + 1) + .copied() + .unwrap_or(GeneratedCue { + position: 0, + time: total_duration.unwrap_or_default() as u64 * timestamp_scale, // TODO rounding? + }); + let cluster_offset = start_cue.position; + let duration = (end_cue.time - start_cue.time) as f64 / timestamp_scale as f64; + let mk_info = matroska::Info { - duration: Some(info.duration), - timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale, + duration: Some(duration), + timestamp_scale, ..Default::default() }; let mk_tracks = matroska::Tracks { entries: vec![mk_track.to_owned()], }; - let cue_stat = generate_cues(&media_path)?; - let cluster_offset = cue_stat - .cues - .get(index) - .ok_or(anyhow!("fragment index out of range"))? - .position; - let mut cluster = { - let media_file = File::open(media_path)?; + let media_file = File::open(&media_path)?; let mut media = create_demuxer_autodetect(Box::new(media_file))? .ok_or(anyhow!("media container unknown"))?; media.seek_cluster(Some(cluster_offset))?; @@ -78,107 +96,29 @@ pub fn fragment_stream( StreamContainer::WebM => ContainerFormat::Webm, StreamContainer::Matroska => ContainerFormat::Matroska, StreamContainer::WebVTT => todo!(), - StreamContainer::MPEG4 => todo!(), + StreamContainer::MPEG4 => ContainerFormat::Mpeg4, StreamContainer::JVTT => todo!(), }; - if format.remux { - let mut out = Vec::new(); - write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?; - Ok(Box::new(Cursor::new(out))) - } else { - todo!() - } + let mut segment = Segment { + info: mk_info, + tracks: Some(mk_tracks), + clusters: vec![cluster], + ..Default::default() + }; + segment.info.writing_app = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string(); - // if format.remux { - // match container { - // StreamContainer::WebM | StreamContainer::Matroska => { - // tokio::task::spawn_blocking(move || { - // if let Err(err) = jellyremuxer::write_fragment_into( - // SyncIoBridge::new(b), - // &path, - // track_num, - // container == StreamContainer::WebM, - // &info.name.unwrap_or_default(), - // index, - // ) { - // warn!("segment stream error: {err}"); - // } - // }); - // } - // StreamContainer::MPEG4 => { - // tokio::task::spawn_blocking(move || { - // let mut buf = Cursor::new(Vec::new()); - // if let Err(err) = jellyremuxer::write_fragment_into( - // &mut buf, - // &path, - // track_num, - // false, - // &info.name.unwrap_or_default(), - // index, - // ) { - // warn!("segment stream error: {err}"); - // } - // buf.seek(SeekFrom::Start(0)).unwrap(); - // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { - // warn!("mpeg4 transmux failed: {err}"); - // } - // }); - // } - // StreamContainer::JVTT => {} - // _ => bail!("not yet supported"), - // } - // } else { - // let location = transcode( - // track.kind, - // orig_track, - // format, - // &format!("{path:?} {track_num} {index}"), - // move |b| { - // tokio::task::spawn_blocking(move || { - // if let Err(err) = jellyremuxer::write_fragment_into( - // SyncIoBridge::new(b), - // &path, - // track_num, - // false, - // &info.name.unwrap_or_default(), - // index, - // ) { - // warn!("segment stream error: {err}"); - // } - // }); - // }, - // ) - // .await?; + if !format.remux { + segment = transcode( + track.kind, + &format!("{media_path:?} {track_num} {index}"), + format, + segment, + )?; + } - // let mut frag = File::open(location.abs()).await?; - // match container { - // StreamContainer::WebM => { - // tokio::task::spawn_blocking(move || { - // if let Err(err) = - // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - // { - // warn!("webm transmux failed: {err}"); - // } - // }); - // } - // StreamContainer::Matroska => { - // tokio::task::spawn(async move { - // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { - // warn!("cannot write stream: {err}") - // } - // }); - // } - // StreamContainer::MPEG4 => { - // tokio::task::spawn_blocking(move || { - // if let Err(err) = - // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - // { - // warn!("mpeg4 transmux failed: {err}"); - // } - // }); - // } - // _ => bail!("unsupported"), - // } - // } + let mut out = Vec::new(); + write_fragment(jr_container, &mut out, segment)?; + Ok(Box::new(Cursor::new(out))) } diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 4cd8b6f..c94b877 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -5,30 +5,36 @@ */ use crate::{Config, CONF, LOCAL_VIDEO_TRANSCODING_TASKS}; use anyhow::Result; -use jellycache::{async_cache_file, CachePath}; +use jellycache::cache_file; +use jellyremuxer::{demuxers::create_demuxer, muxers::write_fragment, ContainerFormat}; use jellystream_types::{StreamFormatInfo, TrackKind}; use log::info; use std::fmt::Write; -use std::process::Stdio; -use tokio::{ - io::copy, - process::{ChildStdin, Command}, -}; -use winter_matroska::TrackEntry as MatroskaTrackEntry; +use std::fs::File; +use std::io::{copy, Write as W2}; +use std::process::{Command, Stdio}; +use std::thread::spawn; +use winter_matroska::{Segment, TrackEntry as MatroskaTrackEntry}; // TODO odd video resolutions can cause errors when transcoding to YUV42{0,2} // TODO with an implementation that cant handle it (SVT-AV1 is such an impl). -pub async fn transcode( +pub fn transcode( kind: TrackKind, - orig_metadata: &MatroskaTrackEntry, - format: &StreamFormatInfo, input_key: &str, - input: impl FnOnce(ChildStdin), -) -> anyhow::Result<CachePath> { - let command = transcode_command(kind, orig_metadata, format, &*CONF).unwrap(); - async_cache_file("frag-tc", (input_key, &command), async |mut output| { - let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; + output_format: &StreamFormatInfo, + input: Segment, +) -> Result<Segment> { + let command = transcode_command( + kind, + &input.tracks.as_ref().unwrap().entries[0], + output_format, + &*CONF, + ) + .unwrap(); + + let output = cache_file("frag-tc", (input_key, &command), |mut output| { + let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.lock().unwrap(); info!("encoding with {command:?}"); let mut args = command.split(" "); let mut proc = Command::new(args.next().unwrap()) @@ -37,17 +43,40 @@ pub async fn transcode( .args(args) .spawn()?; - let stdin = proc.stdin.take().unwrap(); + let mut stdin = proc.stdin.take().unwrap(); let mut stdout = proc.stdout.take().unwrap(); - input(stdin); - copy(&mut stdout, &mut output).await?; + spawn(move || { + copy(&mut stdout, &mut output).unwrap(); + }); + + write_fragment(ContainerFormat::Matroska, &mut stdin, input)?; + stdin.flush()?; + drop(stdin); - proc.wait().await.unwrap().exit_ok()?; + proc.wait().unwrap().exit_ok()?; info!("done"); Ok(()) + })?; + + let mut demuxer = create_demuxer( + ContainerFormat::Matroska, + Box::new(File::open(output.abs())?), + ); + + let info = demuxer.info()?; + let tracks = demuxer.tracks()?; + let mut clusters = Vec::new(); + while let Some((_, cluster)) = demuxer.read_cluster()? { + clusters.push(cluster); + } + + Ok(Segment { + info, + tracks, + clusters, + ..Default::default() }) - .await } fn transcode_command( diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs index 665f470..1eac15b 100644 --- a/transcoder/src/lib.rs +++ b/transcoder/src/lib.rs @@ -37,4 +37,4 @@ static CONF: LazyLock<Config> = LazyLock::new(|| { }); static LOCAL_IMAGE_TRANSCODING_TASKS: Semaphore = Semaphore::const_new(8); -static LOCAL_VIDEO_TRANSCODING_TASKS: Semaphore = Semaphore::const_new(2); +static LOCAL_VIDEO_TRANSCODING_TASKS: Mutex<()> = Mutex::new(()); |