diff options
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r-- | stream/src/fragment.rs | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs new file mode 100644 index 0000000..f08114c --- /dev/null +++ b/stream/src/fragment.rs @@ -0,0 +1,84 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2024 metamuffin <metamuffin.org> +*/ +use anyhow::{anyhow, bail, Result}; +use jellybase::{permission::PermissionSetExt, CONF}; +use jellycommon::{ + stream::StreamSpec, + user::{PermissionSet, UserPermission}, + LocalTrack, Node, +}; +use jellytranscoder::fragment::transcode; +use log::warn; +use tokio::{fs::File, io::DuplexStream}; +use tokio_util::io::SyncIoBridge; + +pub async fn fragment_stream( + node: Node, + local_tracks: Vec<LocalTrack>, + spec: StreamSpec, + mut b: DuplexStream, + perms: &PermissionSet, +) -> Result<()> { + if spec.tracks.len() != 1 { + bail!("unsupported number of tracks for segment, must be exactly one"); + } + let track = spec.tracks[0]; + let n = spec.index.ok_or(anyhow!("segment index missing"))?; + + let local_track = local_tracks + .get(0) + .ok_or(anyhow!("track missing"))? + .to_owned(); + + if let Some(profile) = spec.profile { + perms.assert(&UserPermission::Transcode)?; + let location = transcode( + &format!("{track} {n} {:?}", node.private.source), // TODO maybe not use the entire source + CONF.transcoding_profiles + .get(profile) + .ok_or(anyhow!("profile out of range"))?, + move |b| { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &CONF.media_path, + &node.public, + &local_track, + track, + false, + n, + ) { + warn!("segment stream error: {err}"); + } + }); + }, + ) + .await?; + let mut output = File::open(location.abs()).await?; + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + warn!("cannot write stream: {err}") + } + }); + } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + b, + &CONF.media_path, + &node.public, + &local_track, + track, + spec.webm.unwrap_or(false), + n, + ) { + warn!("segment stream error: {err}"); + } + }); + } + + Ok(()) +} |