diff options
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r-- | stream/src/fragment.rs | 126 |
1 files changed, 73 insertions, 53 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e276d29..dfe101e 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,16 +3,10 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, bail, Result}; -use jellybase::{ - common::{ - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; +use jellybase::common::stream::StreamContainer; +use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; use jellytranscoder::fragment::transcode; use log::warn; use std::sync::Arc; @@ -20,40 +14,57 @@ use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, mut b: DuplexStream, - perms: &PermissionSet, + info: Arc<SMediaInfo>, + track: usize, + segment: usize, + index: usize, + format_num: usize, + container: StreamContainer, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.track[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let path = iinfo.paths[file_index].clone(); + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let format = track + .formats + .get(format_num) + .ok_or(anyhow!("format not found"))?; - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; + if format.remux { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), + index, + ) { + warn!("segment stream error: {err}"); + } + }); + } else { let location = transcode( - &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, + &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source + track.kind, + format, move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( SyncIoBridge::new(b), - &CONF.media_path, - &node, - &local_track, - track, + &path, + track_num, false, - n, + &info.name.unwrap_or_default(), + index, ) { warn!("segment stream error: {err}"); } @@ -61,27 +72,36 @@ pub async fn fragment_stream( }, ) .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") + + let mut frag = File::open(location.abs()).await?; + match container { + StreamContainer::WebM => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("webm transmux failed: {err}"); + } + }); } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &CONF.media_path, - &node, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); + StreamContainer::Matroska => { + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + warn!("cannot write stream: {err}") + } + }); } - }); + StreamContainer::MPEG4 => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("mpeg4 transmux failed: {err}"); + } + }); + } + _ => bail!("unsupported"), + } } Ok(()) |