/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ use anyhow::{anyhow, bail, Result}; use jellybase::{permission::PermissionSetExt, AssetLocationExt, CONF}; use jellycommon::{ stream::StreamSpec, user::{PermissionSet, UserPermission}, LocalTrack, Node, }; use jellytranscoder::snippet::transcode; use log::warn; use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn segment_stream( node: Node, local_tracks: Vec, spec: StreamSpec, mut b: DuplexStream, perms: &PermissionSet, ) -> Result<()> { if spec.tracks.len() != 1 { bail!("unsupported number of tracks for segment, must be exactly one"); } let track = spec.tracks[0]; let n = spec.index.ok_or(anyhow!("segment index missing"))?; let local_track = local_tracks .get(0) .ok_or(anyhow!("track missing"))? .to_owned(); if let Some(profile) = spec.profile { perms.assert(&UserPermission::Transcode)?; let location = transcode( &format!("{track} {n} {:?}", node.private.source), // TODO maybe not use the entire source CONF.transcoding_profiles .get(profile) .ok_or(anyhow!("profile out of range"))?, move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_snippet_into( SyncIoBridge::new(b), &CONF.library_path, &node.public, &local_track, track, false, n, ) { warn!("segment stream error: {err}"); } }); }, ) .await?; let mut output = File::open(location.path()).await?; tokio::task::spawn(async move { if let Err(err) = tokio::io::copy(&mut output, &mut b).await { warn!("cannot write stream: {err}") } }); } else { let b = SyncIoBridge::new(b); tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_snippet_into( b, &CONF.library_path, &node.public, &local_track, track, spec.webm.unwrap_or(false), n, ) { warn!("segment stream error: {err}"); } }); } Ok(()) }