diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-14 13:41:42 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-14 13:41:42 +0200 |
commit | a3afc2756a52f7d6fedc928b97c8ff3eb1ade338 (patch) | |
tree | 9bb043975a6b92e45bbc2f09e19641f1109950b1 /stream | |
parent | 48a57a52d85d387efe122fb4d9fb113f577a0a98 (diff) | |
download | jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.bz2 jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.zst |
lots of rewriting and removing dumb code
Diffstat (limited to 'stream')
-rw-r--r-- | stream/src/fragment.rs | 45 | ||||
-rw-r--r-- | stream/src/fragment_index.rs | 32 | ||||
-rw-r--r-- | stream/src/hls.rs | 72 | ||||
-rw-r--r-- | stream/src/jhls.rs | 47 | ||||
-rw-r--r-- | stream/src/lib.rs | 36 | ||||
-rw-r--r-- | stream/src/webvtt.rs | 3 |
6 files changed, 112 insertions, 123 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index a34bb8d..52d32f4 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,37 +3,29 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; -use jellytranscoder::fragment::transcode; +use jellybase::common::stream::StreamContainer; use log::warn; use std::sync::Arc; -use tokio::{fs::File, io::DuplexStream}; +use tokio::io::DuplexStream; use tokio_util::io::SyncIoBridge; pub async fn fragment_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, mut b: DuplexStream, - perms: &PermissionSet, - webm: bool, - track: u64, - segment: u64, + info: Arc<SMediaInfo>, + track: usize, + segment: usize, index: usize, + format: usize, + container: StreamContainer, ) -> Result<()> { - let local_track = local_tracks - .first() - .ok_or(anyhow!("track missing"))? - .to_owned(); + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let path = iinfo.paths[file_index].clone(); // if let Some(profile) = None { // perms.assert(&UserPermission::Transcode)?; @@ -70,11 +62,10 @@ pub async fn fragment_stream( tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( b, - &CONF.media_path, - &node, - &local_track, - track as usize, - webm, + &path, + track_num, + container == StreamContainer::WebM, + &info.name.unwrap_or_default(), index, ) { warn!("segment stream error: {err}"); diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs new file mode 100644 index 0000000..6fbddc6 --- /dev/null +++ b/stream/src/fragment_index.rs @@ -0,0 +1,32 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use crate::{stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellybase::common::stream::{SegmentNum, TrackNum}; +use std::sync::Arc; +use tokio::io::{AsyncWriteExt, DuplexStream}; + +pub async fn fragment_index_stream( + mut b: DuplexStream, + info: Arc<SMediaInfo>, + _segment: SegmentNum, + track: TrackNum, +) -> Result<()> { + let (iinfo, _info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + + let fragments = tokio::task::spawn_blocking(move || { + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) + }) + .await??; + + let out = serde_json::to_string(&fragments)?; + tokio::spawn(async move { b.write_all(out.as_bytes()).await }); + Ok(()) +} diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 27630b2..f06ac72 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -4,13 +4,10 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ +use crate::{stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - stream::{StreamContainer, StreamSpec}, - LocalTrack, Node, SourceTrackKind, - }, - CONF, +use jellybase::common::stream::{ + FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum, }; use std::{fmt::Write, ops::Range, sync::Arc}; use tokio::{ @@ -19,20 +16,24 @@ use tokio::{ }; pub async fn hls_master_stream( - node: Arc<Node>, - _local_tracks: Vec<LocalTrack>, - segment: u64, - container: StreamContainer, mut b: DuplexStream, + info: Arc<SMediaInfo>, + segment: SegmentNum, + container: StreamContainer, ) -> Result<()> { - let media = node.media.as_ref().ok_or(anyhow!("no media"))?; + let (_iinfo, info) = stream_info(info).await?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-VERSION:4")?; // writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; - for (i, t) in media.tracks.iter().enumerate() { + for (i, t) in seg.tracks.iter().enumerate() { let uri = format!( - "stream?{}", + "stream{}", StreamSpec::HlsVariant { segment, track: i, @@ -42,10 +43,11 @@ pub async fn hls_master_stream( .to_query() ); let r#type = match t.kind { - SourceTrackKind::Video { .. } => "VIDEO", - SourceTrackKind::Audio { .. } => "AUDIO", - SourceTrackKind::Subtitles => "SUBTITLES", + TrackKind::Video => "VIDEO", + TrackKind::Audio => "AUDIO", + TrackKind::Subtitle => "SUBTITLES", }; + // TODO bw writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?; writeln!(out, "{uri}")?; } @@ -54,42 +56,44 @@ pub async fn hls_master_stream( } pub async fn hls_variant_stream( - node: Arc<Node>, - local_tracks: Vec<LocalTrack>, - segment: u64, - track: usize, - format: usize, - container: StreamContainer, mut b: DuplexStream, + info: Arc<SMediaInfo>, + segment: SegmentNum, + track: TrackNum, + format: FormatNum, + container: StreamContainer, ) -> Result<()> { - let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned(); - let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?; + let (iinfo, info) = stream_info(info).await?; + let (file_index, track_num) = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("track not found"))?; + let seg = info + .segments + .get(segment) + .ok_or(anyhow!("segment not found"))?; + let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index( - &CONF.media_path, - &node, - &local_track, - track as usize, - ) + jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) }) .await??; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?; - writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?; + writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?; writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - for (i, Range { start, end }) in frags.iter().enumerate() { + for (index, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; writeln!( out, - "stream?{}", + "stream{}", StreamSpec::Fragment { segment, track, - index: i as u64, + index, container, format, } diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs deleted file mode 100644 index 2a2faec..0000000 --- a/stream/src/jhls.rs +++ /dev/null @@ -1,47 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin <metamuffin.org> -*/ -use anyhow::{anyhow, Result}; -use jellybase::{ - common::{ - jhls::JhlsTrackIndex, - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, - }, - permission::PermissionSetExt, - CONF, -}; -use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; - -pub async fn jhls_index( - node: Arc<Node>, - local_tracks: &[LocalTrack], - spec: StreamSpec, - mut b: DuplexStream, - perms: &PermissionSet, -) -> Result<()> { - // let local_track = local_tracks - // .first() - // .ok_or(anyhow!("track missing"))? - // .to_owned(); - - // let fragments = tokio::task::spawn_blocking(move || { - // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0]) - // }) - // .await??; - - // let out = serde_json::to_string(&JhlsTrackIndex { - // extra_profiles: if perms.check(&UserPermission::Transcode) { - // CONF.transcoding_profiles.clone() - // } else { - // vec![] - // }, - // fragments, - // })?; - // tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) -} diff --git a/stream/src/lib.rs b/stream/src/lib.rs index d09759f..a6faf54 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -5,17 +5,20 @@ */ #![feature(iterator_try_collect)] pub mod fragment; +pub mod fragment_index; pub mod hls; -pub mod jhls; pub mod webvtt; use anyhow::{anyhow, Context, Result}; +use fragment::fragment_stream; +use fragment_index::fragment_index_stream; +use hls::{hls_master_stream, hls_variant_stream}; use jellybase::common::{ stream::{ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, StreamTrackInfo, TrackKind, }, - LocalTrack, Node, + Node, }; use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; @@ -75,22 +78,26 @@ pub async fn stream( StreamSpec::Remux { tracks, container } => todo!(), StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), - StreamSpec::HlsMultiVariant { segment, container } => todo!(), + StreamSpec::HlsMultiVariant { segment, container } => { + hls_master_stream(b, info, segment, container).await? + } StreamSpec::HlsVariant { segment, track, container, format, - } => todo!(), - StreamSpec::Info { segment } => write_stream_info(info, b).await?, - StreamSpec::FragmentIndex { segment, track } => todo!(), + } => hls_variant_stream(b, info, segment, track, format, container).await?, + StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, + StreamSpec::FragmentIndex { segment, track } => { + fragment_index_stream(b, info, segment, track).await? + } StreamSpec::Fragment { segment, track, index, container, format, - } => todo!(), + } => fragment_stream(b, info, track, segment, index, format, container).await?, } Ok(a) @@ -103,7 +110,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>> pub(crate) struct InternalStreamInfo { pub paths: Vec<PathBuf>, pub metadata: Vec<Arc<MatroskaMetadata>>, - pub track_to_file: Vec<usize>, + pub track_to_file: Vec<(usize, u64)>, } async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> { @@ -142,14 +149,19 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, Strea }, formats, }); - track_to_file.push(i); + track_to_file.push((i, t.track_number)); } } } let segment = StreamSegmentInfo { name: None, - duration: 0, + duration: metadata[0] + .info + .as_ref() + .unwrap() + .duration + .unwrap_or_default(), tracks, }; Ok(( @@ -173,7 +185,6 @@ async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result async fn remux_stream( node: Arc<Node>, - local_tracks: Vec<LocalTrack>, spec: StreamSpec, range: Range<usize>, b: DuplexStream, @@ -202,8 +213,7 @@ async fn original_stream( b: DuplexStream, ) -> Result<()> { let (iinfo, _info) = stream_info(info).await?; - - let file_index = *iinfo + let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index fbd6382..960849c 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, Context, Result}; use jellybase::{ cache::async_cache_memory, - common::{stream::StreamSpec, LocalTrack, Node}, + common::{stream::StreamSpec, Node}, CONF, }; use jellyremuxer::extract::extract_track; @@ -17,7 +17,6 @@ use tokio::io::{AsyncWriteExt, DuplexStream}; pub async fn vtt_stream( json: bool, node: Arc<Node>, - local_tracks: Vec<LocalTrack>, spec: StreamSpec, mut b: DuplexStream, ) -> Result<()> { |