/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, bail, Result}; use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; use jellystream_types::StreamContainer; use jellytranscoder::fragment::transcode; use log::warn; use std::{ io::{Cursor, Seek, SeekFrom}, sync::Arc, }; use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( mut b: DuplexStream, info: Arc, track: usize, segment: usize, index: usize, format_num: usize, container: StreamContainer, ) -> Result<()> { let (iinfo, info) = stream_info(info).await?; let (file_index, track_num) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; let path = iinfo.paths[file_index].clone(); let seg = info .segments .get(segment) .ok_or(anyhow!("segment not found"))?; let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; let format = track .formats .get(format_num) .ok_or(anyhow!("format not found"))?; let orig_track = iinfo.metadata[file_index] .tracks .as_ref() .unwrap() .entries .iter() .find(|t| t.track_number == track_num) .unwrap(); if format.remux { match container { StreamContainer::WebM | StreamContainer::Matroska => { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( SyncIoBridge::new(b), &path, track_num, container == StreamContainer::WebM, &info.name.unwrap_or_default(), index, ) { warn!("segment stream error: {err}"); } }); } StreamContainer::MPEG4 => { tokio::task::spawn_blocking(move || { let mut buf = Cursor::new(Vec::new()); if let Err(err) = jellyremuxer::write_fragment_into( &mut buf, &path, track_num, false, &info.name.unwrap_or_default(), index, ) { warn!("segment stream error: {err}"); } buf.seek(SeekFrom::Start(0)).unwrap(); if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { warn!("mpeg4 transmux failed: {err}"); } }); } StreamContainer::JVTT => {} _ => bail!("not yet supported"), } } else { let location = transcode( track.kind, orig_track, format, &format!("{path:?} {track_num} {index}"), move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( SyncIoBridge::new(b), &path, track_num, false, &info.name.unwrap_or_default(), index, ) { warn!("segment stream error: {err}"); } }); }, ) .await?; let mut frag = File::open(location.abs()).await?; match container { StreamContainer::WebM => { tokio::task::spawn_blocking(move || { if let Err(err) = matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) { warn!("webm transmux failed: {err}"); } }); } StreamContainer::Matroska => { tokio::task::spawn(async move { if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { warn!("cannot write stream: {err}") } }); } StreamContainer::MPEG4 => { tokio::task::spawn_blocking(move || { if let Err(err) = matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) { warn!("mpeg4 transmux failed: {err}"); } }); } _ => bail!("unsupported"), } } Ok(()) }