diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-16 20:06:01 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-16 20:06:01 +0200 |
commit | d26849375c70c795fdf664f9dfea68c273b6d483 (patch) | |
tree | 53ad4f0eff3604e80b27ff0abf0438ea6c69d432 /stream | |
parent | 1cd966f7454f052fda6c6c9ae1597479f05e23d9 (diff) | |
parent | cdf95d7b80bd2b78895671da8f462145bb5db522 (diff) | |
download | jellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar jellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar.bz2 jellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar.zst |
Merge branch 'rewrite-stream'
Diffstat (limited to 'stream')
-rw-r--r-- | stream/Cargo.toml | 2 | ||||
-rw-r--r-- | stream/src/fragment.rs | 126 | ||||
-rw-r--r-- | stream/src/fragment_index.rs | 32 | ||||
-rw-r--r-- | stream/src/hls.rs | 110 | ||||
-rw-r--r-- | stream/src/jhls.rs | 47 | ||||
-rw-r--r-- | stream/src/lib.rs | 167 | ||||
-rw-r--r-- | stream/src/stream_info.rs | 169 | ||||
-rw-r--r-- | stream/src/webvtt.rs | 116 |
8 files changed, 499 insertions, 270 deletions
diff --git a/stream/Cargo.toml b/stream/Cargo.toml index 36979c9..21d1650 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" jellybase = { path = "../base", features = ["rocket"] } jellytranscoder = { path = "../transcoder" } jellyremuxer = { path = "../remuxer" } +jellymatroska = { path = "../matroska" } +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" } log = { workspace = true } anyhow = { workspace = true } diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index e276d29..dfe101e 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,16 +3,10 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, bail, Result}; -use jellybase::{ - common::{ - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; +use jellybase::common::stream::StreamContainer; +use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; use jellytranscoder::fragment::transcode; use log::warn; use std::sync::Arc; @@ -20,40 +14,57 @@ use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, mut b: DuplexStream, - perms: &PermissionSet, + info: Arc<SMediaInfo>, + track: usize, + segment: usize, + index: usize, + format_num: usize, + container: StreamContainer, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.track[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let path = iinfo.paths[file_index].clone(); + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let format = track + .formats + .get(format_num) + .ok_or(anyhow!("format not found"))?; - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; + if format.remux { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), + index, + ) { + warn!("segment stream error: {err}"); + } + }); + } else { let location = transcode( - &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, + &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source + track.kind, + format, move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( SyncIoBridge::new(b), - &CONF.media_path, - &node, - &local_track, - track, + &path, + track_num, false, - n, + &info.name.unwrap_or_default(), + index, ) { warn!("segment stream error: {err}"); } @@ -61,27 +72,36 @@ pub async fn fragment_stream( }, ) .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") + + let mut frag = File::open(location.abs()).await?; + match container { + StreamContainer::WebM => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("webm transmux failed: {err}"); + } + }); } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - b, - &CONF.media_path, - &node, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); + StreamContainer::Matroska => { + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + warn!("cannot write stream: {err}") + } + }); } - }); + StreamContainer::MPEG4 => { + tokio::task::spawn_blocking(move || { + if let Err(err) = + matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + { + warn!("mpeg4 transmux failed: {err}"); + } + }); + } + _ => bail!("unsupported"), + } } Ok(()) diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs new file mode 100644 index 0000000..6fbddc6 --- /dev/null +++ b/stream/src/fragment_index.rs @@ -0,0 +1,32 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use crate::{stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellybase::common::stream::{SegmentNum, TrackNum}; +use std::sync::Arc; +use tokio::io::{AsyncWriteExt, DuplexStream}; + +pub async fn fragment_index_stream( + mut b: DuplexStream, + info: Arc<SMediaInfo>, + _segment: SegmentNum, + track: TrackNum, +) -> Result<()> { + let (iinfo, _info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + + let fragments = tokio::task::spawn_blocking(move || { + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) + }) + .await??; + + let out = serde_json::to_string(&fragments)?; + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} diff --git a/stream/src/hls.rs b/stream/src/hls.rs index dca1036..3dfbf01 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -4,13 +4,10 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - stream::{StreamFormat, StreamSpec}, - LocalTrack, Node, SourceTrackKind, - }, - CONF, +use jellybase::common::stream::{ + FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum, }; use std::{fmt::Write, ops::Range, sync::Arc}; use tokio::{ @@ -18,32 +15,63 @@ use tokio::{ task::spawn_blocking, }; -pub async fn hls_master_stream( - node: Arc<Node>, - _local_tracks: Vec<LocalTrack>, - _spec: StreamSpec, +pub async fn hls_supermultivariant_stream( mut b: DuplexStream, + info: Arc<SMediaInfo>, + container: StreamContainer, ) -> Result<()> { - let media = node.media.as_ref().ok_or(anyhow!("no media"))?; + let (_iinfo, info) = stream_info(info).await?; + let mut out = String::new(); + writeln!(out, "#EXTM3U")?; + writeln!(out, "#EXT-X-VERSION:4")?; + for (i, _seg) in info.segments.iter().enumerate() { + let uri = format!( + "stream{}", + StreamSpec::HlsMultiVariant { + segment: i, + container, + } + .to_query() + ); + writeln!(out, "{uri}")?; + } + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} + +pub async fn hls_multivariant_stream( + mut b: DuplexStream, + info: Arc<SMediaInfo>, + segment: SegmentNum, + container: StreamContainer, +) -> Result<()> { + let (_iinfo, info) = stream_info(info).await?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-VERSION:4")?; // writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; - for (i, t) in media.tracks.iter().enumerate() { + for (i, t) in seg.tracks.iter().enumerate() { let uri = format!( - "stream?{}", - StreamSpec { - track: vec![i], - format: StreamFormat::HlsVariant, - ..Default::default() + "stream{}", + StreamSpec::HlsVariant { + segment, + track: i, + container, + format: 0 } .to_query() ); let r#type = match t.kind { - SourceTrackKind::Video { .. } => "VIDEO", - SourceTrackKind::Audio { .. } => "AUDIO", - SourceTrackKind::Subtitles => "SUBTITLES", + TrackKind::Video => "VIDEO", + TrackKind::Audio => "AUDIO", + TrackKind::Subtitle => "SUBTITLES", }; + // TODO bw writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?; writeln!(out, "{uri}")?; } @@ -52,31 +80,49 @@ pub async fn hls_master_stream( } pub async fn hls_variant_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - mut spec: StreamSpec, mut b: DuplexStream, + info: Arc<SMediaInfo>, + segment: SegmentNum, + track: TrackNum, + format: FormatNum, + container: StreamContainer, ) -> Result<()> { - let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let track_index = spec.track[0]; - let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index) + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) }) .await??; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?; - writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?; + writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?; writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - spec.format = StreamFormat::Fragment; - for (i, Range { start, end }) in frags.iter().enumerate() { + for (index, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; - spec.index = Some(i); - writeln!(out, "stream?{}", spec.to_query())?; + writeln!( + out, + "stream{}", + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } + .to_query() + )?; } writeln!(out, "#EXT-X-ENDLIST")?; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs deleted file mode 100644 index b222e39..0000000 --- a/stream/src/jhls.rs +++ /dev/null @@ -1,47 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin <metamuffin.org> -*/ -use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - jhls::JhlsTrackIndex, - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; -use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; - -pub async fn jhls_index( - node: Arc<Node>, - local_tracks: &[LocalTrack], - spec: StreamSpec, - mut b: DuplexStream, - perms: &PermissionSet, -) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); - - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - }) - .await??; - - let out = serde_json::to_string(&JhlsTrackIndex { - extra_profiles: if perms.check(&UserPermission::Transcode) { - CONF.transcoding_profiles.clone() - } else { - vec![] - }, - fragments, - })?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) -} diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 00338c1..4df87ae 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -5,136 +5,113 @@ */ #![feature(iterator_try_collect)] pub mod fragment; +pub mod fragment_index; pub mod hls; -pub mod jhls; +pub mod stream_info; pub mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; -use hls::{hls_master_stream, hls_variant_stream}; -use jellybase::{ - assetfed::AssetInner, - common::{ - stream::{StreamFormat, StreamSpec}, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, TrackSource, - }, - permission::PermissionSetExt, - CONF, +use fragment_index::fragment_index_stream; +use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; +use jellybase::common::{ + stream::{StreamContainer, StreamSpec}, + Node, }; -use jhls::jhls_index; -use std::{io::SeekFrom, ops::Range, sync::Arc}; +use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; +use stream_info::{stream_info, write_stream_info}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, }; -use tokio_util::io::SyncIoBridge; -use webvtt::vtt_stream; + +#[derive(Debug)] +pub struct SMediaInfo { + pub info: Arc<Node>, + pub files: BTreeSet<PathBuf>, +} pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, } -#[rustfmt::skip] pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" }; - match spec.format { - StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true }, - StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true }, - StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, - StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, - StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, - StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, + let cons = |ct: &'static str, rs: bool| StreamHead { + content_type: ct, + range_supported: rs, + }; + let container_ct = |x: StreamContainer| match x { + StreamContainer::WebM => "video/webm", + StreamContainer::Matroska => "video/x-matroska", + StreamContainer::WebVTT => "text/vtt", + StreamContainer::JVTT => "application/jellything-vtt+json", + StreamContainer::MPEG4 => "video/mp4", + }; + match spec { + StreamSpec::Whep { .. } => cons("application/x-todo", false), + StreamSpec::WhepControl { .. } => cons("application/x-todo", false), + StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), + StreamSpec::Original { .. } => cons("video/x-matroska", true), + StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), + StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), + StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), + StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), } } pub async fn stream( - node: Arc<Node>, + info: Arc<SMediaInfo>, spec: StreamSpec, range: Range<usize>, - perms: &PermissionSet, ) -> Result<DuplexStream> { - perms.assert(&UserPermission::StreamFormat(spec.format))?; - let (a, b) = duplex(4096); - // TODO remux of mixed remote and local tracks?! - let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?; - - let local_tracks = spec - .track - .iter() - .map(|i| { - anyhow::Ok( - match &track_sources - .tracks - .get(*i) - .ok_or(anyhow!("track does not exist"))? - .source - { - TrackSource::Local(t) => AssetInner::deser(&t.0)? - .as_local_track() - .ok_or(anyhow!("asset not a track"))?, - TrackSource::Remote(_) => bail!("track is not local"), - }, - ) - }) - .collect::<anyhow::Result<Vec<_>>>()? - .into_iter() - .collect::<Vec<_>>(); - - match spec.format { - StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?, - StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?, - StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, - StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, - StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, - StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, - StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, + match spec { + StreamSpec::Original { track } => original_stream(info, track, range, b).await?, + StreamSpec::HlsSuperMultiVariant { container } => { + hls_supermultivariant_stream(b, info, container).await?; + } + StreamSpec::HlsMultiVariant { segment, container } => { + hls_multivariant_stream(b, info, segment, container).await? + } + StreamSpec::HlsVariant { + segment, + track, + container, + format, + } => hls_variant_stream(b, info, segment, track, format, container).await?, + StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, + StreamSpec::FragmentIndex { segment, track } => { + fragment_index_stream(b, info, segment, track).await? + } + StreamSpec::Fragment { + segment, + track, + index, + container, + format, + } => fragment_stream(b, info, track, segment, index, format, container).await?, + _ => bail!("todo"), } Ok(a) } -async fn remux_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, - range: Range<usize>, - b: DuplexStream, -) -> Result<()> { - let b = SyncIoBridge::new(b); - - tokio::task::spawn_blocking(move || { - jellyremuxer::remux_stream_into( - b, - range, - CONF.media_path.to_owned(), - &node, - local_tracks, - spec.track, - spec.webm.unwrap_or(false), - ) - }); - - Ok(()) -} - async fn original_stream( - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, + info: Arc<SMediaInfo>, + track: usize, range: Range<usize>, b: DuplexStream, ) -> Result<()> { - if spec.track.len() != 1 { - bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - } - - let source = local_tracks[spec.track[0]].clone(); - let mut file = File::open(CONF.media_path.join(source.path)) + let (iinfo, _info) = stream_info(info).await?; + let (file_index, _) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("unknown track"))?; + let mut file = File::open(&iinfo.paths[file_index]) .await .context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs new file mode 100644 index 0000000..c3746c6 --- /dev/null +++ b/stream/src/stream_info.rs @@ -0,0 +1,169 @@ +use anyhow::Result; +use ebml_struct::matroska::TrackEntry; +use jellybase::{ + common::stream::{ + StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, + TrackKind, + }, + CONF, +}; +use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; +use std::{path::PathBuf, sync::Arc}; +use tokio::{ + io::{AsyncWriteExt, DuplexStream}, + spawn, + task::spawn_blocking, +}; + +use crate::SMediaInfo; + +async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>> { + Ok(spawn_blocking(move || matroska_metadata(&path)).await??) +} + +pub(crate) struct InternalStreamInfo { + pub paths: Vec<PathBuf>, + pub _metadata: Vec<Arc<MatroskaMetadata>>, + pub track_to_file: Vec<(usize, u64)>, +} + +pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> { + let mut metadata = Vec::new(); + let mut paths = Vec::new(); + for path in &info.files { + metadata.push(async_matroska_metadata(path.clone()).await?); + paths.push(path.clone()); + } + let mut tracks = Vec::new(); + let mut track_to_file = Vec::new(); + + for (i, m) in metadata.iter().enumerate() { + if let Some(t) = &m.tracks { + for t in &t.entries { + tracks.push(StreamTrackInfo { + name: None, + kind: match t.track_type { + 1 => TrackKind::Video, + 2 => TrackKind::Audio, + 17 => TrackKind::Subtitle, + _ => todo!(), + }, + formats: stream_formats(t), + }); + track_to_file.push((i, t.track_number)); + } + } + } + + let segment = StreamSegmentInfo { + name: None, + duration: media_duration(&metadata[0]), + tracks, + }; + Ok(( + InternalStreamInfo { + _metadata: metadata, + paths, + track_to_file, + }, + StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }, + )) +} + +fn stream_formats(t: &TrackEntry) -> Vec<StreamFormatInfo> { + let mut formats = Vec::new(); + formats.push(StreamFormatInfo { + codec: t.codec_id.to_string(), + remux: true, + bitrate: 10_000_000., // TODO + containers: { + let mut x = containers_by_codec(&t.codec_id); + // TODO remove this + x.retain_mut(|x| *x != StreamContainer::MPEG4); + x + }, + bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), + samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), + channels: t.audio.as_ref().map(|a| a.channels as usize), + width: t.video.as_ref().map(|v| v.pixel_width), + height: t.video.as_ref().map(|v| v.pixel_height), + ..Default::default() + }); + + match t.track_type { + 1 => { + let sw = t.video.as_ref().unwrap().pixel_width; + let sh = t.video.as_ref().unwrap().pixel_height; + for (w, br) in [(3840, 8e6), (1920, 5e6), (1280, 3e6), (640, 1e6)] { + if w > sw { + continue; + } + let h = (w * sh) / sw; + for (cid, enable) in [ + ("V_AV1", CONF.encoders.av1.is_some()), + ("V_VP8", CONF.encoders.vp8.is_some()), + ("V_VP9", CONF.encoders.vp9.is_some()), + ("V_MPEG4/ISO/AVC", CONF.encoders.avc.is_some()), + ("V_MPEGH/ISO/HEVC", CONF.encoders.hevc.is_some()), + ] { + if enable { + formats.push(StreamFormatInfo { + codec: cid.to_string(), + bitrate: br, + remux: false, + containers: containers_by_codec(cid), + width: Some(w), + height: Some(h), + samplerate: None, + channels: None, + bit_depth: None, + }); + } + } + } + } + 2 => { + for br in [256e3, 128e3, 64e3] { + formats.push(StreamFormatInfo { + codec: "A_OPUS".to_string(), + bitrate: br, + remux: false, + containers: containers_by_codec("A_OPUS"), + width: None, + height: None, + samplerate: Some(48e3), + channels: Some(2), + bit_depth: Some(32), + }); + } + } + 17 => {} + _ => {} + } + + formats +} + +fn containers_by_codec(codec: &str) -> Vec<StreamContainer> { + use StreamContainer::*; + match codec { + "V_VP8" | "V_VP9" | "V_AV1" | "A_OPUS" | "A_VORBIS" => vec![Matroska, WebM], + "V_MPEG4/ISO/AVC" | "A_AAC" => vec![Matroska, MPEG4], + "S_TEXT/UTF8" | "S_TEXT/WEBVTT" => vec![Matroska, WebVTT, WebM, JVTT], + _ => vec![Matroska], + } +} + +pub(crate) async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> { + let (_, info) = stream_info(info).await?; + spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await }); + Ok(()) +} + +fn media_duration(m: &MatroskaMetadata) -> f64 { + let info = m.info.as_ref().unwrap(); + (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000. +} diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index f78ac2f..e9f0181 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -3,61 +3,91 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use anyhow::{anyhow, Context, Result}; -use jellybase::{ - cache::async_cache_memory, - common::{stream::StreamSpec, LocalTrack, Node}, - CONF, -}; -use jellyremuxer::extract::extract_track; -use jellytranscoder::subtitles::{parse_subtitles, write_webvtt}; +use anyhow::Result; +use jellybase::common::{stream::StreamSpec, Node}; use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; +use tokio::io::DuplexStream; pub async fn vtt_stream( json: bool, node: Arc<Node>, - local_tracks: Vec<LocalTrack>, spec: StreamSpec, - mut b: DuplexStream, + b: DuplexStream, ) -> Result<()> { + let _ = b; + let _ = spec; + let _ = node; + let _ = json; // TODO cache // TODO should use fragments too? big films take too long... - let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; - let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); - let track = &node.media.as_ref().unwrap().tracks[tracki]; - let cp = local_track.codec_private.clone(); + // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let subtitles = async_cache_memory( - &[ - "vtt", - &format!( - "{} {}", - local_track.path.to_str().unwrap(), - local_track.track - ), - ], - move || async move { - let blocks = tokio::task::spawn_blocking(move || { - extract_track(CONF.media_path.clone(), local_track) - }) - .await??; - let subtitles = parse_subtitles(&track.codec, cp, blocks)?; - Ok(subtitles) - }, - ) - .await?; + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // )spec.track.first().ok_or(anyhow!("no track selected"))?; + // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone(); + // let track = &node.media.as_ref().unwrap().tracks[tracki]; + // let cp = local_track.codec_private.clone(); - let output = if json { - serde_json::to_string(subtitles.as_ref())? - } else { - write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) - .context("writing webvtt")? - }; - tokio::task::spawn(async move { - let _ = b.write_all(output.as_bytes()).await; - }); + // let subtitles = async_cache_memory( + // &[ + // "vtt", + // &format!( + // "{} {}", + // local_track.path.to_str().unwrap(), + // local_track.track + // ), + // ], + // move || async move { + // let blocks = tokio::task::spawn_blocking(move || { + // extract_track(CONF.media_path.clone(), local_track) + // }) + // .await??; + // let subtitles = parse_subtitles(&track.codec, cp, blocks)?; + // Ok(subtitles) + // }, + // ) + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); + // .await?; + + // let output = if json { + // serde_json::to_string(subtitles.as_ref())? + // } else { + // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref()) + // .context("writing webvtt")? + // }; + // tokio::task::spawn(async move { + // let _ = b.write_all(output.as_bytes()).await; + // }); Ok(()) } |