aboutsummaryrefslogtreecommitdiff
path: root/stream/src/fragment.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-09-25 04:00:51 +0200
committermetamuffin <metamuffin@disroot.org>2025-09-25 04:00:51 +0200
commitf8d4c438d10450ead56c0082b037e466ef5f9f24 (patch)
tree703612747f06b91f063dcd9e8a778675e6aa1303 /stream/src/fragment.rs
parent0fe48987c336b9b50cba09355aa3c1cf11740edc (diff)
downloadjellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar
jellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar.bz2
jellything-f8d4c438d10450ead56c0082b037e466ef5f9f24.tar.zst
start media processing refactor
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r--stream/src/fragment.rs262
1 files changed, 149 insertions, 113 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index 89ce94f..3b4bb0f 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,44 +3,39 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{stream_info, SMediaInfo};
-use anyhow::{anyhow, bail, Result};
-use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
-use jellystream_types::StreamContainer;
-use jellytranscoder::fragment::transcode;
-use log::warn;
+use crate::{cues::generate_cues, stream_info, SMediaInfo};
+use anyhow::{anyhow, Result};
+use jellyremuxer::{
+ demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat,
+};
+use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum};
use std::{
- io::{Cursor, Seek, SeekFrom},
+ fs::File,
+ io::{Cursor, Read},
sync::Arc,
};
-use tokio::{fs::File, io::DuplexStream};
-use tokio_util::io::SyncIoBridge;
-pub async fn fragment_stream(
- mut b: DuplexStream,
+pub fn fragment_stream(
info: Arc<SMediaInfo>,
- track: usize,
- segment: usize,
- index: usize,
- format_num: usize,
+ track: TrackNum,
+ index: IndexNum,
+ format_num: FormatNum,
container: StreamContainer,
-) -> Result<()> {
- let (iinfo, info) = stream_info(info).await?;
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let (iinfo, info) = stream_info(info)?;
+
let (file_index, track_num) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("track not found"))?;
- let path = iinfo.paths[file_index].clone();
- let seg = info
- .segments
- .get(segment)
- .ok_or(anyhow!("segment not found"))?;
- let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?;
+ let media_path = iinfo.paths[file_index].clone();
+ let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?;
let format = track
.formats
.get(format_num)
.ok_or(anyhow!("format not found"))?;
- let orig_track = iinfo.metadata[file_index]
+
+ let mk_track = iinfo.metadata[file_index]
.tracks
.as_ref()
.unwrap()
@@ -49,97 +44,138 @@ pub async fn fragment_stream(
.find(|t| t.track_number == track_num)
.unwrap();
+ let mk_info = matroska::Info {
+ duration: Some(info.duration),
+ timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale,
+ ..Default::default()
+ };
+ let mk_tracks = matroska::Tracks {
+ entries: vec![mk_track.to_owned()],
+ };
+
+ let cue_stat = generate_cues(&media_path)?;
+ let cluster_offset = cue_stat
+ .cues
+ .get(index)
+ .ok_or(anyhow!("fragment index out of range"))?
+ .position;
+
+ let cluster = {
+ let media_file = File::open(media_path)?;
+ let mut media = create_demuxer_autodetect(Box::new(media_file))?
+ .ok_or(anyhow!("media container unknown"))?;
+ media.seek_cluster(Some(cluster_offset))?;
+ media
+ .read_cluster()?
+ .ok_or(anyhow!("cluster unexpectedly missing"))?
+ .1
+ };
+
+ let jr_container = match container {
+ StreamContainer::WebM => ContainerFormat::Webm,
+ StreamContainer::Matroska => ContainerFormat::Matroska,
+ StreamContainer::WebVTT => todo!(),
+ StreamContainer::MPEG4 => todo!(),
+ StreamContainer::JVTT => todo!(),
+ };
+
if format.remux {
- match container {
- StreamContainer::WebM | StreamContainer::Matroska => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- SyncIoBridge::new(b),
- &path,
- track_num,
- container == StreamContainer::WebM,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- }
- StreamContainer::MPEG4 => {
- tokio::task::spawn_blocking(move || {
- let mut buf = Cursor::new(Vec::new());
- if let Err(err) = jellyremuxer::write_fragment_into(
- &mut buf,
- &path,
- track_num,
- false,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- buf.seek(SeekFrom::Start(0)).unwrap();
- if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) {
- warn!("mpeg4 transmux failed: {err}");
- }
- });
- }
- StreamContainer::JVTT => {}
- _ => bail!("not yet supported"),
- }
+ let mut out = Vec::new();
+ write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?;
+ Ok(Box::new(Cursor::new(out)))
} else {
- let location = transcode(
- track.kind,
- orig_track,
- format,
- &format!("{path:?} {track_num} {index}"),
- move |b| {
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- SyncIoBridge::new(b),
- &path,
- track_num,
- false,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- },
- )
- .await?;
-
- let mut frag = File::open(location.abs()).await?;
- match container {
- StreamContainer::WebM => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) =
- matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- {
- warn!("webm transmux failed: {err}");
- }
- });
- }
- StreamContainer::Matroska => {
- tokio::task::spawn(async move {
- if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
- warn!("cannot write stream: {err}")
- }
- });
- }
- StreamContainer::MPEG4 => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) =
- matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- {
- warn!("mpeg4 transmux failed: {err}");
- }
- });
- }
- _ => bail!("unsupported"),
- }
+ todo!()
}
- Ok(())
+ // if format.remux {
+ // match container {
+ // StreamContainer::WebM | StreamContainer::Matroska => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // SyncIoBridge::new(b),
+ // &path,
+ // track_num,
+ // container == StreamContainer::WebM,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::MPEG4 => {
+ // tokio::task::spawn_blocking(move || {
+ // let mut buf = Cursor::new(Vec::new());
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // &mut buf,
+ // &path,
+ // track_num,
+ // false,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // buf.seek(SeekFrom::Start(0)).unwrap();
+ // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) {
+ // warn!("mpeg4 transmux failed: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::JVTT => {}
+ // _ => bail!("not yet supported"),
+ // }
+ // } else {
+ // let location = transcode(
+ // track.kind,
+ // orig_track,
+ // format,
+ // &format!("{path:?} {track_num} {index}"),
+ // move |b| {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // SyncIoBridge::new(b),
+ // &path,
+ // track_num,
+ // false,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // });
+ // },
+ // )
+ // .await?;
+
+ // let mut frag = File::open(location.abs()).await?;
+ // match container {
+ // StreamContainer::WebM => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) =
+ // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ // {
+ // warn!("webm transmux failed: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::Matroska => {
+ // tokio::task::spawn(async move {
+ // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
+ // warn!("cannot write stream: {err}")
+ // }
+ // });
+ // }
+ // StreamContainer::MPEG4 => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) =
+ // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ // {
+ // warn!("mpeg4 transmux failed: {err}");
+ // }
+ // });
+ // }
+ // _ => bail!("unsupported"),
+ // }
+ // }
}