aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-29 21:15:07 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-29 21:15:07 +0200
commit795438f691657c2fdcfe113f933950b63df84271 (patch)
treefba3d1e6d34bb92709282a4826bbbbbd9215ef1e
parentc62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914 (diff)
downloadjellything-795438f691657c2fdcfe113f933950b63df84271.tar
jellything-795438f691657c2fdcfe113f933950b63df84271.tar.bz2
jellything-795438f691657c2fdcfe113f933950b63df84271.tar.zst
stream wrapper for remux mkv/webm
-rw-r--r--Cargo.lock1
-rw-r--r--client/src/lib.rs16
-rw-r--r--common/src/stream.rs46
-rw-r--r--remuxer/src/lib.rs495
-rw-r--r--server/src/routes/stream.rs4
-rw-r--r--stream/Cargo.toml1
-rw-r--r--stream/src/lib.rs54
7 files changed, 349 insertions, 268 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d7246e6..2c20b9b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1608,6 +1608,7 @@ dependencies = [
"anyhow",
"jellybase",
"jellycommon",
+ "jellyremuxer",
"jellytranscoder",
"log",
"tokio",
diff --git a/client/src/lib.rs b/client/src/lib.rs
index bd9882c..200c869 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -4,17 +4,16 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
use anyhow::Result;
-use jc::stream::StreamSpec;
-use jellycommon::NodePublic;
use log::debug;
use reqwest::{
header::{HeaderMap, HeaderValue},
Client,
};
use serde_json::json;
-use std::{io::Write, time::Duration};
+use std::time::Duration;
+use stream::StreamSpec;
-pub use jellycommon as jc;
+pub use jellycommon::*;
#[derive(Debug, Clone)]
pub struct Instance {
@@ -90,7 +89,12 @@ impl Session {
}
// TODO use AssetRole instead of str
- pub async fn node_asset(&self, id: &str, role: &str, mut writer: impl Write) -> Result<()> {
+ pub async fn node_asset(
+ &self,
+ id: &str,
+ role: &str,
+ mut writer: impl std::io::Write,
+ ) -> Result<()> {
debug!("downloading asset {role:?} for {id:?}");
let mut r = self
.client
@@ -108,7 +112,7 @@ impl Session {
"{}/n/{}/stream?{}&{}",
self.instance.base(),
id,
- todo!(),
+ stream_spec.to_query(),
self.session_param()
)
}
diff --git a/common/src/stream.rs b/common/src/stream.rs
index 59166b8..3fb1008 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -13,7 +13,7 @@ pub struct StreamSpec {
}
#[rustfmt::skip]
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))]
pub enum StreamFormat {
@@ -36,3 +36,47 @@ impl Default for StreamSpec {
}
}
}
+
+impl StreamSpec {
+ pub fn to_query(&self) -> String {
+ use std::fmt::Write;
+ let mut u = String::new();
+ writeln!(u, "format={}", self.format.ident()).unwrap();
+
+ if !self.tracks.is_empty() {
+ writeln!(
+ u,
+ "&tracks={}",
+ self.tracks
+ .iter()
+ .map(|s| s.to_string())
+ .collect::<Vec<_>>()
+ .join(",")
+ )
+ .unwrap();
+ }
+ if let Some(abr) = self.abr {
+ writeln!(u, "&abr={abr}").unwrap();
+ }
+ if let Some(vbr) = self.vbr {
+ writeln!(u, "&vbr={vbr}").unwrap();
+ }
+ if let Some(index) = self.index {
+ writeln!(u, "&index={index}").unwrap();
+ }
+ u
+ }
+}
+
+impl StreamFormat {
+ pub fn ident(&self) -> &'static str {
+ match self {
+ StreamFormat::Original => "original",
+ StreamFormat::Matroska => "matroska",
+ StreamFormat::Webm => "webm",
+ StreamFormat::Hls => "hls",
+ StreamFormat::Jhls => "jhls",
+ StreamFormat::Segment => "hlsseg",
+ }
+ }
+}
diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs
index 21b7fd6..a9702fb 100644
--- a/remuxer/src/lib.rs
+++ b/remuxer/src/lib.rs
@@ -25,191 +25,180 @@ use std::{
time::Instant,
};
-#[derive(Debug, Clone)]
-pub struct RemuxerContext {}
+pub fn remux_stream_into(
+ writer: impl Write + 'static,
+ range: Range<usize>,
+ path_base: PathBuf,
+ item: NodePublic,
+ track_sources: Vec<LocalTrack>,
+ selection: Vec<usize>,
+ webm: bool,
+) -> anyhow::Result<()> {
+ info!("remuxing {:?} to have tracks {selection:?}", item.title);
+ let writer = TrimWriter::new(writer, range.clone());
+ let mut output = EbmlWriter::new(writer, 0);
-impl RemuxerContext {
- #[allow(clippy::new_without_default)]
- pub fn new() -> Self {
- Self {}
+ struct ReaderC {
+ info: SourceTrack,
+ reader: EbmlReader,
+ mapped: u64,
+ index: SeekIndex,
+ source_track_index: usize,
+ codec_private: Option<Vec<u8>>,
+ layouting_progress_index: usize,
}
- pub fn generate_into(
- &self,
- writer: impl Write + 'static,
- range: Range<usize>,
- path_base: PathBuf,
- item: NodePublic,
- track_sources: Vec<LocalTrack>,
- selection: Vec<usize>,
- webm: bool,
- ) -> anyhow::Result<()> {
- info!("remuxing {:?} to have tracks {selection:?}", item.title);
- let writer = TrimWriter::new(writer, range.clone());
- let mut output = EbmlWriter::new(writer, 0);
+ let timing_cp = Instant::now();
- struct ReaderC {
- info: SourceTrack,
- reader: EbmlReader,
- mapped: u64,
- index: SeekIndex,
- source_track_index: usize,
- codec_private: Option<Vec<u8>>,
- layouting_progress_index: usize,
- }
-
- let timing_cp = Instant::now();
-
- let mut inputs = selection
- .iter()
- .enumerate()
- .map(|(index, sel)| {
- let info = item
- .media
- .as_ref()
- .unwrap()
- .tracks
- .get(*sel)
- .ok_or(anyhow!("track not available"))?
- .to_owned();
- let private = &track_sources[*sel];
- let source_path = path_base.join(&private.path);
- let mapped = index as u64 + 1;
- info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track);
- info!("\t {}", info);
- let file = File::open(&source_path).context("opening source file")?;
- let mut index =
- File::open(source_path.with_extension(format!("si.{}", private.track)))
- .context("opening seek index file")?;
- let index = bincode::decode_from_std_read::<SeekIndex, _, _>(
- &mut index,
- bincode::config::standard(),
- )?;
- debug!("\t seek index: {} blocks loaded", index.blocks.len());
- let reader = EbmlReader::new(file);
- Ok(ReaderC {
- index,
- reader,
- info,
- mapped,
- source_track_index: private.track,
- codec_private: private.codec_private.clone(),
- layouting_progress_index: 0,
- })
+ let mut inputs = selection
+ .iter()
+ .enumerate()
+ .map(|(index, sel)| {
+ let info = item
+ .media
+ .as_ref()
+ .unwrap()
+ .tracks
+ .get(*sel)
+ .ok_or(anyhow!("track not available"))?
+ .to_owned();
+ let private = &track_sources[*sel];
+ let source_path = path_base.join(&private.path);
+ let mapped = index as u64 + 1;
+ info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track);
+ info!("\t {}", info);
+ let file = File::open(&source_path).context("opening source file")?;
+ let mut index = File::open(source_path.with_extension(format!("si.{}", private.track)))
+ .context("opening seek index file")?;
+ let index = bincode::decode_from_std_read::<SeekIndex, _, _>(
+ &mut index,
+ bincode::config::standard(),
+ )?;
+ debug!("\t seek index: {} blocks loaded", index.blocks.len());
+ let reader = EbmlReader::new(file);
+ Ok(ReaderC {
+ index,
+ reader,
+ info,
+ mapped,
+ source_track_index: private.track,
+ codec_private: private.codec_private.clone(),
+ layouting_progress_index: 0,
})
- .collect::<anyhow::Result<Vec<_>>>()?;
+ })
+ .collect::<anyhow::Result<Vec<_>>>()?;
- info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp);
- let timing_cp = Instant::now();
+ info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp);
+ let timing_cp = Instant::now();
- output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![
- MatroskaTag::EbmlVersion(1),
- MatroskaTag::EbmlReadVersion(1),
- MatroskaTag::EbmlMaxIdLength(4),
- MatroskaTag::EbmlMaxSizeLength(8),
- MatroskaTag::DocType(if webm {
- "webm".to_string()
- } else {
- "matroska".to_string()
- }),
- MatroskaTag::DocTypeVersion(4),
- MatroskaTag::DocTypeReadVersion(2),
- ])))?;
+ output.write_tag(&MatroskaTag::Ebml(Master::Collected(vec![
+ MatroskaTag::EbmlVersion(1),
+ MatroskaTag::EbmlReadVersion(1),
+ MatroskaTag::EbmlMaxIdLength(4),
+ MatroskaTag::EbmlMaxSizeLength(8),
+ MatroskaTag::DocType(if webm {
+ "webm".to_string()
+ } else {
+ "matroska".to_string()
+ }),
+ MatroskaTag::DocTypeVersion(4),
+ MatroskaTag::DocTypeReadVersion(2),
+ ])))?;
- output.write_tag(&MatroskaTag::Segment(Master::Start))?;
- let segment_offset = output.position();
+ output.write_tag(&MatroskaTag::Segment(Master::Start))?;
+ let segment_offset = output.position();
- output.write_tag(&MatroskaTag::Info(Master::Collected(vec![
- MatroskaTag::TimestampScale(1_000_000),
- MatroskaTag::Duration(item.media.unwrap().duration * 1000.0),
- MatroskaTag::Title(item.title.clone()),
- MatroskaTag::MuxingApp("jellyremux".to_string()),
- MatroskaTag::WritingApp("jellything".to_string()),
- ])))?;
- output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?;
+ output.write_tag(&MatroskaTag::Info(Master::Collected(vec![
+ MatroskaTag::TimestampScale(1_000_000),
+ MatroskaTag::Duration(item.media.unwrap().duration * 1000.0),
+ MatroskaTag::Title(item.title.clone()),
+ MatroskaTag::MuxingApp("jellyremux".to_string()),
+ MatroskaTag::WritingApp("jellything".to_string()),
+ ])))?;
+ output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?;
- let tracks_header = inputs
- .iter_mut()
- .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take()))
- .collect();
- output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?;
+ let tracks_header = inputs
+ .iter_mut()
+ .map(|rc| track_to_ebml(rc.mapped, &rc.info, rc.codec_private.take()))
+ .collect();
+ output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?;
- struct ClusterLayout {
- position: usize,
- timestamp: u64,
- source_offsets: Vec<Option<usize>>,
- blocks: Vec<(usize, BlockIndex)>,
- }
+ struct ClusterLayout {
+ position: usize,
+ timestamp: u64,
+ source_offsets: Vec<Option<usize>>,
+ blocks: Vec<(usize, BlockIndex)>,
+ }
- let mut segment_layout: Vec<ClusterLayout> = {
- let mut cluster_pts = 0;
- let mut clusters = vec![];
- let mut cluster = vec![];
- let mut source_offsets = vec![None; inputs.len()];
- let mut gp = 0usize; // cluster position (in the segment)
- let mut p = 0usize; // block position (in the cluster)
- loop {
- let (track, block) = {
- let mut best_block = BlockIndex {
- pts: u64::MAX,
- size: 0,
- source_off: 0,
- };
- let mut best_track = 0;
- for (i, r) in inputs.iter().enumerate() {
- if let Some(v) = r.index.blocks.get(r.layouting_progress_index) {
- if v.pts < best_block.pts {
- best_block = v.to_owned();
- best_track = i;
- }
- };
- }
- (best_track, best_block)
+ let mut segment_layout: Vec<ClusterLayout> = {
+ let mut cluster_pts = 0;
+ let mut clusters = vec![];
+ let mut cluster = vec![];
+ let mut source_offsets = vec![None; inputs.len()];
+ let mut gp = 0usize; // cluster position (in the segment)
+ let mut p = 0usize; // block position (in the cluster)
+ loop {
+ let (track, block) = {
+ let mut best_block = BlockIndex {
+ pts: u64::MAX,
+ size: 0,
+ source_off: 0,
};
- inputs[track].layouting_progress_index += 1;
- source_offsets[track].get_or_insert(block.source_off);
- if block.pts > cluster_pts + 1_000 {
- let cluster_content_size = 1 + 1 // timestamp {tag, size}
+ let mut best_track = 0;
+ for (i, r) in inputs.iter().enumerate() {
+ if let Some(v) = r.index.blocks.get(r.layouting_progress_index) {
+ if v.pts < best_block.pts {
+ best_block = v.to_owned();
+ best_track = i;
+ }
+ };
+ }
+ (best_track, best_block)
+ };
+ inputs[track].layouting_progress_index += 1;
+ source_offsets[track].get_or_insert(block.source_off);
+ if block.pts > cluster_pts + 1_000 {
+ let cluster_content_size = 1 + 1 // timestamp {tag, size}
+ bad_vint_length(cluster_pts) // timestamp tag value
+ p;
- let cluster_size = 4 // tag length
+ let cluster_size = 4 // tag length
+ vint_length(cluster_content_size as u64) // size varint
+ cluster_content_size;
- clusters.push(ClusterLayout {
- position: gp, // relative to the first cluster
- timestamp: cluster_pts,
- source_offsets,
- blocks: std::mem::take(&mut cluster),
- });
+ clusters.push(ClusterLayout {
+ position: gp, // relative to the first cluster
+ timestamp: cluster_pts,
+ source_offsets,
+ blocks: std::mem::take(&mut cluster),
+ });
- cluster_pts = block.pts;
- source_offsets = vec![None; inputs.len()];
- gp += cluster_size;
- p = 0;
- }
- if block.pts == u64::MAX {
- break;
- }
+ cluster_pts = block.pts;
+ source_offsets = vec![None; inputs.len()];
+ gp += cluster_size;
+ p = 0;
+ }
+ if block.pts == u64::MAX {
+ break;
+ }
- let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags}
+ let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags}
// TODO does not work, if more than 127 tracks are present
+ block.size; // block payload
- p += 1; // simpleblock tag
- p += vint_length(simpleblock_size as u64); // simpleblock size vint
- p += simpleblock_size;
+ p += 1; // simpleblock tag
+ p += vint_length(simpleblock_size as u64); // simpleblock size vint
+ p += simpleblock_size;
- cluster.push((track, block))
- }
- info!("segment layout computed ({} clusters)", clusters.len());
- clusters
- };
- info!(
- "(perf) compute segment layout: {:?}",
- Instant::now() - timing_cp
- );
- let timing_cp = Instant::now();
+ cluster.push((track, block))
+ }
+ info!("segment layout computed ({} clusters)", clusters.len());
+ clusters
+ };
+ info!(
+ "(perf) compute segment layout: {:?}",
+ Instant::now() - timing_cp
+ );
+ let timing_cp = Instant::now();
- let max_cue_size = 4 // cues id
+ let max_cue_size = 4 // cues id
+ 8 // cues len
+ ( // cues content
1 // cp id
@@ -236,115 +225,113 @@ impl RemuxerContext {
+ 1 // void id
+ 8; // void len
- let first_cluster_offset_predict = max_cue_size + output.position();
+ let first_cluster_offset_predict = max_cue_size + output.position();
- // make the cluster position relative to the segment start as they should
- segment_layout
- .iter_mut()
- .for_each(|e| e.position += first_cluster_offset_predict - segment_offset);
+ // make the cluster position relative to the segment start as they should
+ segment_layout
+ .iter_mut()
+ .for_each(|e| e.position += first_cluster_offset_predict - segment_offset);
- output.write_tag(&MatroskaTag::Cues(Master::Collected(
- segment_layout
- .iter()
- .map(|cluster| {
- MatroskaTag::CuePoint(Master::Collected(
- Some(MatroskaTag::CueTime(cluster.timestamp))
- .into_iter()
- // TODO: Subtitles should not have cues for every cluster
- .chain(inputs.iter().map(|i| {
- MatroskaTag::CueTrackPositions(Master::Collected(vec![
- MatroskaTag::CueTrack(i.mapped),
- MatroskaTag::CueClusterPosition(cluster.position as u64),
- ]))
- }))
- .collect(),
- ))
- })
- .collect(),
- )))?;
- output.write_padding(first_cluster_offset_predict)?;
- let first_cluster_offset = output.position();
- assert_eq!(first_cluster_offset, first_cluster_offset_predict);
+ output.write_tag(&MatroskaTag::Cues(Master::Collected(
+ segment_layout
+ .iter()
+ .map(|cluster| {
+ MatroskaTag::CuePoint(Master::Collected(
+ Some(MatroskaTag::CueTime(cluster.timestamp))
+ .into_iter()
+ // TODO: Subtitles should not have cues for every cluster
+ .chain(inputs.iter().map(|i| {
+ MatroskaTag::CueTrackPositions(Master::Collected(vec![
+ MatroskaTag::CueTrack(i.mapped),
+ MatroskaTag::CueClusterPosition(cluster.position as u64),
+ ]))
+ }))
+ .collect(),
+ ))
+ })
+ .collect(),
+ )))?;
+ output.write_padding(first_cluster_offset_predict)?;
+ let first_cluster_offset = output.position();
+ assert_eq!(first_cluster_offset, first_cluster_offset_predict);
- let mut skip = 0;
- for (i, cluster) in segment_layout.iter().enumerate() {
- if (cluster.position + segment_offset) >= range.start {
- break;
- }
- skip = i;
+ let mut skip = 0;
+ for (i, cluster) in segment_layout.iter().enumerate() {
+ if (cluster.position + segment_offset) >= range.start {
+ break;
}
+ skip = i;
+ }
- if skip != 0 {
- info!("skipping {skip} clusters");
- output.seek(SeekFrom::Start(
- (segment_layout[skip].position + segment_offset) as u64,
- ))?;
- }
+ if skip != 0 {
+ info!("skipping {skip} clusters");
+ output.seek(SeekFrom::Start(
+ (segment_layout[skip].position + segment_offset) as u64,
+ ))?;
+ }
- struct ReaderD<'a> {
- peek: Option<Block>,
- stream: SegmentExtractIter<'a>,
- mapped: u64,
- }
+ struct ReaderD<'a> {
+ peek: Option<Block>,
+ stream: SegmentExtractIter<'a>,
+ mapped: u64,
+ }
- let mut track_readers = inputs
- .iter_mut()
- .enumerate()
- .map(|(i, inp)| {
- inp.reader
- .seek(
- segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole"
- MatroskaTag::Cluster(Master::Start),
- )
- .context("seeking in input")?;
- let mut stream =
- SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64);
+ let mut track_readers = inputs
+ .iter_mut()
+ .enumerate()
+ .map(|(i, inp)| {
+ inp.reader
+ .seek(
+ segment_layout[skip].source_offsets[i].unwrap(), // TODO will crash if there is a "hole"
+ MatroskaTag::Cluster(Master::Start),
+ )
+ .context("seeking in input")?;
+ let mut stream =
+ SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64);
- Ok(ReaderD {
- mapped: inp.mapped,
- peek: Some(stream.next()?),
- stream,
- })
+ Ok(ReaderD {
+ mapped: inp.mapped,
+ peek: Some(stream.next()?),
+ stream,
})
- .collect::<anyhow::Result<Vec<_>>>()?;
+ })
+ .collect::<anyhow::Result<Vec<_>>>()?;
- info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp);
+ info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp);
- for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) {
- debug!(
- "writing cluster {cluster_index} (pts_base={}) with {} blocks",
- cluster.timestamp,
- cluster.blocks.len()
- );
- {
- let cue_error =
- cluster.position as i64 - (output.position() - segment_offset) as i64;
- if cue_error != 0 {
- warn!("calculation was {} bytes off", cue_error);
- }
+ for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) {
+ debug!(
+ "writing cluster {cluster_index} (pts_base={}) with {} blocks",
+ cluster.timestamp,
+ cluster.blocks.len()
+ );
+ {
+ let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64;
+ if cue_error != 0 {
+ warn!("calculation was {} bytes off", cue_error);
}
+ }
- let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)];
- for (block_track, index_block) in cluster.blocks {
- let track_reader = &mut track_readers[block_track];
- let mut block = track_reader
- .peek
- .replace(track_reader.stream.next()?)
- .expect("source file too short");
+ let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)];
+ for (block_track, index_block) in cluster.blocks {
+ let track_reader = &mut track_readers[block_track];
+ let mut block = track_reader
+ .peek
+ .replace(track_reader.stream.next()?)
+ .expect("source file too short");
- assert_eq!(index_block.size, block.data.len(), "seek index is wrong");
+ assert_eq!(index_block.size, block.data.len(), "seek index is wrong");
- block.track = track_reader.mapped;
- block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap();
- trace!("n={} tso={}", block.track, block.timestamp_off);
- let buf = block.dump();
- cluster_blocks.push(MatroskaTag::SimpleBlock(buf))
- }
- output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?;
+ block.track = track_reader.mapped;
+ block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap();
+ trace!("n={} tso={}", block.track, block.timestamp_off);
+ let buf = block.dump();
+ cluster_blocks.push(MatroskaTag::SimpleBlock(buf))
}
- output.write_tag(&MatroskaTag::Segment(Master::End))?;
- Ok(())
+ output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?;
}
+ output.write_tag(&MatroskaTag::Segment(Master::End))?;
+ Ok(())
}
pub fn track_to_ebml(
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 6b268e4..c47f589 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -24,11 +24,11 @@ pub async fn r_stream(
_sess: Session,
federation: &State<Federation>,
db: &State<Database>,
- id: String,
+ id: &str,
range: Option<RequestRange>,
spec: StreamSpec,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
- let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
+ let node = db.node.get(&id.to_string())?.ok_or(anyhow!("node does not exist"))?;
let source = node
.private
.source
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
index 804bc1c..a03a85c 100644
--- a/stream/Cargo.toml
+++ b/stream/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
jellycommon = { path = "../common", features = ["rocket"] }
jellybase = { path = "../base" }
jellytranscoder = { path = "../transcoder" }
+jellyremuxer = { path = "../remuxer" }
log = { workspace = true }
anyhow = { workspace = true }
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index df75cf5..867b310 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -1,15 +1,59 @@
-use jellycommon::{stream::StreamSpec, Node};
+use anyhow::{anyhow, bail, Result};
+use jellybase::CONF;
+use jellycommon::{
+ stream::{StreamFormat, StreamSpec},
+ LocalTrack, MediaSource, Node,
+};
use std::ops::Range;
use tokio::io::{duplex, DuplexStream};
use tokio_util::io::SyncIoBridge;
-pub async fn stream(
+pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result<DuplexStream> {
+ let (a, b) = duplex(4096);
+
+ let track_sources = match node
+ .private
+ .source
+ .as_ref()
+ .ok_or(anyhow!("node has no media"))?
+ {
+ MediaSource::Local { tracks } => tracks.to_owned(),
+ _ => bail!("node tracks are not local"),
+ };
+
+ match spec.format {
+ StreamFormat::Original => todo!(),
+ StreamFormat::Matroska | StreamFormat::Webm => {
+ remux_stream(node, track_sources, spec, range, b).await?
+ }
+ StreamFormat::Hls => todo!(),
+ StreamFormat::Jhls => todo!(),
+ StreamFormat::Segment => todo!(),
+ }
+
+ Ok(a)
+}
+
+async fn remux_stream(
node: Node,
+ track_sources: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
-) -> anyhow::Result<DuplexStream> {
- let (a, b) = duplex(4096);
+ b: DuplexStream,
+) -> Result<()> {
let b = SyncIoBridge::new(b);
- Ok(a)
+ tokio::task::spawn_blocking(move || {
+ jellyremuxer::remux_stream_into(
+ b,
+ range,
+ CONF.library_path.to_owned(),
+ node.public,
+ track_sources,
+ spec.tracks,
+ spec.format == StreamFormat::Webm,
+ )
+ });
+
+ Ok(())
}