/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ #![feature(iterator_try_collect)] pub mod fragment; pub mod fragment_index; pub mod hls; pub mod stream_info; pub mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; use jellycommon::{ stream::{StreamContainer, StreamSpec}, Node, }; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::{Arc, LazyLock, Mutex}, }; use stream_info::{stream_info, write_stream_info}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; #[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] pub struct Config { #[serde(default)] pub offer_avc: bool, #[serde(default)] pub offer_hevc: bool, #[serde(default)] pub offer_vp8: bool, #[serde(default)] pub offer_vp9: bool, #[serde(default)] pub offer_av1: bool, } pub static CONF_PRELOAD: Mutex> = Mutex::new(None); static CONF: LazyLock = LazyLock::new(|| { CONF_PRELOAD .lock() .unwrap() .take() .expect("stream config not preloaded. logic error") }); #[derive(Debug)] pub struct SMediaInfo { pub info: Arc, pub files: BTreeSet, } pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } pub fn stream_head(spec: &StreamSpec) -> StreamHead { let cons = |ct: &'static str, rs: bool| StreamHead { content_type: ct, range_supported: rs, }; let container_ct = |x: StreamContainer| match x { StreamContainer::WebM => "video/webm", StreamContainer::Matroska => "video/x-matroska", StreamContainer::WebVTT => "text/vtt", StreamContainer::JVTT => "application/jellything-vtt+json", StreamContainer::MPEG4 => "video/mp4", }; match spec { StreamSpec::Whep { .. } => cons("application/x-todo", false), StreamSpec::WhepControl { .. } => cons("application/x-todo", false), StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), StreamSpec::Original { .. } => cons("video/x-matroska", true), StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( info: Arc, spec: StreamSpec, range: Range, ) -> Result { let (a, b) = duplex(4096); match spec { StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => { hls_supermultivariant_stream(b, info, container).await?; } StreamSpec::HlsMultiVariant { segment, container } => { hls_multivariant_stream(b, info, segment, container).await? } StreamSpec::HlsVariant { segment, track, container, format, } => hls_variant_stream(b, info, segment, track, format, container).await?, StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => { fragment_index_stream(b, info, segment, track).await? } StreamSpec::Fragment { segment, track, index, container, format, } => fragment_stream(b, info, track, segment, index, format, container).await?, _ => bail!("todo"), } Ok(a) } async fn original_stream( info: Arc, track: usize, range: Range, b: DuplexStream, ) -> Result<()> { let (iinfo, _info) = stream_info(info).await?; let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; let mut file = File::open(&iinfo.paths[file_index]) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) .await .context("seek source")?; tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { let mut buf = [0u8; 4096]; loop { let size = inp.read(&mut buf[..amount.min(4096)]).await?; if size == 0 { break Ok(()); } out.write_all(&buf[..size]).await?; amount -= size; } }