diff options
author | metamuffin <metamuffin@disroot.org> | 2025-09-25 04:00:51 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-09-25 04:00:51 +0200 |
commit | f8d4c438d10450ead56c0082b037e466ef5f9f24 (patch) | |
tree | 703612747f06b91f063dcd9e8a778675e6aa1303 /stream/src/fragment.rs | |
parent | 0fe48987c336b9b50cba09355aa3c1cf11740edc (diff) | |
download | jellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar jellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar.bz2 jellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar.zst |
start media processing refactor
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r-- | stream/src/fragment.rs | 262 |
1 files changed, 149 insertions, 113 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 89ce94f..3b4bb0f 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,44 +3,39 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, bail, Result}; -use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; -use jellystream_types::StreamContainer; -use jellytranscoder::fragment::transcode; -use log::warn; +use crate::{cues::generate_cues, stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellyremuxer::{ + demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat, +}; +use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum}; use std::{ - io::{Cursor, Seek, SeekFrom}, + fs::File, + io::{Cursor, Read}, sync::Arc, }; -use tokio::{fs::File, io::DuplexStream}; -use tokio_util::io::SyncIoBridge; -pub async fn fragment_stream( - mut b: DuplexStream, +pub fn fragment_stream( info: Arc<SMediaInfo>, - track: usize, - segment: usize, - index: usize, - format_num: usize, + track: TrackNum, + index: IndexNum, + format_num: FormatNum, container: StreamContainer, -) -> Result<()> { - let (iinfo, info) = stream_info(info).await?; +) -> Result<Box<dyn Read + Send + Sync>> { + let (iinfo, info) = stream_info(info)?; + let (file_index, track_num) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; - let path = iinfo.paths[file_index].clone(); - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; - let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let media_path = iinfo.paths[file_index].clone(); + let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?; let format = track .formats .get(format_num) .ok_or(anyhow!("format not found"))?; - let orig_track = iinfo.metadata[file_index] + + let mk_track = iinfo.metadata[file_index] .tracks .as_ref() .unwrap() @@ -49,97 +44,138 @@ pub async fn fragment_stream( .find(|t| t.track_number == track_num) .unwrap(); + let mk_info = matroska::Info { + duration: Some(info.duration), + timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale, + ..Default::default() + }; + let mk_tracks = matroska::Tracks { + entries: vec![mk_track.to_owned()], + }; + + let cue_stat = generate_cues(&media_path)?; + let cluster_offset = cue_stat + .cues + .get(index) + .ok_or(anyhow!("fragment index out of range"))? + .position; + + let cluster = { + let media_file = File::open(media_path)?; + let mut media = create_demuxer_autodetect(Box::new(media_file))? + .ok_or(anyhow!("media container unknown"))?; + media.seek_cluster(Some(cluster_offset))?; + media + .read_cluster()? + .ok_or(anyhow!("cluster unexpectedly missing"))? + .1 + }; + + let jr_container = match container { + StreamContainer::WebM => ContainerFormat::Webm, + StreamContainer::Matroska => ContainerFormat::Matroska, + StreamContainer::WebVTT => todo!(), + StreamContainer::MPEG4 => todo!(), + StreamContainer::JVTT => todo!(), + }; + if format.remux { - match container { - StreamContainer::WebM | StreamContainer::Matroska => { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - container == StreamContainer::WebM, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - let mut buf = Cursor::new(Vec::new()); - if let Err(err) = jellyremuxer::write_fragment_into( - &mut buf, - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - buf.seek(SeekFrom::Start(0)).unwrap(); - if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - StreamContainer::JVTT => {} - _ => bail!("not yet supported"), - } + let mut out = Vec::new(); + write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?; + Ok(Box::new(Cursor::new(out))) } else { - let location = transcode( - track.kind, - orig_track, - format, - &format!("{path:?} {track_num} {index}"), - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - - let mut frag = File::open(location.abs()).await?; - match container { - StreamContainer::WebM => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("webm transmux failed: {err}"); - } - }); - } - StreamContainer::Matroska => { - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - _ => bail!("unsupported"), - } + todo!() } - Ok(()) + // if format.remux { + // match container { + // StreamContainer::WebM | StreamContainer::Matroska => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // container == StreamContainer::WebM, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // let mut buf = Cursor::new(Vec::new()); + // if let Err(err) = jellyremuxer::write_fragment_into( + // &mut buf, + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // buf.seek(SeekFrom::Start(0)).unwrap(); + // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::JVTT => {} + // _ => bail!("not yet supported"), + // } + // } else { + // let location = transcode( + // track.kind, + // orig_track, + // format, + // &format!("{path:?} {track_num} {index}"), + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + + // let mut frag = File::open(location.abs()).await?; + // match container { + // StreamContainer::WebM => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("webm transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::Matroska => { + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // _ => bail!("unsupported"), + // } + // } } |