From 3b147cb1dfcbd5c7218e0accd5784d992d5ae21c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 14 Apr 2025 18:42:16 +0200 Subject: things --- common/src/config.rs | 17 ++++++++-------- common/src/stream.rs | 2 +- server/src/routes/stream.rs | 4 ++-- stream/src/fragment.rs | 1 + stream/src/hls.rs | 26 ++++++++++++++++++++++++- stream/src/lib.rs | 38 +++++++----------------------------- stream/src/stream_info.rs | 6 +++--- transcoder/src/fragment.rs | 47 +++++++++++++++++++++++++++++++++++++-------- 8 files changed, 87 insertions(+), 54 deletions(-) diff --git a/common/src/config.rs b/common/src/config.rs index 3a48fea..df16ef0 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -29,19 +29,20 @@ pub struct GlobalConfig { #[serde(default)] pub default_permission_set: PermissionSet, #[serde(default)] - pub encoders: EncoderPreferences, + pub encoders: EncoderArgs, } #[derive(Debug, Deserialize, Serialize, Default)] -pub struct EncoderPreferences { - pub avc: Option, - pub hevc: Option, - pub vp8: Option, - pub vp9: Option, - pub av1: Option, +pub struct EncoderArgs { + pub avc: Option, + pub hevc: Option, + pub vp8: Option, + pub vp9: Option, + pub av1: Option, + pub generic: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum EncoderClass { Aom, diff --git a/common/src/stream.rs b/common/src/stream.rs index 555a5d0..9fd7daf 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -76,7 +76,7 @@ pub struct StreamTrackInfo { pub formats: Vec, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Copy, Clone, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum TrackKind { Video, diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 8f97aec..0fbeb3a 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -42,8 +42,8 @@ pub async fn r_stream_head( #[get("/n//stream?")] pub async fn r_stream( - session: Session, - federation: &State, + _session: Session, + _federation: &State, db: &State, id: &str, range: Option, diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e0644aa..26746fc 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -53,6 +53,7 @@ pub async fn fragment_stream( } else { let location = transcode( &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source + track.kind, format, container, move |b| { diff --git a/stream/src/hls.rs b/stream/src/hls.rs index f06ac72..3dfbf01 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -15,7 +15,31 @@ use tokio::{ task::spawn_blocking, }; -pub async fn hls_master_stream( +pub async fn hls_supermultivariant_stream( + mut b: DuplexStream, + info: Arc, + container: StreamContainer, +) -> Result<()> { + let (_iinfo, info) = stream_info(info).await?; + let mut out = String::new(); + writeln!(out, "#EXTM3U")?; + writeln!(out, "#EXT-X-VERSION:4")?; + for (i, _seg) in info.segments.iter().enumerate() { + let uri = format!( + "stream{}", + StreamSpec::HlsMultiVariant { + segment: i, + container, + } + .to_query() + ); + writeln!(out, "{uri}")?; + } + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} + +pub async fn hls_multivariant_stream( mut b: DuplexStream, info: Arc, segment: SegmentNum, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 18ad2a7..4df87ae 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -10,10 +10,10 @@ pub mod hls; pub mod stream_info; pub mod webvtt; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; -use hls::{hls_master_stream, hls_variant_stream}; +use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; use jellybase::common::{ stream::{StreamContainer, StreamSpec}, Node, @@ -24,7 +24,6 @@ use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; -use tokio_util::io::SyncIoBridge; #[derive(Debug)] pub struct SMediaInfo { @@ -71,13 +70,12 @@ pub async fn stream( let (a, b) = duplex(4096); match spec { - StreamSpec::Whep { track, seek } => todo!(), - StreamSpec::WhepControl { token } => todo!(), - StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, - StreamSpec::HlsSuperMultiVariant { container } => todo!(), + StreamSpec::HlsSuperMultiVariant { container } => { + hls_supermultivariant_stream(b, info, container).await?; + } StreamSpec::HlsMultiVariant { segment, container } => { - hls_master_stream(b, info, segment, container).await? + hls_multivariant_stream(b, info, segment, container).await? } StreamSpec::HlsVariant { segment, @@ -96,34 +94,12 @@ pub async fn stream( container, format, } => fragment_stream(b, info, track, segment, index, format, container).await?, + _ => bail!("todo"), } Ok(a) } -async fn remux_stream( - node: Arc, - spec: StreamSpec, - range: Range, - b: DuplexStream, -) -> Result<()> { - let b = SyncIoBridge::new(b); - - // tokio::task::spawn_blocking(move || { - // jellyremuxer::remux_stream_into( - // b, - // range, - // CONF.media_path.to_owned(), - // &node, - // local_tracks, - // spec.track, - // spec.webm.unwrap_or(false), - // ) - // }); - - Ok(()) -} - async fn original_stream( info: Arc, track: usize, diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 9d3d741..a8b6989 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -23,7 +23,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result> pub(crate) struct InternalStreamInfo { pub paths: Vec, - pub metadata: Vec>, + pub _metadata: Vec>, pub track_to_file: Vec<(usize, u64)>, } @@ -67,7 +67,7 @@ pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStream }; Ok(( InternalStreamInfo { - metadata, + _metadata: metadata, paths, track_to_file, }, @@ -83,7 +83,7 @@ fn stream_formats(t: &TrackEntry) -> Vec { formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), remux: true, - bitrate: 2_000_000., // TODO + bitrate: 10_000_000., // TODO containers: containers_by_codec(&t.codec_id), bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 3cb4c40..1d06e9a 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -7,7 +7,8 @@ use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, - common::stream::{StreamContainer, StreamFormatInfo}, + common::stream::{StreamContainer, StreamFormatInfo, TrackKind}, + CONF, }; use log::{debug, info}; use std::process::Stdio; @@ -21,6 +22,7 @@ use tokio::{ pub async fn transcode( key: &str, + kind: TrackKind, format: &StreamFormatInfo, container: StreamContainer, input: impl FnOnce(ChildStdin), @@ -31,15 +33,44 @@ pub async fn transcode( let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; debug!("transcoding fragment with {format:?}"); - let mut args = Vec::::new(); - - match format.codec.as_str() { - "V_AVC" => {} + let template = match format.codec.as_str() { + "V_AVC" => CONF.encoders.avc.as_ref(), + "V_HEVC" => CONF.encoders.hevc.as_ref(), + "V_VP8" => CONF.encoders.vp8.as_ref(), + "V_VP9" => CONF.encoders.vp9.as_ref(), + "V_AV1" => CONF.encoders.av1.as_ref(), + _ => None, + } + .or(CONF.encoders.generic.as_ref()) + .cloned() + .unwrap_or("ffmpeg %i %f %e %o".to_owned()); + let filter = match kind { + TrackKind::Video => format!("-vf scale={}:-1", format.width.unwrap()), + TrackKind::Audio => format!(""), + TrackKind::Subtitle => String::new(), + }; + let typechar = match kind { + TrackKind::Video => "v", + TrackKind::Audio => "a", + TrackKind::Subtitle => "s", + }; + let fallback_encoder = match format.codec.as_str() { + "A_OPUS" => "libopus", _ => unreachable!(), - } + }; + + let args = template + .replace("%i", "-f matroska -i pipe:0") + .replace("%o", "-f %C pipe:1") + .replace("%f", &filter) + .replace("%e", "-c:%t %c -b:%t %r") + .replace("%t", typechar) + .replace("%c", fallback_encoder) + .replace("%r", &(format.bitrate as i64).to_string()) + .replace("%C", &container.to_string()); - info!("encoding with {:?}", args.join(" ")); + info!("encoding with {:?}", args); let container = match container { StreamContainer::WebM => "webm", @@ -53,7 +84,7 @@ pub async fn transcode( .stdin(Stdio::piped()) .stdout(Stdio::piped()) .args(["-f", "matroska", "-i", "pipe:0"]) - .args(args) + .args(args.split(" ")) .args(["-f", container, "pipe:1"]) .spawn()?; -- cgit v1.2.3-70-g09d2