diff options
-rw-r--r-- | matroska/src/write.rs | 45 | ||||
-rw-r--r-- | remuxer/src/lib.rs | 181 | ||||
-rw-r--r-- | remuxer/src/segment_extractor.rs | 107 | ||||
-rw-r--r-- | remuxer/src/trim_writer.rs | 64 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 21 |
5 files changed, 280 insertions, 138 deletions
diff --git a/matroska/src/write.rs b/matroska/src/write.rs index 6f8aad4..2555380 100644 --- a/matroska/src/write.rs +++ b/matroska/src/write.rs @@ -5,19 +5,16 @@ */ use crate::{matroska::MatroskaTag, size::EbmlSize, Master}; use anyhow::{bail, Result}; -use std::io::Write; +use std::io::{Seek, Write}; -pub struct EbmlWriter { - inner: Box<dyn Write>, +pub struct EbmlWriter<W> { + inner: W, position: usize, } -impl EbmlWriter { - pub fn new<T: Write + 'static>(inner: T, position: usize) -> Self { - Self { - inner: Box::new(inner), - position, - } +impl<W: Write> EbmlWriter<W> { + pub fn new(inner: W, position: usize) -> Self { + Self { inner, position } } pub fn write(&mut self, data: &[u8]) -> Result<()> { @@ -66,18 +63,9 @@ impl EbmlWriter { if i > (1 << 56) - 1 { bail!("vint does not fit"); } - self.write_vint_len(i, Self::vint_length(i)) - } - pub fn vint_length(v: u64) -> usize { - let mut len = 1; - while len <= 8 { - if v < (1 << ((7 * len) - 1)) { - break; - } - len += 1; - } - len + self.write_vint_len(i, vint_length(i)) } + pub fn write_vint_len(&mut self, i: u64, len: usize) -> Result<()> { let mut bytes = i.to_be_bytes(); let trunc = &mut bytes[(8 - len)..]; @@ -86,6 +74,12 @@ impl EbmlWriter { } } +impl<W: Seek> Seek for EbmlWriter<W> { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { + self.inner.seek(pos) + } +} + impl MatroskaTag { pub fn write_full(&self, w: &mut Vec<u8>) -> Result<()> { let mut buf = vec![]; @@ -97,6 +91,17 @@ impl MatroskaTag { } } +pub fn vint_length(v: u64) -> usize { + let mut len = 1; + while len <= 8 { + if v < (1 << ((7 * len) - 1)) { + break; + } + len += 1; + } + len +} + pub trait WriteValue { fn write_to(&self, w: &mut Vec<u8>) -> Result<()>; } diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index 3ce6b78..ead0051 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -4,18 +4,29 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ pub mod import; +pub mod segment_extractor; +pub mod trim_writer; -use anyhow::{anyhow, Context, Result}; +use crate::{ + segment_extractor::{AbsoluteBlock, SegmentExtractIter}, + trim_writer::TrimWriter, +}; +use anyhow::{anyhow, Context}; use jellycommon::{BlockIndex, ItemInfo, SeekIndex, SourceTrack, SourceTrackKind}; use jellymatroska::{ - block::Block, read::EbmlReader, - unflatten::{IterWithPos, Unflat, Unflatten}, - write::EbmlWriter, + unflatten::{IterWithPos, Unflatten}, + write::{vint_length, EbmlWriter}, Master, MatroskaTag, }; -use log::{debug, info, trace, warn}; -use std::{collections::VecDeque, fs::File, io::Write, path::PathBuf}; +use log::{debug, info, trace}; +use std::{ + fs::File, + io::{Seek, SeekFrom, Write}, + ops::Range, + path::PathBuf, + time::Instant, +}; #[derive(Debug, Clone)] pub struct RemuxerContext {} @@ -29,13 +40,14 @@ impl RemuxerContext { pub fn generate_into( &self, writer: impl Write + 'static, - _offset: usize, + range: Range<usize>, path_base: PathBuf, iteminfo: ItemInfo, selection: Vec<usize>, webm: bool, ) -> anyhow::Result<()> { info!("remuxing {:?} to have tracks {selection:?}", iteminfo.title); + let writer = TrimWriter::new(writer, range.clone()); let mut output = EbmlWriter::new(writer, 0); struct ReaderC { @@ -46,6 +58,8 @@ impl RemuxerContext { temp_index: usize, } + let timing_cp = Instant::now(); + let mut inputs = selection .iter() .enumerate() @@ -82,6 +96,12 @@ impl RemuxerContext { }) .collect::<anyhow::Result<Vec<_>>>()?; + info!( + "(perf) prepare inputs: {}ms", + (Instant::now() - timing_cp).as_millis() + ); + let timing_cp = Instant::now(); + output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![ MatroskaTag::EbmlVersion(1), MatroskaTag::EbmlReadVersion(1), @@ -106,7 +126,6 @@ impl RemuxerContext { MatroskaTag::WritingApp("jellything".to_string()), ])))?; output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; - // output.write_tag(&MatroskaTag::Cues(Master::Collected(vec![])))?; let tracks_header = inputs .iter() @@ -143,12 +162,11 @@ impl RemuxerContext { } inputs[best_index].temp_index += 1; if best_block.pts > cluster_pts + 2_000 { - let cluster_content_size = 1 // timestamp tag - + 1 // timestamp tag size - + EbmlWriter::vint_length(cluster_pts) // timestamp tag value + let cluster_content_size = 1 + 1 // timestamp {tag, size} + + vint_length(cluster_pts) // timestamp tag value + p; let cluster_header_size = 4 // tag length - + EbmlWriter::vint_length(cluster_content_size as u64)// size varint + + vint_length(cluster_content_size as u64) // size varint + cluster_content_size; clusters.push(ClusterLayout { position: gp, @@ -157,14 +175,14 @@ impl RemuxerContext { }); cluster_pts = best_block.pts; - gp += p + cluster_header_size; + gp += cluster_header_size; p = 0; } if best_block.pts == u64::MAX { break; } p += 1; // simpleblock tag - p += EbmlWriter::vint_length(1 + 2 + 1 + best_block.size as u64); // simpleblock size vint + p += vint_length(1 + 2 + 1 + best_block.size as u64); // simpleblock size vint p += 1 + 2 + 1; // block {tracknum, pts_off, flags} // TODO does not work, if more than 127 tracks are present p += best_block.size; // block payload @@ -173,6 +191,11 @@ impl RemuxerContext { info!("segment layout computed ({} clusters)", clusters.len()); clusters }; + info!( + "(perf) compute segment layout: {}ms", + (Instant::now() - timing_cp).as_millis() + ); + let timing_cp = Instant::now(); output.write_tag(&MatroskaTag::Cues(Master::Collected( segment_layout @@ -202,6 +225,19 @@ impl RemuxerContext { .collect(), )))?; + let segment_start_position = output.position(); + let mut skip = 0; + for cluster in &segment_layout { + if (cluster.position + segment_start_position) > range.start { + break; + } + skip += 1; + } + if skip != 0 { + info!("skipping {skip} clusters"); + output.seek(SeekFrom::Start(segment_layout[skip].position as u64))?; + } + struct ReaderD<'a> { _info: SourceTrack, peek: Option<AbsoluteBlock>, @@ -218,14 +254,17 @@ impl RemuxerContext { break; } } - let mut stream = SegmentExtractIter { - segment: Unflatten::new_with_end( - &mut i.reader, + i.reader + .seek( + segment_layout[skip].position, MatroskaTag::Segment(Master::Start), - ), - extract: i.info.track_number, - emission_queue: VecDeque::new(), - }; + ) + .context("seeking in input")?; + let mut stream = SegmentExtractIter::new( + Unflatten::new_with_end(&mut i.reader, MatroskaTag::Segment(Master::Start)), + i.info.track_number, + ); + ks.push(ReaderD { mapped: i.mapped, peek: Some(stream.next()?), @@ -233,9 +272,12 @@ impl RemuxerContext { _info: i.info.clone(), }); } + info!( + "(perf) seek inputs: {}ms", + (Instant::now() - timing_cp).as_millis() + ); - let segment_start_position = output.position(); - for (cluster_index, cluster) in segment_layout.into_iter().enumerate() { + for (cluster_index, cluster) in segment_layout.into_iter().skip(skip).enumerate() { info!( "writing cluster {cluster_index} (pts_base={}) with {} blocks", cluster.timestamp, @@ -258,7 +300,7 @@ impl RemuxerContext { block.inner.track = kn.mapped; block.inner.timestamp_off = (iblock.pts - cluster.timestamp).try_into().unwrap(); - debug!("n={} tso={}", block.inner.track, block.inner.timestamp_off); + trace!("n={} tso={}", block.inner.track, block.inner.timestamp_off); let buf = block.inner.dump(); cluster_blocks.push(MatroskaTag::SimpleBlock(buf)) } @@ -269,97 +311,6 @@ impl RemuxerContext { } } -struct AbsoluteBlock { - pts_base: u64, - inner: Block, -} - -struct SegmentExtractIter<'a> { - segment: Unflatten<'a>, - extract: u64, - emission_queue: VecDeque<AbsoluteBlock>, -} - -impl AbsoluteBlock { - pub fn pts(&self) -> u64 { - self.inner.timestamp_off as u64 + self.pts_base - } -} - -impl SegmentExtractIter<'_> { - pub fn next(&mut self) -> Result<AbsoluteBlock> { - loop { - if let Some(b) = self.emission_queue.pop_front() { - break Ok(b); - } - self.read()?; - } - } - - pub fn read(&mut self) -> Result<()> { - let Unflat { children, item } = self.segment.n().ok_or(anyhow!("eof"))??; - let mut pts_base = 0; - match item { - MatroskaTag::SeekHead(_) => {} - MatroskaTag::Info(_) => {} - MatroskaTag::Cluster(_) => { - let mut children = children.unwrap(); - while let Some(Ok(Unflat { children, item })) = children.n() { - match item { - MatroskaTag::Crc32(_) => (), - MatroskaTag::Timestamp(ts) => { - trace!("read pts={ts}"); - pts_base = ts; - } - MatroskaTag::BlockGroup(_) => { - trace!("group"); - let mut children = children.unwrap(); - - // let mut duration = None; - let mut block = None; - - while let Some(Ok(Unflat { children: _, item })) = children.n() { - match item { - MatroskaTag::Block(buf) => block = Some(buf), - // MatroskaTag::BlockDuration(v) => duration = Some(v), - _ => debug!("ignored {item:?}"), - } - } - // TODO duration - let block = Block::parse(&block.unwrap())?; - if block.track == self.extract { - trace!("block: track={} tso={}", block.track, block.timestamp_off); - self.emission_queue.push_back(AbsoluteBlock { - pts_base, - inner: block, - }); - } - } - MatroskaTag::SimpleBlock(buf) => { - let block = Block::parse(&buf)?; - if block.track == self.extract { - trace!("block: track={} tso={}", block.track, block.timestamp_off); - self.emission_queue.push_back(AbsoluteBlock { - pts_base, - inner: block, - }); - } - } - _ => warn!("(rsc) tag ignored: {item:?}"), - } - } - } - MatroskaTag::Tags(_) => {} - MatroskaTag::Cues(_) => {} - MatroskaTag::Chapters(_) => {} - MatroskaTag::Tracks(_) => {} - MatroskaTag::Void(_) => {} - _ => debug!("(rs) tag ignored: {item:?}"), - } - Ok(()) - } -} - pub fn track_to_ebml(number: u64, track: &SourceTrack) -> MatroskaTag { let mut els = vec![ MatroskaTag::TrackNumber(number), diff --git a/remuxer/src/segment_extractor.rs b/remuxer/src/segment_extractor.rs new file mode 100644 index 0000000..acbec53 --- /dev/null +++ b/remuxer/src/segment_extractor.rs @@ -0,0 +1,107 @@ +use anyhow::{anyhow, Result}; +use jellymatroska::{ + block::Block, + unflatten::{Unflat, Unflatten}, + MatroskaTag, +}; +use log::{debug, trace, warn}; +use std::collections::VecDeque; + +pub struct AbsoluteBlock { + pub pts_base: u64, + pub inner: Block, +} + +pub struct SegmentExtractIter<'a> { + segment: Unflatten<'a>, + extract: u64, + emission_queue: VecDeque<AbsoluteBlock>, +} + +impl AbsoluteBlock { + pub fn pts(&self) -> u64 { + self.inner.timestamp_off as u64 + self.pts_base + } +} + +impl<'a> SegmentExtractIter<'a> { + pub fn new(segment: Unflatten<'a>, extract: u64) -> Self { + Self { + segment, + extract, + emission_queue: Default::default(), + } + } + + pub fn next(&mut self) -> Result<AbsoluteBlock> { + loop { + if let Some(b) = self.emission_queue.pop_front() { + break Ok(b); + } + self.read()?; + } + } + + pub fn read(&mut self) -> Result<()> { + let Unflat { children, item } = self.segment.n().ok_or(anyhow!("eof"))??; + let mut pts_base = 0; + match item { + MatroskaTag::SeekHead(_) => {} + MatroskaTag::Info(_) => {} + MatroskaTag::Cluster(_) => { + let mut children = children.unwrap(); + while let Some(Ok(Unflat { children, item })) = children.n() { + match item { + MatroskaTag::Crc32(_) => (), + MatroskaTag::Timestamp(ts) => { + trace!("read pts={ts}"); + pts_base = ts; + } + MatroskaTag::BlockGroup(_) => { + trace!("group"); + let mut children = children.unwrap(); + + // let mut duration = None; + let mut block = None; + + while let Some(Ok(Unflat { children: _, item })) = children.n() { + match item { + MatroskaTag::Block(buf) => block = Some(buf), + // MatroskaTag::BlockDuration(v) => duration = Some(v), + _ => debug!("ignored {item:?}"), + } + } + // TODO duration + let block = Block::parse(&block.unwrap())?; + if block.track == self.extract { + trace!("block: track={} tso={}", block.track, block.timestamp_off); + self.emission_queue.push_back(AbsoluteBlock { + pts_base, + inner: block, + }); + } + } + MatroskaTag::SimpleBlock(buf) => { + let block = Block::parse(&buf)?; + if block.track == self.extract { + trace!("block: track={} tso={}", block.track, block.timestamp_off); + self.emission_queue.push_back(AbsoluteBlock { + pts_base, + inner: block, + }); + } + } + _ => warn!("(rsc) tag ignored: {item:?}"), + } + } + } + MatroskaTag::Tags(_) => {} + MatroskaTag::Cues(_) => {} + MatroskaTag::Chapters(_) => {} + MatroskaTag::Tracks(_) => {} + MatroskaTag::Void(_) => {} + _ => debug!("(rs) tag ignored: {item:?}"), + } + Ok(()) + } +} diff --git a/remuxer/src/trim_writer.rs b/remuxer/src/trim_writer.rs new file mode 100644 index 0000000..65d3589 --- /dev/null +++ b/remuxer/src/trim_writer.rs @@ -0,0 +1,64 @@ +use std::{ + io::{Seek, Write}, + ops::Range, +}; + +use anyhow::anyhow; +use log::warn; + +pub struct TrimWriter<W> { + inner: W, + position: usize, + range: Range<usize>, +} +impl<W: Write> TrimWriter<W> { + pub fn new(inner: W, range: Range<usize>) -> Self { + Self { + inner, + range, + position: 0, + } + } +} + +impl<W: Write> Write for TrimWriter<W> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + let start = self.range.start as isize - self.position as isize; + let end = self.range.end as isize - self.position as isize; + + let start = start.clamp(0, buf.len() as isize) as usize; + let end = end.clamp(0, buf.len() as isize) as usize; + + if self.position >= self.range.end { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + anyhow!("range ended"), + )); + } + + let buf = &buf[start..end]; + if !buf.is_empty() { + self.inner.write_all(buf)?; + self.position += buf.len() + } + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +impl<W> Seek for TrimWriter<W> { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { + match pos { + std::io::SeekFrom::Start(s) => self.position = s as usize, + std::io::SeekFrom::End(_) => unimplemented!(), + std::io::SeekFrom::Current(s) => self.position += s as usize, + } + if self.position < self.range.end { + warn!("seeked beyond end") + } + Ok(self.position as u64) + } +} diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index e7547bd..749077c 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -17,7 +17,7 @@ use rocket::{ }; use std::{ ops::{Deref, Range}, - path::{PathBuf, Path}, + path::{Path, PathBuf}, }; use tokio::io::{duplex, DuplexStream}; use tokio_util::io::SyncIoBridge; @@ -45,7 +45,13 @@ pub fn r_stream( library: &State<Library>, range: Option<RequestRange>, ) -> Result<StreamResponse, MyError> { - info!("stream request (range={range:?})"); + info!( + "stream request (range={})", + range + .as_ref() + .map(|r| r.to_cr_hv()) + .unwrap_or(format!("none")) + ); let (a, b) = duplex(1024); let path = path.to_str().unwrap().to_string(); let item = library @@ -60,10 +66,19 @@ pub fn r_stream( let b = SyncIoBridge::new(b); + let urange = match &range { + Some(r) => { + // TODO this can crash + let r = &r.0[0]; + r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) + } + None => 0..(isize::MAX as usize), + }; + tokio::task::spawn_blocking(move || { if let Err(e) = remuxer.generate_into( b, - 0, + urange, item.fs_path.parent().unwrap().to_path_buf(), item.info.clone(), tracks, |