aboutsummaryrefslogtreecommitdiff
path: root/stream/src/fragment.rs
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r--stream/src/fragment.rs126
1 files changed, 73 insertions, 53 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index e276d29..dfe101e 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,16 +3,10 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, bail, Result};
-use jellybase::{
- common::{
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
+use jellybase::common::stream::StreamContainer;
+use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
use jellytranscoder::fragment::transcode;
use log::warn;
use std::sync::Arc;
@@ -20,40 +14,57 @@ use tokio::{fs::File, io::DuplexStream};
use tokio_util::io::SyncIoBridge;
pub async fn fragment_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
mut b: DuplexStream,
- perms: &PermissionSet,
+ info: Arc<SMediaInfo>,
+ track: usize,
+ segment: usize,
+ index: usize,
+ format_num: usize,
+ container: StreamContainer,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("unsupported number of tracks for segment, must be exactly one");
- }
- let track = spec.track[0];
- let n = spec.index.ok_or(anyhow!("segment index missing"))?;
-
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let path = iinfo.paths[file_index].clone();
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+ let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?;
+ let format = track
+ .formats
+ .get(format_num)
+ .ok_or(anyhow!("format not found"))?;
- if let Some(profile) = spec.profile {
- perms.assert(&UserPermission::Transcode)?;
+ if format.remux {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_fragment_into(
+ SyncIoBridge::new(b),
+ &path,
+ track_num,
+ container == StreamContainer::WebM,
+ &info.name.unwrap_or_default(),
+ index,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+ } else {
let location = transcode(
- &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source
- CONF.transcoding_profiles
- .get(profile)
- .ok_or(anyhow!("profile out of range"))?,
+ &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source
+ track.kind,
+ format,
move |b| {
tokio::task::spawn_blocking(move || {
if let Err(err) = jellyremuxer::write_fragment_into(
SyncIoBridge::new(b),
- &CONF.media_path,
- &node,
- &local_track,
- track,
+ &path,
+ track_num,
false,
- n,
+ &info.name.unwrap_or_default(),
+ index,
) {
warn!("segment stream error: {err}");
}
@@ -61,27 +72,36 @@ pub async fn fragment_stream(
},
)
.await?;
- let mut output = File::open(location.abs()).await?;
- tokio::task::spawn(async move {
- if let Err(err) = tokio::io::copy(&mut output, &mut b).await {
- warn!("cannot write stream: {err}")
+
+ let mut frag = File::open(location.abs()).await?;
+ match container {
+ StreamContainer::WebM => {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) =
+ matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ {
+ warn!("webm transmux failed: {err}");
+ }
+ });
}
- });
- } else {
- let b = SyncIoBridge::new(b);
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- b,
- &CONF.media_path,
- &node,
- &local_track,
- track,
- spec.webm.unwrap_or(false),
- n,
- ) {
- warn!("segment stream error: {err}");
+ StreamContainer::Matroska => {
+ tokio::task::spawn(async move {
+ if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
+ warn!("cannot write stream: {err}")
+ }
+ });
}
- });
+ StreamContainer::MPEG4 => {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) =
+ matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ {
+ warn!("mpeg4 transmux failed: {err}");
+ }
+ });
+ }
+ _ => bail!("unsupported"),
+ }
}
Ok(())