/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ #![feature(iterator_try_collect)] pub mod fragment; pub mod fragment_index; pub mod hls; pub mod stream_info; pub mod webvtt; use anyhow::{anyhow, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; use hls::{hls_master_stream, hls_variant_stream}; use jellybase::common::{ stream::{StreamContainer, StreamSpec}, Node, }; use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; use stream_info::{stream_info, write_stream_info}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; #[derive(Debug)] pub struct SMediaInfo { pub info: Arc, pub files: BTreeSet, } pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } pub fn stream_head(spec: &StreamSpec) -> StreamHead { let cons = |ct: &'static str, rs: bool| StreamHead { content_type: ct, range_supported: rs, }; let container_ct = |x: StreamContainer| match x { StreamContainer::WebM => "video/webm", StreamContainer::Matroska => "video/x-matroska", StreamContainer::WebVTT => "text/vtt", StreamContainer::JVTT => "application/jellything-vtt+json", StreamContainer::MPEG4 => "video/mp4", }; match spec { StreamSpec::Whep { .. } => cons("application/x-todo", false), StreamSpec::WhepControl { .. } => cons("application/x-todo", false), StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), StreamSpec::Original { .. } => cons("video/x-matroska", true), StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( info: Arc, spec: StreamSpec, range: Range, ) -> Result { let (a, b) = duplex(4096); match spec { StreamSpec::Whep { track, seek } => todo!(), StreamSpec::WhepControl { token } => todo!(), StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), StreamSpec::HlsMultiVariant { segment, container } => { hls_master_stream(b, info, segment, container).await? } StreamSpec::HlsVariant { segment, track, container, format, } => hls_variant_stream(b, info, segment, track, format, container).await?, StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => { fragment_index_stream(b, info, segment, track).await? } StreamSpec::Fragment { segment, track, index, container, format, } => fragment_stream(b, info, track, segment, index, format, container).await?, } Ok(a) } async fn remux_stream( node: Arc, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { let b = SyncIoBridge::new(b); // tokio::task::spawn_blocking(move || { // jellyremuxer::remux_stream_into( // b, // range, // CONF.media_path.to_owned(), // &node, // local_tracks, // spec.track, // spec.webm.unwrap_or(false), // ) // }); Ok(()) } async fn original_stream( info: Arc, track: usize, range: Range, b: DuplexStream, ) -> Result<()> { let (iinfo, _info) = stream_info(info).await?; let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; let mut file = File::open(&iinfo.paths[file_index]) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) .await .context("seek source")?; tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { let mut buf = [0u8; 4096]; loop { let size = inp.read(&mut buf[..amount.min(4096)]).await?; if size == 0 { break Ok(()); } out.write_all(&buf[..size]).await?; amount -= size; } }