/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ #![feature(iterator_try_collect)] pub mod hls; pub mod jhls; pub mod segment; pub mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use hls::{hls_master_stream, hls_variant_stream}; use jellybase::{permission::PermissionSetExt, CONF}; use jellycommon::{ stream::{StreamFormat, StreamSpec}, user::{PermissionSet, UserPermission}, LocalTrack, Node, TrackSource, }; use jhls::jhls_index; use segment::segment_stream; use std::{io::SeekFrom, ops::Range}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; use tokio_util::io::SyncIoBridge; use webvtt::webvtt_stream; pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } #[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; match spec.format { StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, StreamFormat::Snippet => StreamHead { content_type: webm_or_mkv, range_supported: false }, } } pub async fn stream( node: Node, spec: StreamSpec, range: Range, perms: &PermissionSet, ) -> Result { perms.assert(&UserPermission::StreamFormat(spec.format))?; let (a, b) = duplex(4096); // TODO remux of mixed remote and local tracks?! let track_sources = node .private .source .to_owned() .ok_or(anyhow!("node has no media"))?; let local_tracks = spec .tracks .iter() .map(|i| { anyhow::Ok( match track_sources .get(*i) .ok_or(anyhow!("track does not exist"))? { TrackSource::Local(t) => t.to_owned(), TrackSource::Remote(_) => bail!("track is not local"), }, ) }) .collect::>>()? .into_iter() .collect::>(); match spec.format { StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, StreamFormat::Snippet => segment_stream(node, local_tracks, spec, b, perms).await?, StreamFormat::Webvtt => webvtt_stream(node, local_tracks, spec, b).await?, } Ok(a) } async fn remux_stream( node: Node, local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { let b = SyncIoBridge::new(b); tokio::task::spawn_blocking(move || { jellyremuxer::remux_stream_into( b, range, CONF.library_path.to_owned(), node.public, local_tracks, spec.tracks, spec.webm.unwrap_or(false), ) }); Ok(()) } async fn original_stream( local_tracks: Vec, spec: StreamSpec, range: Range, b: DuplexStream, ) -> Result<()> { if spec.tracks.len() != 1 { bail!("invalid amout of source \"tracks\". original only allows for exactly one.") } let source = local_tracks[spec.tracks[0]].clone(); let mut file = File::open(CONF.library_path.join(source.path)) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) .await .context("seek source")?; tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { let mut buf = [0u8; 4096]; loop { let size = inp.read(&mut buf[..amount.min(4096)]).await?; if size == 0 { break Ok(()); } out.write_all(&buf[..size]).await?; amount -= size; } }