/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ #![feature(iterator_try_collect)] pub mod hls; pub mod jhls; pub mod segment; use anyhow::{anyhow, bail, Context, Result}; use hls::{hls_master_stream, hls_variant_stream}; use jellybase::CONF; use jellycommon::{ stream::{StreamFormat, StreamSpec}, LocalTrack, MediaSource, Node, }; use jhls::jhls_stream; use segment::segment_stream; use std::{io::SeekFrom, ops::Range}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } #[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; match spec.format { StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, StreamFormat::Jhls => StreamHead { content_type: "application/jellything-jhls+json", range_supported: false }, StreamFormat::Segment => StreamHead { content_type: webm_or_mkv, range_supported: false }, } } pub async fn stream(node: Node, spec: StreamSpec, range: Range) -> Result { let (a, b) = duplex(4096); let track_sources = match node .private .source .as_ref() .ok_or(anyhow!("node has no media"))? { MediaSource::Local { tracks } => tracks.to_owned(), _ => bail!("node tracks are not local"), }; match spec.format { StreamFormat::Original => original_stream(track_sources, spec, range, b).await?, StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?, StreamFormat::HlsMaster => hls_master_stream(node, track_sources, spec, b).await?, StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?, StreamFormat::Jhls => jhls_stream(node, track_sources, spec, b).await?, StreamFormat::Segment => segment_stream(node, track_sources, spec, b).await?, } Ok(a) } async fn remux_stream( node: Node, track_sources: Vec, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { let b = SyncIoBridge::new(b); tokio::task::spawn_blocking(move || { jellyremuxer::remux_stream_into( b, range, CONF.library_path.to_owned(), node.public, track_sources, spec.tracks, spec.webm.unwrap_or(false), ) }); Ok(()) } async fn original_stream( track_sources: Vec, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { if spec.tracks.len() != 1 { bail!("invalid amout of source \"tracks\". original only allows for exactly one.") } let source = track_sources[spec.tracks[0]].clone(); let mut file = File::open(CONF.library_path.join(source.path)) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) .await .context("seek source")?; tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { let mut buf = [0u8; 4096]; loop { let size = inp.read(&mut buf[..amount.min(4096)]).await?; if size == 0 { break Ok(()); } out.write_all(&buf[..size]).await?; amount -= size; } }