/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ #![feature(iterator_try_collect)] pub mod fragment; pub mod hls; pub mod jhls; pub mod webvtt; use anyhow::{anyhow, Context, Result}; use jellybase::common::{ stream::{ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, StreamTrackInfo, TrackKind, }, LocalTrack, Node, }; use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; #[derive(Debug)] pub struct SMediaInfo { pub info: Arc, pub files: BTreeSet, } pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } pub fn stream_head(spec: &StreamSpec) -> StreamHead { let cons = |ct: &'static str, rs: bool| StreamHead { content_type: ct, range_supported: rs, }; let container_ct = |x: StreamContainer| match x { StreamContainer::WebM => "video/webm", StreamContainer::Matroska => "video/x-matroska", StreamContainer::WebVTT => "text/vtt", StreamContainer::JVTT => "application/jellything-vtt+json", }; match spec { StreamSpec::Whep { .. } => cons("application/x-todo", false), StreamSpec::WhepControl { .. } => cons("application/x-todo", false), StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), StreamSpec::Original { .. } => cons("video/x-matroska", true), StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( info: Arc, spec: StreamSpec, range: Range, ) -> Result { let (a, b) = duplex(4096); match spec { StreamSpec::Whep { track, seek } => todo!(), StreamSpec::WhepControl { token } => todo!(), StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), StreamSpec::HlsMultiVariant { segment, container } => todo!(), StreamSpec::HlsVariant { segment, track, container, format, } => todo!(), StreamSpec::Info { segment } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => todo!(), StreamSpec::Fragment { segment, track, index, container, format, } => todo!(), } Ok(a) } async fn async_matroska_metadata(path: PathBuf) -> Result> { Ok(spawn_blocking(move || matroska_metadata(&path)).await??) } pub(crate) struct InternalStreamInfo { pub paths: Vec, pub metadata: Vec>, pub track_to_file: Vec, } async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { let mut metadata = Vec::new(); let mut paths = Vec::new(); for path in &info.files { metadata.push(async_matroska_metadata(path.clone()).await?); paths.push(path.clone()); } let mut tracks = Vec::new(); let mut track_to_file = Vec::new(); for (i, m) in metadata.iter().enumerate() { if let Some(t) = &m.tracks { for t in &t.entries { let mut formats = Vec::new(); formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), remux: true, byterate: 10., // TODO containers: [StreamContainer::Matroska].to_vec(), bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), channels: t.audio.as_ref().map(|a| a.channels as usize), pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height), ..Default::default() }); tracks.push(StreamTrackInfo { name: None, kind: match t.track_type { 1 => TrackKind::Video, 2 => TrackKind::Audio, 17 => TrackKind::Subtitle, _ => todo!(), }, formats, }); track_to_file.push(i); } } } let segment = StreamSegmentInfo { name: None, duration: 0, tracks, }; Ok(( InternalStreamInfo { metadata, paths, track_to_file, }, StreamInfo { name: info.info.title.clone(), segments: vec![segment], }, )) } async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { let (_, info) = stream_info(info).await?; b.write_all(&serde_json::to_vec(&info)?).await?; Ok(()) } async fn remux_stream( node: Arc, local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { let b = SyncIoBridge::new(b); // tokio::task::spawn_blocking(move || { // jellyremuxer::remux_stream_into( // b, // range, // CONF.media_path.to_owned(), // &node, // local_tracks, // spec.track, // spec.webm.unwrap_or(false), // ) // }); Ok(()) } async fn original_stream( info: Arc, track: usize, range: Range, b: DuplexStream, ) -> Result<()> { let (iinfo, _info) = stream_info(info).await?; let file_index = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; let mut file = File::open(&iinfo.paths[file_index]) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) .await .context("seek source")?; tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { let mut buf = [0u8; 4096]; loop { let size = inp.read(&mut buf[..amount.min(4096)]).await?; if size == 0 { break Ok(()); } out.write_all(&buf[..size]).await?; amount -= size; } }