diff options
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | client/src/lib.rs | 16 | ||||
-rw-r--r-- | common/src/stream.rs | 46 | ||||
-rw-r--r-- | remuxer/src/lib.rs | 495 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 4 | ||||
-rw-r--r-- | stream/Cargo.toml | 1 | ||||
-rw-r--r-- | stream/src/lib.rs | 54 |
7 files changed, 349 insertions, 268 deletions
@@ -1608,6 +1608,7 @@ dependencies = [ "anyhow", "jellybase", "jellycommon", + "jellyremuxer", "jellytranscoder", "log", "tokio", diff --git a/client/src/lib.rs b/client/src/lib.rs index bd9882c..200c869 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -4,17 +4,16 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ use anyhow::Result; -use jc::stream::StreamSpec; -use jellycommon::NodePublic; use log::debug; use reqwest::{ header::{HeaderMap, HeaderValue}, Client, }; use serde_json::json; -use std::{io::Write, time::Duration}; +use std::time::Duration; +use stream::StreamSpec; -pub use jellycommon as jc; +pub use jellycommon::*; #[derive(Debug, Clone)] pub struct Instance { @@ -90,7 +89,12 @@ impl Session { } // TODO use AssetRole instead of str - pub async fn node_asset(&self, id: &str, role: &str, mut writer: impl Write) -> Result<()> { + pub async fn node_asset( + &self, + id: &str, + role: &str, + mut writer: impl std::io::Write, + ) -> Result<()> { debug!("downloading asset {role:?} for {id:?}"); let mut r = self .client @@ -108,7 +112,7 @@ impl Session { "{}/n/{}/stream?{}&{}", self.instance.base(), id, - todo!(), + stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 59166b8..3fb1008 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -13,7 +13,7 @@ pub struct StreamSpec { } #[rustfmt::skip] -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] #[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))] pub enum StreamFormat { @@ -36,3 +36,47 @@ impl Default for StreamSpec { } } } + +impl StreamSpec { + pub fn to_query(&self) -> String { + use std::fmt::Write; + let mut u = String::new(); + writeln!(u, "format={}", self.format.ident()).unwrap(); + + if !self.tracks.is_empty() { + writeln!( + u, + "&tracks={}", + self.tracks + .iter() + .map(|s| s.to_string()) + .collect::<Vec<_>>() + .join(",") + ) + .unwrap(); + } + if let Some(abr) = self.abr { + writeln!(u, "&abr={abr}").unwrap(); + } + if let Some(vbr) = self.vbr { + writeln!(u, "&vbr={vbr}").unwrap(); + } + if let Some(index) = self.index { + writeln!(u, "&index={index}").unwrap(); + } + u + } +} + +impl StreamFormat { + pub fn ident(&self) -> &'static str { + match self { + StreamFormat::Original => "original", + StreamFormat::Matroska => "matroska", + StreamFormat::Webm => "webm", + StreamFormat::Hls => "hls", + StreamFormat::Jhls => "jhls", + StreamFormat::Segment => "hlsseg", + } + } +} diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index 21b7fd6..a9702fb 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -25,191 +25,180 @@ use std::{ time::Instant, }; -#[derive(Debug, Clone)] -pub struct RemuxerContext {} +pub fn remux_stream_into( + writer: impl Write + 'static, + range: Range<usize>, + path_base: PathBuf, + item: NodePublic, + track_sources: Vec<LocalTrack>, + selection: Vec<usize>, + webm: bool, +) -> anyhow::Result<()> { + info!("remuxing {:?} to have tracks {selection:?}", item.title); + let writer = TrimWriter::new(writer, range.clone()); + let mut output = EbmlWriter::new(writer, 0); -impl RemuxerContext { - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - Self {} + struct ReaderC { + info: SourceTrack, + reader: EbmlReader, + mapped: u64, + index: SeekIndex, + source_track_index: usize, + codec_private: Option<Vec<u8>>, + layouting_progress_index: usize, } - pub fn generate_into( - &self, - writer: impl Write + 'static, - range: Range<usize>, - path_base: PathBuf, - item: NodePublic, - track_sources: Vec<LocalTrack>, - selection: Vec<usize>, - webm: bool, - ) -> anyhow::Result<()> { - info!("remuxing {:?} to have tracks {selection:?}", item.title); - let writer = TrimWriter::new(writer, range.clone()); - let mut output = EbmlWriter::new(writer, 0); + let timing_cp = Instant::now(); - struct ReaderC { - info: SourceTrack, - reader: EbmlReader, - mapped: u64, - index: SeekIndex, - source_track_index: usize, - codec_private: Option<Vec<u8>>, - layouting_progress_index: usize, - } - - let timing_cp = Instant::now(); - - let mut inputs = selection - .iter() - .enumerate() - .map(|(index, sel)| { - let info = item - .media - .as_ref() - .unwrap() - .tracks - .get(*sel) - .ok_or(anyhow!("track not available"))? - .to_owned(); - let private = &track_sources[*sel]; - let source_path = path_base.join(&private.path); - let mapped = index as u64 + 1; - info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); - info!("\t {}", info); - let file = File::open(&source_path).context("opening source file")?; - let mut index = - File::open(source_path.with_extension(format!("si.{}", private.track))) - .context("opening seek index file")?; - let index = bincode::decode_from_std_read::<SeekIndex, _, _>( - &mut index, - bincode::config::standard(), - )?; - debug!("\t seek index: {} blocks loaded", index.blocks.len()); - let reader = EbmlReader::new(file); - Ok(ReaderC { - index, - reader, - info, - mapped, - source_track_index: private.track, - codec_private: private.codec_private.clone(), - layouting_progress_index: 0, - }) + let mut inputs = selection + .iter() + .enumerate() + .map(|(index, sel)| { + let info = item + .media + .as_ref() + .unwrap() + .tracks + .get(*sel) + .ok_or(anyhow!("track not available"))? + .to_owned(); + let private = &track_sources[*sel]; + let source_path = path_base.join(&private.path); + let mapped = index as u64 + 1; + info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track); + info!("\t {}", info); + let file = File::open(&source_path).context("opening source file")?; + let mut index = File::open(source_path.with_extension(format!("si.{}", private.track))) + .context("opening seek index file")?; + let index = bincode::decode_from_std_read::<SeekIndex, _, _>( + &mut index, + bincode::config::standard(), + )?; + debug!("\t seek index: {} blocks loaded", index.blocks.len()); + let reader = EbmlReader::new(file); + Ok(ReaderC { + index, + reader, + info, + mapped, + source_track_index: private.track, + codec_private: private.codec_private.clone(), + layouting_progress_index: 0, }) - .collect::<anyhow::Result<Vec<_>>>()?; + }) + .collect::<anyhow::Result<Vec<_>>>()?; - info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); - let timing_cp = Instant::now(); + info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp); + let timing_cp = Instant::now(); - output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![ - MatroskaTag::EbmlVersion(1), - MatroskaTag::EbmlReadVersion(1), - MatroskaTag::EbmlMaxIdLength(4), - MatroskaTag::EbmlMaxSizeLength(8), - MatroskaTag::DocType(if webm { - "webm".to_string() - } else { - "matroska".to_string() - }), - MatroskaTag::DocTypeVersion(4), - MatroskaTag::DocTypeReadVersion(2), - ])))?; + output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![ + MatroskaTag::EbmlVersion(1), + MatroskaTag::EbmlReadVersion(1), + MatroskaTag::EbmlMaxIdLength(4), + MatroskaTag::EbmlMaxSizeLength(8), + MatroskaTag::DocType(if webm { + "webm".to_string() + } else { + "matroska".to_string() + }), + MatroskaTag::DocTypeVersion(4), + MatroskaTag::DocTypeReadVersion(2), + ])))?; - output.write_tag(&MatroskaTag::Segment(Master::Start))?; - let segment_offset = output.position(); + output.write_tag(&MatroskaTag::Segment(Master::Start))?; + let segment_offset = output.position(); - output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ - MatroskaTag::TimestampScale(1_000_000), - MatroskaTag::Duration(item.media.unwrap().duration * 1000.0), - MatroskaTag::Title(item.title.clone()), - MatroskaTag::MuxingApp("jellyremux".to_string()), - MatroskaTag::WritingApp("jellything".to_string()), - ])))?; - output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; + output.write_tag(&MatroskaTag::Info(Master::Collected(vec![ + MatroskaTag::TimestampScale(1_000_000), + MatroskaTag::Duration(item.media.unwrap().duration * 1000.0), + MatroskaTag::Title(item.title.clone()), + MatroskaTag::MuxingApp("jellyremux".to_string()), + MatroskaTag::WritingApp("jellything".to_string()), + ])))?; + output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?; - let tracks_header = inputs - .iter_mut() - .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take())) - .collect(); - output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; + let tracks_header = inputs + .iter_mut() + .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take())) + .collect(); + output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?; - struct ClusterLayout { - position: usize, - timestamp: u64, - source_offsets: Vec<Option<usize>>, - blocks: Vec<(usize, BlockIndex)>, - } + struct ClusterLayout { + position: usize, + timestamp: u64, + source_offsets: Vec<Option<usize>>, + blocks: Vec<(usize, BlockIndex)>, + } - let mut segment_layout: Vec<ClusterLayout> = { - let mut cluster_pts = 0; - let mut clusters = vec![]; - let mut cluster = vec![]; - let mut source_offsets = vec![None; inputs.len()]; - let mut gp = 0usize; // cluster position (in the segment) - let mut p = 0usize; // block position (in the cluster) - loop { - let (track, block) = { - let mut best_block = BlockIndex { - pts: u64::MAX, - size: 0, - source_off: 0, - }; - let mut best_track = 0; - for (i, r) in inputs.iter().enumerate() { - if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { - if v.pts < best_block.pts { - best_block = v.to_owned(); - best_track = i; - } - }; - } - (best_track, best_block) + let mut segment_layout: Vec<ClusterLayout> = { + let mut cluster_pts = 0; + let mut clusters = vec![]; + let mut cluster = vec![]; + let mut source_offsets = vec![None; inputs.len()]; + let mut gp = 0usize; // cluster position (in the segment) + let mut p = 0usize; // block position (in the cluster) + loop { + let (track, block) = { + let mut best_block = BlockIndex { + pts: u64::MAX, + size: 0, + source_off: 0, }; - inputs[track].layouting_progress_index += 1; - source_offsets[track].get_or_insert(block.source_off); - if block.pts > cluster_pts + 1_000 { - let cluster_content_size = 1 + 1 // timestamp {tag, size} + let mut best_track = 0; + for (i, r) in inputs.iter().enumerate() { + if let Some(v) = r.index.blocks.get(r.layouting_progress_index) { + if v.pts < best_block.pts { + best_block = v.to_owned(); + best_track = i; + } + }; + } + (best_track, best_block) + }; + inputs[track].layouting_progress_index += 1; + source_offsets[track].get_or_insert(block.source_off); + if block.pts > cluster_pts + 1_000 { + let cluster_content_size = 1 + 1 // timestamp {tag, size} + bad_vint_length(cluster_pts) // timestamp tag value + p; - let cluster_size = 4 // tag length + let cluster_size = 4 // tag length + vint_length(cluster_content_size as u64) // size varint + cluster_content_size; - clusters.push(ClusterLayout { - position: gp, // relative to the first cluster - timestamp: cluster_pts, - source_offsets, - blocks: std::mem::take(&mut cluster), - }); + clusters.push(ClusterLayout { + position: gp, // relative to the first cluster + timestamp: cluster_pts, + source_offsets, + blocks: std::mem::take(&mut cluster), + }); - cluster_pts = block.pts; - source_offsets = vec![None; inputs.len()]; - gp += cluster_size; - p = 0; - } - if block.pts == u64::MAX { - break; - } + cluster_pts = block.pts; + source_offsets = vec![None; inputs.len()]; + gp += cluster_size; + p = 0; + } + if block.pts == u64::MAX { + break; + } - let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} + let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags} // TODO does not work, if more than 127 tracks are present + block.size; // block payload - p += 1; // simpleblock tag - p += vint_length(simpleblock_size as u64); // simpleblock size vint - p += simpleblock_size; + p += 1; // simpleblock tag + p += vint_length(simpleblock_size as u64); // simpleblock size vint + p += simpleblock_size; - cluster.push((track, block)) - } - info!("segment layout computed ({} clusters)", clusters.len()); - clusters - }; - info!( - "(perf) compute segment layout: {:?}", - Instant::now() - timing_cp - ); - let timing_cp = Instant::now(); + cluster.push((track, block)) + } + info!("segment layout computed ({} clusters)", clusters.len()); + clusters + }; + info!( + "(perf) compute segment layout: {:?}", + Instant::now() - timing_cp + ); + let timing_cp = Instant::now(); - let max_cue_size = 4 // cues id + let max_cue_size = 4 // cues id + 8 // cues len + ( // cues content 1 // cp id @@ -236,115 +225,113 @@ impl RemuxerContext { + 1 // void id + 8; // void len - let first_cluster_offset_predict = max_cue_size + output.position(); + let first_cluster_offset_predict = max_cue_size + output.position(); - // make the cluster position relative to the segment start as they should - segment_layout - .iter_mut() - .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); + // make the cluster position relative to the segment start as they should + segment_layout + .iter_mut() + .for_each(|e| e.position += first_cluster_offset_predict - segment_offset); - output.write_tag(&MatroskaTag::Cues(Master::Collected( - segment_layout - .iter() - .map(|cluster| { - MatroskaTag::CuePoint(Master::Collected( - Some(MatroskaTag::CueTime(cluster.timestamp)) - .into_iter() - // TODO: Subtitles should not have cues for every cluster - .chain(inputs.iter().map(|i| { - MatroskaTag::CueTrackPositions(Master::Collected(vec![ - MatroskaTag::CueTrack(i.mapped), - MatroskaTag::CueClusterPosition(cluster.position as u64), - ])) - })) - .collect(), - )) - }) - .collect(), - )))?; - output.write_padding(first_cluster_offset_predict)?; - let first_cluster_offset = output.position(); - assert_eq!(first_cluster_offset, first_cluster_offset_predict); + output.write_tag(&MatroskaTag::Cues(Master::Collected( + segment_layout + .iter() + .map(|cluster| { + MatroskaTag::CuePoint(Master::Collected( + Some(MatroskaTag::CueTime(cluster.timestamp)) + .into_iter() + // TODO: Subtitles should not have cues for every cluster + .chain(inputs.iter().map(|i| { + MatroskaTag::CueTrackPositions(Master::Collected(vec![ + MatroskaTag::CueTrack(i.mapped), + MatroskaTag::CueClusterPosition(cluster.position as u64), + ])) + })) + .collect(), + )) + }) + .collect(), + )))?; + output.write_padding(first_cluster_offset_predict)?; + let first_cluster_offset = output.position(); + assert_eq!(first_cluster_offset, first_cluster_offset_predict); - let mut skip = 0; - for (i, cluster) in segment_layout.iter().enumerate() { - if (cluster.position + segment_offset) >= range.start { - break; - } - skip = i; + let mut skip = 0; + for (i, cluster) in segment_layout.iter().enumerate() { + if (cluster.position + segment_offset) >= range.start { + break; } + skip = i; + } - if skip != 0 { - info!("skipping {skip} clusters"); - output.seek(SeekFrom::Start( - (segment_layout[skip].position + segment_offset) as u64, - ))?; - } + if skip != 0 { + info!("skipping {skip} clusters"); + output.seek(SeekFrom::Start( + (segment_layout[skip].position + segment_offset) as u64, + ))?; + } - struct ReaderD<'a> { - peek: Option<Block>, - stream: SegmentExtractIter<'a>, - mapped: u64, - } + struct ReaderD<'a> { + peek: Option<Block>, + stream: SegmentExtractIter<'a>, + mapped: u64, + } - let mut track_readers = inputs - .iter_mut() - .enumerate() - .map(|(i, inp)| { - inp.reader - .seek( - segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole" - MatroskaTag::Cluster(Master::Start), - ) - .context("seeking in input")?; - let mut stream = - SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); + let mut track_readers = inputs + .iter_mut() + .enumerate() + .map(|(i, inp)| { + inp.reader + .seek( + segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole" + MatroskaTag::Cluster(Master::Start), + ) + .context("seeking in input")?; + let mut stream = + SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64); - Ok(ReaderD { - mapped: inp.mapped, - peek: Some(stream.next()?), - stream, - }) + Ok(ReaderD { + mapped: inp.mapped, + peek: Some(stream.next()?), + stream, }) - .collect::<anyhow::Result<Vec<_>>>()?; + }) + .collect::<anyhow::Result<Vec<_>>>()?; - info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); + info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp); - for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { - debug!( - "writing cluster {cluster_index} (pts_base={}) with {} blocks", - cluster.timestamp, - cluster.blocks.len() - ); - { - let cue_error = - cluster.position as i64 - (output.position() - segment_offset) as i64; - if cue_error != 0 { - warn!("calculation was {} bytes off", cue_error); - } + for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) { + debug!( + "writing cluster {cluster_index} (pts_base={}) with {} blocks", + cluster.timestamp, + cluster.blocks.len() + ); + { + let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64; + if cue_error != 0 { + warn!("calculation was {} bytes off", cue_error); } + } - let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; - for (block_track, index_block) in cluster.blocks { - let track_reader = &mut track_readers[block_track]; - let mut block = track_reader - .peek - .replace(track_reader.stream.next()?) - .expect("source file too short"); + let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)]; + for (block_track, index_block) in cluster.blocks { + let track_reader = &mut track_readers[block_track]; + let mut block = track_reader + .peek + .replace(track_reader.stream.next()?) + .expect("source file too short"); - assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); + assert_eq!(index_block.size, block.data.len(), "seek index is wrong"); - block.track = track_reader.mapped; - block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); - trace!("n={} tso={}", block.track, block.timestamp_off); - let buf = block.dump(); - cluster_blocks.push(MatroskaTag::SimpleBlock(buf)) - } - output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; + block.track = track_reader.mapped; + block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap(); + trace!("n={} tso={}", block.track, block.timestamp_off); + let buf = block.dump(); + cluster_blocks.push(MatroskaTag::SimpleBlock(buf)) } - output.write_tag(&MatroskaTag::Segment(Master::End))?; - Ok(()) + output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?; } + output.write_tag(&MatroskaTag::Segment(Master::End))?; + Ok(()) } pub fn track_to_ebml( diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 6b268e4..c47f589 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -24,11 +24,11 @@ pub async fn r_stream( _sess: Session, federation: &State<Federation>, db: &State<Database>, - id: String, + id: &str, range: Option<RequestRange>, spec: StreamSpec, ) -> Result<Either<StreamResponse, Redirect>, MyError> { - let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?; + let node = db.node.get(&id.to_string())?.ok_or(anyhow!("node does not exist"))?; let source = node .private .source diff --git a/stream/Cargo.toml b/stream/Cargo.toml index 804bc1c..a03a85c 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" jellycommon = { path = "../common", features = ["rocket"] } jellybase = { path = "../base" } jellytranscoder = { path = "../transcoder" } +jellyremuxer = { path = "../remuxer" } log = { workspace = true } anyhow = { workspace = true } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index df75cf5..867b310 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -1,15 +1,59 @@ -use jellycommon::{stream::StreamSpec, Node}; +use anyhow::{anyhow, bail, Result}; +use jellybase::CONF; +use jellycommon::{ + stream::{StreamFormat, StreamSpec}, + LocalTrack, MediaSource, Node, +}; use std::ops::Range; use tokio::io::{duplex, DuplexStream}; use tokio_util::io::SyncIoBridge; -pub async fn stream( +pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result<DuplexStream> { + let (a, b) = duplex(4096); + + let track_sources = match node + .private + .source + .as_ref() + .ok_or(anyhow!("node has no media"))? + { + MediaSource::Local { tracks } => tracks.to_owned(), + _ => bail!("node tracks are not local"), + }; + + match spec.format { + StreamFormat::Original => todo!(), + StreamFormat::Matroska | StreamFormat::Webm => { + remux_stream(node, track_sources, spec, range, b).await? + } + StreamFormat::Hls => todo!(), + StreamFormat::Jhls => todo!(), + StreamFormat::Segment => todo!(), + } + + Ok(a) +} + +async fn remux_stream( node: Node, + track_sources: Vec<LocalTrack>, spec: StreamSpec, range: Range<usize>, -) -> anyhow::Result<DuplexStream> { - let (a, b) = duplex(4096); + b: DuplexStream, +) -> Result<()> { let b = SyncIoBridge::new(b); - Ok(a) + tokio::task::spawn_blocking(move || { + jellyremuxer::remux_stream_into( + b, + range, + CONF.library_path.to_owned(), + node.public, + track_sources, + spec.tracks, + spec.format == StreamFormat::Webm, + ) + }); + + Ok(()) } |