From 2676e755286d117b100d379fce84ec3da6d8ae98 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 29 Jan 2024 13:22:21 +0100 Subject: consistent name for {snippet,segment?,fragment} --- common/src/stream.rs | 4 +- common/src/user.rs | 2 +- remuxer/src/fragment.rs | 212 +++++++++++++++++++++++++++++++++++++++++ remuxer/src/lib.rs | 4 +- remuxer/src/snippet.rs | 212 ----------------------------------------- stream/src/fragment.rs | 84 ++++++++++++++++ stream/src/hls.rs | 8 +- stream/src/jhls.rs | 2 +- stream/src/lib.rs | 8 +- stream/src/segment.rs | 84 ---------------- stream/src/webvtt.rs | 2 +- transcoder/src/fragment.rs | 102 ++++++++++++++++++++ transcoder/src/lib.rs | 2 +- transcoder/src/snippet.rs | 102 -------------------- web/script/player/track/mse.ts | 2 +- 15 files changed, 415 insertions(+), 415 deletions(-) create mode 100644 remuxer/src/fragment.rs delete mode 100644 remuxer/src/snippet.rs create mode 100644 stream/src/fragment.rs delete mode 100644 stream/src/segment.rs create mode 100644 transcoder/src/fragment.rs delete mode 100644 transcoder/src/snippet.rs diff --git a/common/src/stream.rs b/common/src/stream.rs index aa7195e..d0b1373 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -28,7 +28,7 @@ pub enum StreamFormat { #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster, #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant, #[cfg_attr(feature = "rocket", field(value = "jhlsi"))] JhlsIndex, - #[cfg_attr(feature = "rocket", field(value = "snippet"))] Snippet, + #[cfg_attr(feature = "rocket", field(value = "frag"))] Fragment, #[cfg_attr(feature = "rocket", field(value = "webvtt"))] Webvtt, #[cfg_attr(feature = "rocket", field(value = "jvtt"))] Jvtt, } @@ -85,7 +85,7 @@ impl StreamFormat { StreamFormat::HlsMaster => "hlsmaster", StreamFormat::HlsVariant => "hlsvariant", StreamFormat::JhlsIndex => "jhlsi", - StreamFormat::Snippet => "snippet", + StreamFormat::Fragment => "frag", StreamFormat::Webvtt => "webvtt", } } diff --git a/common/src/user.rs b/common/src/user.rs index 81e1cc9..085f9db 100644 --- a/common/src/user.rs +++ b/common/src/user.rs @@ -99,7 +99,7 @@ impl UserPermission { | ManageSelf | FederatedContent | StreamFormat( - JhlsIndex | Jvtt | HlsMaster | HlsVariant | Matroska | Snippet | Webvtt + JhlsIndex | Jvtt | HlsMaster | HlsVariant | Matroska | Fragment | Webvtt ) ) } diff --git a/remuxer/src/fragment.rs b/remuxer/src/fragment.rs new file mode 100644 index 0000000..660e2d9 --- /dev/null +++ b/remuxer/src/fragment.rs @@ -0,0 +1,212 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2024 metamuffin +*/ + +use crate::{ + ebml_header, ebml_segment_info, ebml_track_entry, seek_index::get_seek_index, + segment_extractor::SegmentExtractIter, +}; +use anyhow::{anyhow, Context, Result}; +use jellycommon::{LocalTrack, NodePublic, SourceTrackKind}; +use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; +use log::{debug, info}; +use std::{ + fs::File, + io::{BufReader, BufWriter, Write}, + ops::Range, + path::Path, +}; + +const FRAGMENT_LENGTH: f64 = 2.; + +pub fn fragment_index( + path_base: &Path, + item: &NodePublic, + local_track: &LocalTrack, + track_index: usize, +) -> Result>> { + let media_info = item.media.as_ref().unwrap(); + let source_path = path_base.join(&local_track.path); + let index = get_seek_index(&source_path)?; + let index = index + .get(&(local_track.track as u64)) + .ok_or(anyhow!("seek index track missing"))?; + + // everything is a keyframe (even though nothing is...) + let force_kf = matches!( + media_info.tracks[track_index].kind, + SourceTrackKind::Subtitles { .. } + ); + + let n_kf = if force_kf { + index.blocks.len() + } else { + index.keyframes.len() + }; + + let average_kf_interval = media_info.duration / n_kf as f64; + let kf_per_snip = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize; + debug!("average keyframe interval: {average_kf_interval}"); + debug!(" => keyframes per frag {kf_per_snip}"); + + let n_snips = n_kf.div_ceil(kf_per_snip); + Ok((0..n_snips) + .map(|i| { + let start = index.blocks[if force_kf { + i * kf_per_snip + } else { + index.keyframes[i * kf_per_snip] + }] + .pts as f64 + / 1000.; + let end = if force_kf { + let n = (i + 1) * kf_per_snip; + if n >= index.blocks.len() { + None + } else { + Some(n) + } + } else { + index.keyframes.get((i + 1) * kf_per_snip).copied() + } + .map(|i| index.blocks[i].pts as f64 / 1000.) + .unwrap_or(media_info.duration); + start..end + }) + .collect()) +} + +pub fn write_fragment_into( + writer: impl Write, + path_base: &Path, + item: &NodePublic, + local_track: &LocalTrack, + track: usize, + webm: bool, + n: usize, +) -> anyhow::Result<()> { + info!("writing fragment {n} of {:?} (track {track})", item.title); + let mut output = EbmlWriter::new(BufWriter::new(writer), 0); + let media_info = item.media.as_ref().unwrap(); + let info = media_info + .tracks + .get(track) + .ok_or(anyhow!("track not available"))? + .to_owned(); + let source_path = path_base.join(&local_track.path); + let mapped = 1; + info!( + "\t- {track} {source_path:?} ({} => {mapped})", + local_track.track + ); + info!("\t {}", info); + let file = File::open(&source_path).context("opening source file")?; + let index = get_seek_index(&source_path)?; + let index = index + .get(&(local_track.track as u64)) + .ok_or(anyhow!("track missing 2"))? + .to_owned(); + debug!("\t seek index: {} blocks loaded", index.blocks.len()); + let mut reader = EbmlReader::new(BufReader::new(file)); + + let force_kf = matches!(info.kind, SourceTrackKind::Subtitles { .. }); + let n_kf = if force_kf { + index.blocks.len() + } else { + index.keyframes.len() + }; + + let average_kf_interval = media_info.duration / n_kf as f64; + let kf_per_snip = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize; + debug!("average keyframe interval: {average_kf_interval}"); + debug!(" => keyframes per frag {kf_per_snip}"); + + let (start_block_index, end_block_index) = if force_kf { + (n * kf_per_snip, (n + 1) * kf_per_snip) + } else { + ( + *index + .keyframes + .get(n * kf_per_snip) + .ok_or(anyhow!("fragment index out of range"))?, + *index + .keyframes + .get((n + 1) * kf_per_snip) + .unwrap_or(&index.blocks.len()), + ) + }; + debug!("writing blocks {start_block_index} to {end_block_index}."); + + let start_block = &index.blocks[start_block_index]; + let last_block_pts = index + .blocks + .get(end_block_index) + .map(|b| b.pts) + .unwrap_or((media_info.duration * 1000.) as u64); + + reader.seek(start_block.source_off, MatroskaTag::Cluster(Master::Start))?; + + output.write_tag(&ebml_header(webm))?; + output.write_tag(&MatroskaTag::Segment(Master::Start))?; + output.write_tag(&ebml_segment_info( + format!( + "{} (track {track}; frag {n})", + item.title.clone().unwrap_or_default() + ), + (last_block_pts - start_block.pts) as f64 / 1000., + ))?; + output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; + output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![ + ebml_track_entry(mapped, &info, local_track.codec_private.clone()), + ])))?; + + let mut reader = SegmentExtractIter::new(&mut reader, local_track.track as u64); + + { + // TODO this one caused fragments to get dropped by MSE for no reason + // for i in start_block_index..end_block_index { + // let index_block = &index.blocks[i]; + // let mut block = reader.next()?; + + // assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); + + // block.track = 1; + // block.timestamp_off = 0; + // output.write_tag(&MatroskaTag::Cluster(Master::Collected(vec![ + // MatroskaTag::Timestamp(index_block.pts - start_block.pts), + // MatroskaTag::SimpleBlock(block.dump()), + // ])))?; + // } + } + { + let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)]; + for i in start_block_index..end_block_index { + let index_block = &index.blocks[i]; + let (mut block, duration) = reader.next()?; + + assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); + + block.track = 1; + // TODO this does generate overflows sometimes + block.timestamp_off = (index_block.pts as i64 - start_block.pts as i64) + .try_into() + .unwrap(); + if let Some(duration) = duration { + blocks.push(MatroskaTag::BlockGroup(Master::Collected(vec![ + MatroskaTag::BlockDuration(duration), + MatroskaTag::Block(block), + ]))) + } else { + blocks.push(MatroskaTag::SimpleBlock(block)) + } + } + output.write_tag(&MatroskaTag::Cluster(Master::Collected(blocks)))?; + } + + // output.write_tag(&MatroskaTag::Segment(Master::End))?; + + debug!("wrote {} bytes", output.position()); + Ok(()) +} diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index b49dedc..49af2e7 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -8,11 +8,11 @@ pub mod import; pub mod remux; pub mod seek_index; pub mod segment_extractor; -pub mod snippet; +pub mod fragment; pub mod trim_writer; pub use remux::remux_stream_into; -pub use snippet::write_snippet_into; +pub use fragment::write_fragment_into; use jellycommon::{SourceTrack, SourceTrackKind}; use jellymatroska::{Master, MatroskaTag}; diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs deleted file mode 100644 index 1378e70..0000000 --- a/remuxer/src/snippet.rs +++ /dev/null @@ -1,212 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2024 metamuffin -*/ - -use crate::{ - ebml_header, ebml_segment_info, ebml_track_entry, seek_index::get_seek_index, - segment_extractor::SegmentExtractIter, -}; -use anyhow::{anyhow, Context, Result}; -use jellycommon::{LocalTrack, NodePublic, SourceTrackKind}; -use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag}; -use log::{debug, info}; -use std::{ - fs::File, - io::{BufReader, BufWriter, Write}, - ops::Range, - path::Path, -}; - -const SNIPPET_LENGTH: f64 = 2.; - -pub fn snippet_index( - path_base: &Path, - item: &NodePublic, - local_track: &LocalTrack, - track_index: usize, -) -> Result>> { - let media_info = item.media.as_ref().unwrap(); - let source_path = path_base.join(&local_track.path); - let index = get_seek_index(&source_path)?; - let index = index - .get(&(local_track.track as u64)) - .ok_or(anyhow!("seek index track missing"))?; - - // everything is a keyframe (even though nothing is...) - let force_kf = matches!( - media_info.tracks[track_index].kind, - SourceTrackKind::Subtitles { .. } - ); - - let n_kf = if force_kf { - index.blocks.len() - } else { - index.keyframes.len() - }; - - let average_kf_interval = media_info.duration / n_kf as f64; - let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize; - debug!("average keyframe interval: {average_kf_interval}"); - debug!(" => keyframes per snippet {kf_per_snip}"); - - let n_snips = n_kf.div_ceil(kf_per_snip); - Ok((0..n_snips) - .map(|i| { - let start = index.blocks[if force_kf { - i * kf_per_snip - } else { - index.keyframes[i * kf_per_snip] - }] - .pts as f64 - / 1000.; - let end = if force_kf { - let n = (i + 1) * kf_per_snip; - if n >= index.blocks.len() { - None - } else { - Some(n) - } - } else { - index.keyframes.get((i + 1) * kf_per_snip).copied() - } - .map(|i| index.blocks[i].pts as f64 / 1000.) - .unwrap_or(media_info.duration); - start..end - }) - .collect()) -} - -pub fn write_snippet_into( - writer: impl Write, - path_base: &Path, - item: &NodePublic, - local_track: &LocalTrack, - track: usize, - webm: bool, - n: usize, -) -> anyhow::Result<()> { - info!("writing snippet {n} of {:?} (track {track})", item.title); - let mut output = EbmlWriter::new(BufWriter::new(writer), 0); - let media_info = item.media.as_ref().unwrap(); - let info = media_info - .tracks - .get(track) - .ok_or(anyhow!("track not available"))? - .to_owned(); - let source_path = path_base.join(&local_track.path); - let mapped = 1; - info!( - "\t- {track} {source_path:?} ({} => {mapped})", - local_track.track - ); - info!("\t {}", info); - let file = File::open(&source_path).context("opening source file")?; - let index = get_seek_index(&source_path)?; - let index = index - .get(&(local_track.track as u64)) - .ok_or(anyhow!("track missing 2"))? - .to_owned(); - debug!("\t seek index: {} blocks loaded", index.blocks.len()); - let mut reader = EbmlReader::new(BufReader::new(file)); - - let force_kf = matches!(info.kind, SourceTrackKind::Subtitles { .. }); - let n_kf = if force_kf { - index.blocks.len() - } else { - index.keyframes.len() - }; - - let average_kf_interval = media_info.duration / n_kf as f64; - let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize; - debug!("average keyframe interval: {average_kf_interval}"); - debug!(" => keyframes per snippet {kf_per_snip}"); - - let (start_block_index, end_block_index) = if force_kf { - (n * kf_per_snip, (n + 1) * kf_per_snip) - } else { - ( - *index - .keyframes - .get(n * kf_per_snip) - .ok_or(anyhow!("snippet index out of range"))?, - *index - .keyframes - .get((n + 1) * kf_per_snip) - .unwrap_or(&index.blocks.len()), - ) - }; - debug!("writing blocks {start_block_index} to {end_block_index}."); - - let start_block = &index.blocks[start_block_index]; - let last_block_pts = index - .blocks - .get(end_block_index) - .map(|b| b.pts) - .unwrap_or((media_info.duration * 1000.) as u64); - - reader.seek(start_block.source_off, MatroskaTag::Cluster(Master::Start))?; - - output.write_tag(&ebml_header(webm))?; - output.write_tag(&MatroskaTag::Segment(Master::Start))?; - output.write_tag(&ebml_segment_info( - format!( - "{} (track {track}; snippet {n})", - item.title.clone().unwrap_or_default() - ), - (last_block_pts - start_block.pts) as f64 / 1000., - ))?; - output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; - output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![ - ebml_track_entry(mapped, &info, local_track.codec_private.clone()), - ])))?; - - let mut reader = SegmentExtractIter::new(&mut reader, local_track.track as u64); - - { - // TODO this one caused snippets to get dropped by MSE for no reason - // for i in start_block_index..end_block_index { - // let index_block = &index.blocks[i]; - // let mut block = reader.next()?; - - // assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); - - // block.track = 1; - // block.timestamp_off = 0; - // output.write_tag(&MatroskaTag::Cluster(Master::Collected(vec![ - // MatroskaTag::Timestamp(index_block.pts - start_block.pts), - // MatroskaTag::SimpleBlock(block.dump()), - // ])))?; - // } - } - { - let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)]; - for i in start_block_index..end_block_index { - let index_block = &index.blocks[i]; - let (mut block, duration) = reader.next()?; - - assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); - - block.track = 1; - // TODO this does generate overflows sometimes - block.timestamp_off = (index_block.pts as i64 - start_block.pts as i64) - .try_into() - .unwrap(); - if let Some(duration) = duration { - blocks.push(MatroskaTag::BlockGroup(Master::Collected(vec![ - MatroskaTag::BlockDuration(duration), - MatroskaTag::Block(block), - ]))) - } else { - blocks.push(MatroskaTag::SimpleBlock(block)) - } - } - output.write_tag(&MatroskaTag::Cluster(Master::Collected(blocks)))?; - } - - // output.write_tag(&MatroskaTag::Segment(Master::End))?; - - debug!("wrote {} bytes", output.position()); - Ok(()) -} diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs new file mode 100644 index 0000000..f08114c --- /dev/null +++ b/stream/src/fragment.rs @@ -0,0 +1,84 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2024 metamuffin +*/ +use anyhow::{anyhow, bail, Result}; +use jellybase::{permission::PermissionSetExt, CONF}; +use jellycommon::{ + stream::StreamSpec, + user::{PermissionSet, UserPermission}, + LocalTrack, Node, +}; +use jellytranscoder::fragment::transcode; +use log::warn; +use tokio::{fs::File, io::DuplexStream}; +use tokio_util::io::SyncIoBridge; + +pub async fn fragment_stream( + node: Node, + local_tracks: Vec, + spec: StreamSpec, + mut b: DuplexStream, + perms: &PermissionSet, +) -> Result<()> { + if spec.tracks.len() != 1 { + bail!("unsupported number of tracks for segment, must be exactly one"); + } + let track = spec.tracks[0]; + let n = spec.index.ok_or(anyhow!("segment index missing"))?; + + let local_track = local_tracks + .get(0) + .ok_or(anyhow!("track missing"))? + .to_owned(); + + if let Some(profile) = spec.profile { + perms.assert(&UserPermission::Transcode)?; + let location = transcode( + &format!("{track} {n} {:?}", node.private.source), // TODO maybe not use the entire source + CONF.transcoding_profiles + .get(profile) + .ok_or(anyhow!("profile out of range"))?, + move |b| { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + SyncIoBridge::new(b), + &CONF.media_path, + &node.public, + &local_track, + track, + false, + n, + ) { + warn!("segment stream error: {err}"); + } + }); + }, + ) + .await?; + let mut output = File::open(location.abs()).await?; + tokio::task::spawn(async move { + if let Err(err) = tokio::io::copy(&mut output, &mut b).await { + warn!("cannot write stream: {err}") + } + }); + } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_fragment_into( + b, + &CONF.media_path, + &node.public, + &local_track, + track, + spec.webm.unwrap_or(false), + n, + ) { + warn!("segment stream error: {err}"); + } + }); + } + + Ok(()) +} diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 310c6b1..7f69d2c 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -53,8 +53,8 @@ pub async fn hls_variant_stream( let local_track = local_tracks.get(0).ok_or(anyhow!("no track"))?.to_owned(); let track_index = spec.tracks[0]; let media_info = node.public.media.to_owned().ok_or(anyhow!("no media?"))?; - let snips = spawn_blocking(move || { - jellyremuxer::snippet::snippet_index( + let frags = spawn_blocking(move || { + jellyremuxer::fragment::fragment_index( &CONF.media_path, &node.public, &local_track, @@ -70,8 +70,8 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; - spec.format = StreamFormat::Snippet; - for (i, Range { start, end }) in snips.iter().enumerate() { + spec.format = StreamFormat::Fragment; + for (i, Range { start, end }) in frags.iter().enumerate() { writeln!(out, "#EXTINF:{:},", end - start)?; spec.index = Some(i); writeln!(out, "stream?{}", spec.to_query())?; diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs index e26df1b..1313c7a 100644 --- a/stream/src/jhls.rs +++ b/stream/src/jhls.rs @@ -26,7 +26,7 @@ pub async fn jhls_index( .to_owned(); let segments = tokio::task::spawn_blocking(move || { - jellyremuxer::snippet::snippet_index( + jellyremuxer::fragment::fragment_index( &CONF.media_path, &node.public, &local_track, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 906e638..4d96f8c 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -6,7 +6,7 @@ #![feature(iterator_try_collect)] pub mod hls; pub mod jhls; -pub mod segment; +pub mod fragment; pub mod webvtt; use anyhow::{anyhow, bail, Context, Result}; @@ -18,7 +18,7 @@ use jellycommon::{ LocalTrack, Node, TrackSource, }; use jhls::jhls_index; -use segment::segment_stream; +use fragment::fragment_stream; use std::{io::SeekFrom, ops::Range}; use tokio::{ fs::File, @@ -41,7 +41,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false }, StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false }, StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false }, - StreamFormat::Snippet => StreamHead { content_type: webm_or_mkv, range_supported: false }, + StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false }, StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false }, } } @@ -87,7 +87,7 @@ pub async fn stream( StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?, StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?, StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?, - StreamFormat::Snippet => segment_stream(node, local_tracks, spec, b, perms).await?, + StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?, StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?, StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?, } diff --git a/stream/src/segment.rs b/stream/src/segment.rs deleted file mode 100644 index a5162cd..0000000 --- a/stream/src/segment.rs +++ /dev/null @@ -1,84 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2024 metamuffin -*/ -use anyhow::{anyhow, bail, Result}; -use jellybase::{permission::PermissionSetExt, CONF}; -use jellycommon::{ - stream::StreamSpec, - user::{PermissionSet, UserPermission}, - LocalTrack, Node, -}; -use jellytranscoder::snippet::transcode; -use log::warn; -use tokio::{fs::File, io::DuplexStream}; -use tokio_util::io::SyncIoBridge; - -pub async fn segment_stream( - node: Node, - local_tracks: Vec, - spec: StreamSpec, - mut b: DuplexStream, - perms: &PermissionSet, -) -> Result<()> { - if spec.tracks.len() != 1 { - bail!("unsupported number of tracks for segment, must be exactly one"); - } - let track = spec.tracks[0]; - let n = spec.index.ok_or(anyhow!("segment index missing"))?; - - let local_track = local_tracks - .get(0) - .ok_or(anyhow!("track missing"))? - .to_owned(); - - if let Some(profile) = spec.profile { - perms.assert(&UserPermission::Transcode)?; - let location = transcode( - &format!("{track} {n} {:?}", node.private.source), // TODO maybe not use the entire source - CONF.transcoding_profiles - .get(profile) - .ok_or(anyhow!("profile out of range"))?, - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_snippet_into( - SyncIoBridge::new(b), - &CONF.media_path, - &node.public, - &local_track, - track, - false, - n, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - let mut output = File::open(location.abs()).await?; - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut output, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } else { - let b = SyncIoBridge::new(b); - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_snippet_into( - b, - &CONF.media_path, - &node.public, - &local_track, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); - } - }); - } - - Ok(()) -} diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs index 3c9ec41..ec26398 100644 --- a/stream/src/webvtt.rs +++ b/stream/src/webvtt.rs @@ -19,7 +19,7 @@ pub async fn vtt_stream( ) -> Result<()> { // TODO cache - // TODO should use snippets too? big films take too long... + // TODO should use fragments too? big films take too long... let tracki = *spec.tracks.get(0).ok_or(anyhow!("no track selected"))?; let local_track = local_tracks.get(0).ok_or(anyhow!("no tracks"))?.clone(); diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs new file mode 100644 index 0000000..90512c4 --- /dev/null +++ b/transcoder/src/fragment.rs @@ -0,0 +1,102 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2024 metamuffin +*/ + +use crate::LOCAL_VIDEO_TRANSCODING_TASKS; +use jellybase::cache::{async_cache_file, CachePath}; +use jellycommon::jhls::EncodingProfile; +use log::{debug, info}; +use std::process::Stdio; +use tokio::{ + io::copy, + process::{ChildStdin, Command}, +}; + +// TODO odd video resolutions can cause errors when transcoding to YUV42{0,2} +// TODO with an implementation that cant handle it (SVT-AV1 such an impl). + +pub async fn transcode( + key: &str, + enc: &EncodingProfile, + input: impl FnOnce(ChildStdin), +) -> anyhow::Result { + Ok(async_cache_file( + &["snip-tc", key, &format!("{enc:?}")], + move |mut output| async move { + let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; + debug!("transcoding snippet with {enc:?}"); + + let mut args = Vec::new(); + match enc { + EncodingProfile::Video { + codec, + preset, + bitrate, + width, + } => { + if let Some(width) = width { + args.push("-vf".to_string()); + args.push(format!("scale={width}:-1")); + } + args.push("-c:v".to_string()); + args.push(codec.to_string()); + if let Some(preset) = preset { + args.push("-preset".to_string()); + args.push(format!("{preset}")); + } + args.push("-b:v".to_string()); + args.push(format!("{bitrate}")); + } + EncodingProfile::Audio { + codec, + bitrate, + sample_rate, + channels, + } => { + if let Some(channels) = channels { + args.push("-ac".to_string()); + args.push(format!("{channels}")) + } + if let Some(sample_rate) = sample_rate { + args.push("-ar".to_string()); + args.push(format!("{sample_rate}")) + } + args.push("-c:a".to_string()); + args.push(codec.to_string()); + args.push("-b:a".to_string()); + args.push(format!("{bitrate}")); + } + EncodingProfile::Subtitles { codec } => { + args.push("-c:s".to_string()); + args.push(codec.to_string()); + } + }; + info!("encoding with {:?}", args.join(" ")); + + let mut proc = Command::new("ffmpeg") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .args(&["-f", "matroska", "-i", "pipe:0"]) + .args(args) + .args(&["-f", "webm", "pipe:1"]) + .spawn()?; + // let mut proc = Command::new("cat") + // .stdin(Stdio::piped()) + // .stdout(Stdio::piped()) + // .spawn()?; + + let stdin = proc.stdin.take().unwrap(); + let mut stdout = proc.stdout.take().unwrap(); + + input(stdin); + copy(&mut stdout, &mut output).await?; + + proc.wait().await.unwrap().exit_ok()?; + info!("done"); + Ok(()) + }, + ) + .await?) +} diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs index 35b5605..eb56229 100644 --- a/transcoder/src/lib.rs +++ b/transcoder/src/lib.rs @@ -8,7 +8,7 @@ use tokio::sync::Semaphore; pub mod image; -pub mod snippet; +pub mod fragment; pub mod subtitles; pub mod thumbnail; diff --git a/transcoder/src/snippet.rs b/transcoder/src/snippet.rs deleted file mode 100644 index 90512c4..0000000 --- a/transcoder/src/snippet.rs +++ /dev/null @@ -1,102 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2024 metamuffin -*/ - -use crate::LOCAL_VIDEO_TRANSCODING_TASKS; -use jellybase::cache::{async_cache_file, CachePath}; -use jellycommon::jhls::EncodingProfile; -use log::{debug, info}; -use std::process::Stdio; -use tokio::{ - io::copy, - process::{ChildStdin, Command}, -}; - -// TODO odd video resolutions can cause errors when transcoding to YUV42{0,2} -// TODO with an implementation that cant handle it (SVT-AV1 such an impl). - -pub async fn transcode( - key: &str, - enc: &EncodingProfile, - input: impl FnOnce(ChildStdin), -) -> anyhow::Result { - Ok(async_cache_file( - &["snip-tc", key, &format!("{enc:?}")], - move |mut output| async move { - let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; - debug!("transcoding snippet with {enc:?}"); - - let mut args = Vec::new(); - match enc { - EncodingProfile::Video { - codec, - preset, - bitrate, - width, - } => { - if let Some(width) = width { - args.push("-vf".to_string()); - args.push(format!("scale={width}:-1")); - } - args.push("-c:v".to_string()); - args.push(codec.to_string()); - if let Some(preset) = preset { - args.push("-preset".to_string()); - args.push(format!("{preset}")); - } - args.push("-b:v".to_string()); - args.push(format!("{bitrate}")); - } - EncodingProfile::Audio { - codec, - bitrate, - sample_rate, - channels, - } => { - if let Some(channels) = channels { - args.push("-ac".to_string()); - args.push(format!("{channels}")) - } - if let Some(sample_rate) = sample_rate { - args.push("-ar".to_string()); - args.push(format!("{sample_rate}")) - } - args.push("-c:a".to_string()); - args.push(codec.to_string()); - args.push("-b:a".to_string()); - args.push(format!("{bitrate}")); - } - EncodingProfile::Subtitles { codec } => { - args.push("-c:s".to_string()); - args.push(codec.to_string()); - } - }; - info!("encoding with {:?}", args.join(" ")); - - let mut proc = Command::new("ffmpeg") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .args(&["-f", "matroska", "-i", "pipe:0"]) - .args(args) - .args(&["-f", "webm", "pipe:1"]) - .spawn()?; - // let mut proc = Command::new("cat") - // .stdin(Stdio::piped()) - // .stdout(Stdio::piped()) - // .spawn()?; - - let stdin = proc.stdin.take().unwrap(); - let mut stdout = proc.stdout.take().unwrap(); - - input(stdin); - copy(&mut stdout, &mut output).await?; - - proc.wait().await.unwrap().exit_ok()?; - info!("done"); - Ok(()) - }, - ) - .await?) -} diff --git a/web/script/player/track/mse.ts b/web/script/player/track/mse.ts index c916204..2949890 100644 --- a/web/script/player/track/mse.ts +++ b/web/script/player/track/mse.ts @@ -122,7 +122,7 @@ export class MSEPlayerTrack extends PlayerTrack { async load(index: number) { this.loading.add(index); await this.profile_selector.select_optimal_profile(this.track_index, this.profile); - const url = `/n/${encodeURIComponent(this.node_id)}/stream?format=snippet&webm=true&tracks=${this.track_index}&index=${index}${this.profile.value ? `&profile=${this.profile.value.id}` : ""}`; + const url = `/n/${encodeURIComponent(this.node_id)}/stream?format=frag&webm=true&tracks=${this.track_index}&index=${index}${this.profile.value ? `&profile=${this.profile.value.id}` : ""}`; const buf = await this.player.downloader.download(url); await new Promise(cb => { if (!this.index) return; -- cgit v1.2.3-70-g09d2