diff options
author | metamuffin <metamuffin@disroot.org> | 2023-09-30 15:21:57 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-09-30 15:21:57 +0200 |
commit | 30e3d18c6ec50572365baaaaa3542769e82e763a (patch) | |
tree | 3eade459fe488729bbe61dd85ac49948d5e24ef7 | |
parent | d0d8316a015fa0434c2871541b83ea0aca781a99 (diff) | |
download | jellything-30e3d18c6ec50572365baaaaa3542769e82e763a.tar jellything-30e3d18c6ec50572365baaaaa3542769e82e763a.tar.bz2 jellything-30e3d18c6ec50572365baaaaa3542769e82e763a.tar.zst |
move some files around for new remuxer + small changes
-rw-r--r-- | Cargo.lock | 36 | ||||
-rw-r--r-- | common/src/helpers.rs | 5 | ||||
-rw-r--r-- | common/src/lib.rs | 14 | ||||
-rw-r--r-- | common/src/stream.rs | 12 | ||||
-rw-r--r-- | import/Cargo.toml | 2 | ||||
-rw-r--r-- | matroska/Cargo.toml | 2 | ||||
-rw-r--r-- | remuxer/src/import/mod.rs | 2 | ||||
-rw-r--r-- | remuxer/src/lib.rs | 379 | ||||
-rw-r--r-- | remuxer/src/remux.rs | 381 | ||||
-rw-r--r-- | remuxer/src/seek_index.rs (renamed from remuxer/src/import/seek_index.rs) | 45 | ||||
-rw-r--r-- | remuxer/src/segment_extractor.rs | 5 | ||||
-rw-r--r-- | remuxer/src/snippet.rs | 5 | ||||
-rw-r--r-- | remuxer/src/trim_writer.rs | 5 | ||||
-rw-r--r-- | server/Cargo.toml | 2 | ||||
-rw-r--r-- | stream/src/hls.rs | 5 | ||||
-rw-r--r-- | stream/src/lib.rs | 8 | ||||
-rw-r--r-- | stream/src/segment.rs | 18 |
17 files changed, 509 insertions, 417 deletions
@@ -104,9 +104,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -142,9 +142,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -607,9 +607,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.4" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -617,9 +617,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.4" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -2828,9 +2828,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -3032,18 +3032,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", @@ -3309,9 +3309,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ubyte" @@ -3437,9 +3437,9 @@ checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314" [[package]] name = "vte" -version = "0.11.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5022b5fbf9407086c180e9557be968742d839e68346af7792b8592489732197" +checksum = "401dc1020e10f74d38616c1f1ab92ccd85dc902705a29d0730e0fbea8534f91a" dependencies = [ "arrayvec", "utf8parse", diff --git a/common/src/helpers.rs b/common/src/helpers.rs index 5150667..86072cc 100644 --- a/common/src/helpers.rs +++ b/common/src/helpers.rs @@ -1,3 +1,8 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ use std::ops::Deref; #[derive(PartialEq, PartialOrd)] diff --git a/common/src/lib.rs b/common/src/lib.rs index b7f975a..d57d2c0 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -140,9 +140,13 @@ pub enum SourceTrackKind { Subtitles, } +pub const SEEK_INDEX_VERSION: u32 = 0x5eef1de4; + #[derive(Debug, Clone, Decode, Encode)] pub struct SeekIndex { + pub version: u32, pub blocks: Vec<BlockIndex>, + pub keyframes: Vec<usize>, } #[derive(Debug, Clone, Decode, Encode)] @@ -151,3 +155,13 @@ pub struct BlockIndex { pub source_off: usize, pub size: usize, } + +impl Default for SeekIndex { + fn default() -> Self { + Self { + version: SEEK_INDEX_VERSION, + blocks: Vec::new(), + keyframes: Vec::new(), + } + } +} diff --git a/common/src/stream.rs b/common/src/stream.rs index ca09999..af19062 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -1,3 +1,8 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ #[cfg(feature = "rocket")] use rocket::{FromForm, FromFormField, UriDisplayQuery}; use serde::{Deserialize, Serialize}; @@ -10,6 +15,7 @@ pub struct StreamSpec { pub webm: Option<bool>, pub abr: Option<usize>, pub vbr: Option<usize>, + pub width: Option<usize>, pub index: Option<usize>, } @@ -31,6 +37,7 @@ impl Default for StreamSpec { tracks: Vec::new(), format: StreamFormat::Matroska, webm: Some(true), + width: None, abr: None, vbr: None, index: None, @@ -66,7 +73,10 @@ impl StreamSpec { writeln!(u, "&index={index}").unwrap(); } if let Some(webm) = self.webm { - writeln!(u, "&webmm={webm}").unwrap(); + writeln!(u, "&webm={webm}").unwrap(); + } + if let Some(width) = self.width { + writeln!(u, "&width={width}").unwrap(); } u } diff --git a/import/Cargo.toml b/import/Cargo.toml index bdb6813..89d3ea9 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -11,7 +11,7 @@ jellyremuxer = { path = "../remuxer" } log = { workspace = true } env_logger = "0.10.0" anyhow = "1.0.75" -clap = { version = "4.4.4", features = ["derive"] } +clap = { version = "4.4.6", features = ["derive"] } reqwest = { version = "0.11.20", features = ["blocking", "json"] } serde = { version = "1.0.188", features = ["derive"] } diff --git a/matroska/Cargo.toml b/matroska/Cargo.toml index de8389c..c8c22ec 100644 --- a/matroska/Cargo.toml +++ b/matroska/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" ebml_derive = { path = "../ebml_derive" } log = "0.4.20" env_logger = "0.10.0" -thiserror = "1.0.48" +thiserror = "1.0.49" diff --git a/remuxer/src/import/mod.rs b/remuxer/src/import/mod.rs index 5f76623..0ce8b47 100644 --- a/remuxer/src/import/mod.rs +++ b/remuxer/src/import/mod.rs @@ -3,8 +3,6 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ -pub mod seek_index; - use anyhow::{anyhow, bail, Context, Result}; use jellycommon::{LocalTrack, SourceTrack, SourceTrackKind}; use jellymatroska::{ diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index a9702fb..9a5e6c6 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -4,381 +4,10 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ pub mod import; +pub mod remux; pub mod segment_extractor; pub mod trim_writer; +pub mod snippet; +pub mod seek_index; -use crate::{segment_extractor::SegmentExtractIter, trim_writer::TrimWriter}; -use anyhow::{anyhow, Context}; -use jellycommon::{BlockIndex, LocalTrack, NodePublic, SeekIndex, SourceTrack, SourceTrackKind}; -use jellymatroska::{ - block::Block, - read::EbmlReader, - write::{bad_vint_length, vint_length, EbmlWriter}, - Master, MatroskaTag, -}; -use log::{debug, info, trace, warn}; -use std::{ - fs::File, - io::{Seek, SeekFrom, Write}, - ops::Range, - path::PathBuf, - time::Instant, -}; - -pub fn remux_stream_into( - writer: impl Write + 'static, - range: Range<usize>, - path_base: PathBuf, - item: NodePublic, - track_sources: Vec<LocalTrack>, - selection: Vec<usize>, - webm: bool, -) -> anyhow::Result<()> { - info!("remuxing {:?} to have tracks {selection:?}", item.title); - let writer = TrimWriter::new(writer, range.clone()); - let mut output = EbmlWriter::new(writer, 0); - - struct ReaderC { - info: SourceTrack, - reader: EbmlReader, - mapped: u64, - index: SeekIndex, - source_track_index: usize, - codec_private: Option<Vec<u8>>, - layouting_progress_index: usize, - } - - let timing_cp = Instant::now(); - - let mut inputs = selection - .iter() - .enumerate() - .map(|(index, sel)| { - let info = item - .media - .as_ref() - .unwrap() - .tracks - .get(*sel) - .ok_or(anyhow!("track not available"))? - .to_owned(); - let private = &track_sources[*sel]; - let source_path = path_base.join(&private.path); - let mapped = index as u64 + 1; - info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); - info!("\t {}", info); - let file = File::open(&source_path).context("opening source file")?; - let mut index = File::open(source_path.with_extension(format!("si.{}", private.track))) - .context("opening seek index file")?; - let index = bincode::decode_from_std_read::<SeekIndex, _, _>( - &mut index, - bincode::config::standard(), - )?; - debug!("\t seek index: {} blocks loaded", index.blocks.len()); - let reader = EbmlReader::new(file); - Ok(ReaderC { - index, - reader, - info, - mapped, - source_track_index: private.track, - codec_private: private.codec_private.clone(), - layouting_progress_index: 0, - }) - }) - .collect::<anyhow::Result<Vec<_>>>()?; - - info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); - let timing_cp = Instant::now(); - - output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![ - MatroskaTag::EbmlVersion(1), - MatroskaTag::EbmlReadVersion(1), - MatroskaTag::EbmlMaxIdLength(4), - MatroskaTag::EbmlMaxSizeLength(8), - MatroskaTag::DocType(if webm { - "webm".to_string() - } else { - "matroska".to_string() - }), - MatroskaTag::DocTypeVersion(4), - MatroskaTag::DocTypeReadVersion(2), - ])))?; - - output.write_tag(&MatroskaTag::Segment(Master::Start))?; - let segment_offset = output.position(); - - output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ - MatroskaTag::TimestampScale(1_000_000), - MatroskaTag::Duration(item.media.unwrap().duration * 1000.0), - MatroskaTag::Title(item.title.clone()), - MatroskaTag::MuxingApp("jellyremux".to_string()), - MatroskaTag::WritingApp("jellything".to_string()), - ])))?; - output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; - - let tracks_header = inputs - .iter_mut() - .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take())) - .collect(); - output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; - - struct ClusterLayout { - position: usize, - timestamp: u64, - source_offsets: Vec<Option<usize>>, - blocks: Vec<(usize, BlockIndex)>, - } - - let mut segment_layout: Vec<ClusterLayout> = { - let mut cluster_pts = 0; - let mut clusters = vec![]; - let mut cluster = vec![]; - let mut source_offsets = vec![None; inputs.len()]; - let mut gp = 0usize; // cluster position (in the segment) - let mut p = 0usize; // block position (in the cluster) - loop { - let (track, block) = { - let mut best_block = BlockIndex { - pts: u64::MAX, - size: 0, - source_off: 0, - }; - let mut best_track = 0; - for (i, r) in inputs.iter().enumerate() { - if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { - if v.pts < best_block.pts { - best_block = v.to_owned(); - best_track = i; - } - }; - } - (best_track, best_block) - }; - inputs[track].layouting_progress_index += 1; - source_offsets[track].get_or_insert(block.source_off); - if block.pts > cluster_pts + 1_000 { - let cluster_content_size = 1 + 1 // timestamp {tag, size} - + bad_vint_length(cluster_pts) // timestamp tag value - + p; - let cluster_size = 4 // tag length - + vint_length(cluster_content_size as u64) // size varint - + cluster_content_size; - clusters.push(ClusterLayout { - position: gp, // relative to the first cluster - timestamp: cluster_pts, - source_offsets, - blocks: std::mem::take(&mut cluster), - }); - - cluster_pts = block.pts; - source_offsets = vec![None; inputs.len()]; - gp += cluster_size; - p = 0; - } - if block.pts == u64::MAX { - break; - } - - let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} - // TODO does not work, if more than 127 tracks are present - + block.size; // block payload - p += 1; // simpleblock tag - p += vint_length(simpleblock_size as u64); // simpleblock size vint - p += simpleblock_size; - - cluster.push((track, block)) - } - info!("segment layout computed ({} clusters)", clusters.len()); - clusters - }; - info!( - "(perf) compute segment layout: {:?}", - Instant::now() - timing_cp - ); - let timing_cp = Instant::now(); - - let max_cue_size = 4 // cues id - + 8 // cues len - + ( // cues content - 1 // cp id - + 1 // cp len - + ( // cp content - 1 // ctime id, - + 1 // ctime len - + 8 // ctime content uint - + ( // ctps - 1 // ctp id - + 8 // ctp len - + (// ctp content - 1 // ctrack id - + 1 // ctrack size - + 1 // ctrack content int - // TODO break if inputs.len() >= 127 - + 1 // ccp id - + 1 // ccp len - + 8 // ccp content offset - ) - ) - ) * inputs.len() - ) * segment_layout.len() - + 1 // void id - + 8; // void len - - let first_cluster_offset_predict = max_cue_size + output.position(); - - // make the cluster position relative to the segment start as they should - segment_layout - .iter_mut() - .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); - - output.write_tag(&MatroskaTag::Cues(Master::Collected( - segment_layout - .iter() - .map(|cluster| { - MatroskaTag::CuePoint(Master::Collected( - Some(MatroskaTag::CueTime(cluster.timestamp)) - .into_iter() - // TODO: Subtitles should not have cues for every cluster - .chain(inputs.iter().map(|i| { - MatroskaTag::CueTrackPositions(Master::Collected(vec![ - MatroskaTag::CueTrack(i.mapped), - MatroskaTag::CueClusterPosition(cluster.position as u64), - ])) - })) - .collect(), - )) - }) - .collect(), - )))?; - output.write_padding(first_cluster_offset_predict)?; - let first_cluster_offset = output.position(); - assert_eq!(first_cluster_offset, first_cluster_offset_predict); - - let mut skip = 0; - for (i, cluster) in segment_layout.iter().enumerate() { - if (cluster.position + segment_offset) >= range.start { - break; - } - skip = i; - } - - if skip != 0 { - info!("skipping {skip} clusters"); - output.seek(SeekFrom::Start( - (segment_layout[skip].position + segment_offset) as u64, - ))?; - } - - struct ReaderD<'a> { - peek: Option<Block>, - stream: SegmentExtractIter<'a>, - mapped: u64, - } - - let mut track_readers = inputs - .iter_mut() - .enumerate() - .map(|(i, inp)| { - inp.reader - .seek( - segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole" - MatroskaTag::Cluster(Master::Start), - ) - .context("seeking in input")?; - let mut stream = - SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); - - Ok(ReaderD { - mapped: inp.mapped, - peek: Some(stream.next()?), - stream, - }) - }) - .collect::<anyhow::Result<Vec<_>>>()?; - - info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); - - for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { - debug!( - "writing cluster {cluster_index} (pts_base={}) with {} blocks", - cluster.timestamp, - cluster.blocks.len() - ); - { - let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64; - if cue_error != 0 { - warn!("calculation was {} bytes off", cue_error); - } - } - - let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; - for (block_track, index_block) in cluster.blocks { - let track_reader = &mut track_readers[block_track]; - let mut block = track_reader - .peek - .replace(track_reader.stream.next()?) - .expect("source file too short"); - - assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); - - block.track = track_reader.mapped; - block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); - trace!("n={} tso={}", block.track, block.timestamp_off); - let buf = block.dump(); - cluster_blocks.push(MatroskaTag::SimpleBlock(buf)) - } - output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; - } - output.write_tag(&MatroskaTag::Segment(Master::End))?; - Ok(()) -} - -pub fn track_to_ebml( - number: u64, - track: &SourceTrack, - codec_private: Option<Vec<u8>>, -) -> MatroskaTag { - let mut els = vec![ - MatroskaTag::TrackNumber(number), - MatroskaTag::TrackUID(number), - MatroskaTag::FlagLacing(0), - MatroskaTag::Language(track.language.clone()), - MatroskaTag::CodecID(track.codec.clone()), - ]; - if let Some(d) = &track.default_duration { - els.push(MatroskaTag::DefaultDuration(*d)); - } - match track.kind { - SourceTrackKind::Video { - width, - height, - fps: _, - } => { - els.push(MatroskaTag::TrackType(1)); - els.push(MatroskaTag::Video(Master::Collected(vec![ - MatroskaTag::PixelWidth(width), - MatroskaTag::PixelHeight(height), - ]))) - } - SourceTrackKind::Audio { - channels, - sample_rate, - bit_depth, - } => { - els.push(MatroskaTag::TrackType(2)); - els.push(MatroskaTag::Audio(Master::Collected(vec![ - MatroskaTag::SamplingFrequency(sample_rate), - MatroskaTag::Channels(channels.try_into().unwrap()), - ]))); - els.push(MatroskaTag::BitDepth(bit_depth.try_into().unwrap())); - } - SourceTrackKind::Subtitles => { - els.push(MatroskaTag::TrackType(19)); - } - } - if let Some(d) = &codec_private { - els.push(MatroskaTag::CodecPrivate(d.clone())); - } - MatroskaTag::TrackEntry(Master::Collected(els)) -} +pub use remux::remux_stream_into; diff --git a/remuxer/src/remux.rs b/remuxer/src/remux.rs new file mode 100644 index 0000000..8807a38 --- /dev/null +++ b/remuxer/src/remux.rs @@ -0,0 +1,381 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ +use crate::{segment_extractor::SegmentExtractIter, trim_writer::TrimWriter}; +use anyhow::{anyhow, Context}; +use jellycommon::{BlockIndex, LocalTrack, NodePublic, SeekIndex, SourceTrack, SourceTrackKind}; +use jellymatroska::{ + block::Block, + read::EbmlReader, + write::{bad_vint_length, vint_length, EbmlWriter}, + Master, MatroskaTag, +}; +use log::{debug, info, trace, warn}; +use std::{ + fs::File, + io::{Seek, SeekFrom, Write}, + ops::Range, + path::PathBuf, + time::Instant, +}; + +pub fn remux_stream_into( + writer: impl Write, + range: Range<usize>, + path_base: PathBuf, + item: NodePublic, + track_sources: Vec<LocalTrack>, + selection: Vec<usize>, + webm: bool, +) -> anyhow::Result<()> { + info!("remuxing {:?} to have tracks {selection:?}", item.title); + let writer = TrimWriter::new(writer, range.clone()); + let mut output = EbmlWriter::new(writer, 0); + + struct ReaderC { + info: SourceTrack, + reader: EbmlReader, + mapped: u64, + index: SeekIndex, + source_track_index: usize, + codec_private: Option<Vec<u8>>, + layouting_progress_index: usize, + } + + let timing_cp = Instant::now(); + + let mut inputs = selection + .iter() + .enumerate() + .map(|(index, sel)| { + let info = item + .media + .as_ref() + .unwrap() + .tracks + .get(*sel) + .ok_or(anyhow!("track not available"))? + .to_owned(); + let private = &track_sources[*sel]; + let source_path = path_base.join(&private.path); + let mapped = index as u64 + 1; + info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); + info!("\t {}", info); + let file = File::open(&source_path).context("opening source file")?; + let mut index = File::open(source_path.with_extension(format!("si.{}", private.track))) + .context("opening seek index file")?; + let index = bincode::decode_from_std_read::<SeekIndex, _, _>( + &mut index, + bincode::config::standard(), + )?; + debug!("\t seek index: {} blocks loaded", index.blocks.len()); + let reader = EbmlReader::new(file); + Ok(ReaderC { + index, + reader, + info, + mapped, + source_track_index: private.track, + codec_private: private.codec_private.clone(), + layouting_progress_index: 0, + }) + }) + .collect::<anyhow::Result<Vec<_>>>()?; + + info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); + let timing_cp = Instant::now(); + + output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![ + MatroskaTag::EbmlVersion(1), + MatroskaTag::EbmlReadVersion(1), + MatroskaTag::EbmlMaxIdLength(4), + MatroskaTag::EbmlMaxSizeLength(8), + MatroskaTag::DocType(if webm { + "webm".to_string() + } else { + "matroska".to_string() + }), + MatroskaTag::DocTypeVersion(4), + MatroskaTag::DocTypeReadVersion(2), + ])))?; + + output.write_tag(&MatroskaTag::Segment(Master::Start))?; + let segment_offset = output.position(); + + output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ + MatroskaTag::TimestampScale(1_000_000), + MatroskaTag::Duration(item.media.unwrap().duration * 1000.0), + MatroskaTag::Title(item.title.clone()), + MatroskaTag::MuxingApp("jellyremux".to_string()), + MatroskaTag::WritingApp("jellything".to_string()), + ])))?; + output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; + + let tracks_header = inputs + .iter_mut() + .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take())) + .collect(); + output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; + + struct ClusterLayout { + position: usize, + timestamp: u64, + source_offsets: Vec<Option<usize>>, + blocks: Vec<(usize, BlockIndex)>, + } + + let mut segment_layout: Vec<ClusterLayout> = { + let mut cluster_pts = 0; + let mut clusters = vec![]; + let mut cluster = vec![]; + let mut source_offsets = vec![None; inputs.len()]; + let mut gp = 0usize; // cluster position (in the segment) + let mut p = 0usize; // block position (in the cluster) + loop { + let (track, block) = { + let mut best_block = BlockIndex { + pts: u64::MAX, + size: 0, + source_off: 0, + }; + let mut best_track = 0; + for (i, r) in inputs.iter().enumerate() { + if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { + if v.pts < best_block.pts { + best_block = v.to_owned(); + best_track = i; + } + }; + } + (best_track, best_block) + }; + inputs[track].layouting_progress_index += 1; + source_offsets[track].get_or_insert(block.source_off); + if block.pts > cluster_pts + 1_000 { + let cluster_content_size = 1 + 1 // timestamp {tag, size} + + bad_vint_length(cluster_pts) // timestamp tag value + + p; + let cluster_size = 4 // tag length + + vint_length(cluster_content_size as u64) // size varint + + cluster_content_size; + clusters.push(ClusterLayout { + position: gp, // relative to the first cluster + timestamp: cluster_pts, + source_offsets, + blocks: std::mem::take(&mut cluster), + }); + + cluster_pts = block.pts; + source_offsets = vec![None; inputs.len()]; + gp += cluster_size; + p = 0; + } + if block.pts == u64::MAX { + break; + } + + let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} + // TODO does not work, if more than 127 tracks are present + + block.size; // block payload + p += 1; // simpleblock tag + p += vint_length(simpleblock_size as u64); // simpleblock size vint + p += simpleblock_size; + + cluster.push((track, block)) + } + info!("segment layout computed ({} clusters)", clusters.len()); + clusters + }; + info!( + "(perf) compute segment layout: {:?}", + Instant::now() - timing_cp + ); + let timing_cp = Instant::now(); + + let max_cue_size = 4 // cues id + + 8 // cues len + + ( // cues content + 1 // cp id + + 1 // cp len + + ( // cp content + 1 // ctime id, + + 1 // ctime len + + 8 // ctime content uint + + ( // ctps + 1 // ctp id + + 8 // ctp len + + (// ctp content + 1 // ctrack id + + 1 // ctrack size + + 1 // ctrack content int + // TODO break if inputs.len() >= 127 + + 1 // ccp id + + 1 // ccp len + + 8 // ccp content offset + ) + ) + ) * inputs.len() + ) * segment_layout.len() + + 1 // void id + + 8; // void len + + let first_cluster_offset_predict = max_cue_size + output.position(); + + // make the cluster position relative to the segment start as they should + segment_layout + .iter_mut() + .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); + + output.write_tag(&MatroskaTag::Cues(Master::Collected( + segment_layout + .iter() + .map(|cluster| { + MatroskaTag::CuePoint(Master::Collected( + Some(MatroskaTag::CueTime(cluster.timestamp)) + .into_iter() + // TODO: Subtitles should not have cues for every cluster + .chain(inputs.iter().map(|i| { + MatroskaTag::CueTrackPositions(Master::Collected(vec![ + MatroskaTag::CueTrack(i.mapped), + MatroskaTag::CueClusterPosition(cluster.position as u64), + ])) + })) + .collect(), + )) + }) + .collect(), + )))?; + output.write_padding(first_cluster_offset_predict)?; + let first_cluster_offset = output.position(); + assert_eq!(first_cluster_offset, first_cluster_offset_predict); + + let mut skip = 0; + for (i, cluster) in segment_layout.iter().enumerate() { + if (cluster.position + segment_offset) >= range.start { + break; + } + skip = i; + } + + if skip != 0 { + info!("skipping {skip} clusters"); + output.seek(SeekFrom::Start( + (segment_layout[skip].position + segment_offset) as u64, + ))?; + } + + struct ReaderD<'a> { + peek: Option<Block>, + stream: SegmentExtractIter<'a>, + mapped: u64, + } + + let mut track_readers = inputs + .iter_mut() + .enumerate() + .map(|(i, inp)| { + inp.reader + .seek( + segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole" + MatroskaTag::Cluster(Master::Start), + ) + .context("seeking in input")?; + let mut stream = + SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); + + Ok(ReaderD { + mapped: inp.mapped, + peek: Some(stream.next()?), + stream, + }) + }) + .collect::<anyhow::Result<Vec<_>>>()?; + + info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); + + for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { + debug!( + "writing cluster {cluster_index} (pts_base={}) with {} blocks", + cluster.timestamp, + cluster.blocks.len() + ); + { + let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64; + if cue_error != 0 { + warn!("calculation was {} bytes off", cue_error); + } + } + + let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; + for (block_track, index_block) in cluster.blocks { + let track_reader = &mut track_readers[block_track]; + let mut block = track_reader + .peek + .replace(track_reader.stream.next()?) + .expect("source file too short"); + + assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); + + block.track = track_reader.mapped; + block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); + trace!("n={} tso={}", block.track, block.timestamp_off); + + let buf = block.dump(); + cluster_blocks.push(MatroskaTag::SimpleBlock(buf)) + } + output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; + } + output.write_tag(&MatroskaTag::Segment(Master::End))?; + Ok(()) +} + +pub fn track_to_ebml( + number: u64, + track: &SourceTrack, + codec_private: Option<Vec<u8>>, +) -> MatroskaTag { + let mut els = vec![ + MatroskaTag::TrackNumber(number), + MatroskaTag::TrackUID(number), + MatroskaTag::FlagLacing(0), + MatroskaTag::Language(track.language.clone()), + MatroskaTag::CodecID(track.codec.clone()), + ]; + if let Some(d) = &track.default_duration { + els.push(MatroskaTag::DefaultDuration(*d)); + } + match track.kind { + SourceTrackKind::Video { + width, + height, + fps: _, + } => { + els.push(MatroskaTag::TrackType(1)); + els.push(MatroskaTag::Video(Master::Collected(vec![ + MatroskaTag::PixelWidth(width), + MatroskaTag::PixelHeight(height), + ]))) + } + SourceTrackKind::Audio { + channels, + sample_rate, + bit_depth, + } => { + els.push(MatroskaTag::TrackType(2)); + els.push(MatroskaTag::Audio(Master::Collected(vec![ + MatroskaTag::SamplingFrequency(sample_rate), + MatroskaTag::Channels(channels.try_into().unwrap()), + ]))); + els.push(MatroskaTag::BitDepth(bit_depth.try_into().unwrap())); + } + SourceTrackKind::Subtitles => { + els.push(MatroskaTag::TrackType(19)); + } + } + if let Some(d) = &codec_private { + els.push(MatroskaTag::CodecPrivate(d.clone())); + } + MatroskaTag::TrackEntry(Master::Collected(els)) +} diff --git a/remuxer/src/import/seek_index.rs b/remuxer/src/seek_index.rs index ca1ca1e..7dbb9f7 100644 --- a/remuxer/src/import/seek_index.rs +++ b/remuxer/src/seek_index.rs @@ -1,3 +1,8 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ use anyhow::Result; use jellycommon::{BlockIndex, SeekIndex}; use jellymatroska::{ @@ -94,15 +99,7 @@ fn import_seek_index_segment( "block: track={} tso={}", block.track, block.timestamp_off ); - seek_index - .entry(block.track) - .or_insert(SeekIndex { blocks: vec![] }) - .blocks - .push(BlockIndex { - pts: pts + block.timestamp_off as u64, - source_off: position, - size: block.data.len(), - }); + seek_index_add(seek_index, &block, position, pts); } _ => trace!("{item:?}"), } @@ -116,15 +113,7 @@ fn import_seek_index_segment( block.timestamp_off ); trace!("{pts} {}", block.timestamp_off); - seek_index - .entry(block.track) - .or_insert(SeekIndex { blocks: vec![] }) - .blocks - .push(BlockIndex { - pts: (pts as i64 + block.timestamp_off as i64) as u64, - source_off: position, - size: block.data.len(), - }); + seek_index_add(seek_index, &block, position, pts); } _ => trace!("(rsc) tag ignored: {item:?}"), } @@ -139,3 +128,23 @@ fn import_seek_index_segment( } Ok(()) } + +fn seek_index_add( + seek_index: &mut BTreeMap<u64, SeekIndex>, + block: &Block, + position: usize, + pts_base: u64, +) { + let trs = seek_index + .entry(block.track) + .or_insert(SeekIndex::default()); + + if block.keyframe { + trs.keyframes.push(trs.blocks.len()); + } + trs.blocks.push(BlockIndex { + pts: pts_base + block.timestamp_off as u64, + source_off: position, + size: block.data.len(), + }); +} diff --git a/remuxer/src/segment_extractor.rs b/remuxer/src/segment_extractor.rs index b947c57..0ceac72 100644 --- a/remuxer/src/segment_extractor.rs +++ b/remuxer/src/segment_extractor.rs @@ -1,3 +1,8 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ use anyhow::{anyhow, bail, Result}; use jellymatroska::{block::Block, read::EbmlReader, unflatten::IterWithPos, Master, MatroskaTag}; use log::{debug, trace, warn}; diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs new file mode 100644 index 0000000..65d63ff --- /dev/null +++ b/remuxer/src/snippet.rs @@ -0,0 +1,5 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/
\ No newline at end of file diff --git a/remuxer/src/trim_writer.rs b/remuxer/src/trim_writer.rs index d278894..2a7fb84 100644 --- a/remuxer/src/trim_writer.rs +++ b/remuxer/src/trim_writer.rs @@ -1,3 +1,8 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ use std::{ io::{Seek, Write}, ops::Range, diff --git a/server/Cargo.toml b/server/Cargo.toml index 9a87f61..291f56b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,7 +20,7 @@ env_logger = "0.10.0" rand = "0.8.5" base64 = "0.21.4" chrono = { version = "0.4.31", features = ["serde"] } -vte = "0.11.1" +vte = "0.12.0" argon2 = "0.5.2" aes-gcm-siv = "0.11.1" diff --git a/stream/src/hls.rs b/stream/src/hls.rs new file mode 100644 index 0000000..6ddc2a4 --- /dev/null +++ b/stream/src/hls.rs @@ -0,0 +1,5 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 5051b18..5f9edb9 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -1,3 +1,11 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ +pub mod hls; +pub mod segment; + use anyhow::{anyhow, bail, Context, Result}; use jellybase::CONF; use jellycommon::{ diff --git a/stream/src/segment.rs b/stream/src/segment.rs new file mode 100644 index 0000000..ed4f5ef --- /dev/null +++ b/stream/src/segment.rs @@ -0,0 +1,18 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin.org> +*/ +use jellycommon::{stream::StreamSpec, LocalTrack, Node}; +use std::ops::Range; +use tokio::io::DuplexStream; + +pub async fn stream_segment( + node: Node, + track_sources: Vec<LocalTrack>, + spec: StreamSpec, + range: Range<usize>, + b: DuplexStream, +) { + +} |