aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
Diffstat (limited to 'stream')
-rw-r--r--stream/Cargo.toml1
-rw-r--r--stream/src/lib.rs54
2 files changed, 50 insertions, 5 deletions
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
index 804bc1c..a03a85c 100644
--- a/stream/Cargo.toml
+++ b/stream/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
jellycommon = { path = "../common", features = ["rocket"] }
jellybase = { path = "../base" }
jellytranscoder = { path = "../transcoder" }
+jellyremuxer = { path = "../remuxer" }
log = { workspace = true }
anyhow = { workspace = true }
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index df75cf5..867b310 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -1,15 +1,59 @@
-use jellycommon::{stream::StreamSpec, Node};
+use anyhow::{anyhow, bail, Result};
+use jellybase::CONF;
+use jellycommon::{
+ stream::{StreamFormat, StreamSpec},
+ LocalTrack, MediaSource, Node,
+};
use std::ops::Range;
use tokio::io::{duplex, DuplexStream};
use tokio_util::io::SyncIoBridge;
-pub async fn stream(
+pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result<DuplexStream> {
+ let (a, b) = duplex(4096);
+
+ let track_sources = match node
+ .private
+ .source
+ .as_ref()
+ .ok_or(anyhow!("node has no media"))?
+ {
+ MediaSource::Local { tracks } => tracks.to_owned(),
+ _ => bail!("node tracks are not local"),
+ };
+
+ match spec.format {
+ StreamFormat::Original => todo!(),
+ StreamFormat::Matroska | StreamFormat::Webm => {
+ remux_stream(node, track_sources, spec, range, b).await?
+ }
+ StreamFormat::Hls => todo!(),
+ StreamFormat::Jhls => todo!(),
+ StreamFormat::Segment => todo!(),
+ }
+
+ Ok(a)
+}
+
+async fn remux_stream(
node: Node,
+ track_sources: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
-) -> anyhow::Result<DuplexStream> {
- let (a, b) = duplex(4096);
+ b: DuplexStream,
+) -> Result<()> {
let b = SyncIoBridge::new(b);
- Ok(a)
+ tokio::task::spawn_blocking(move || {
+ jellyremuxer::remux_stream_into(
+ b,
+ range,
+ CONF.library_path.to_owned(),
+ node.public,
+ track_sources,
+ spec.tracks,
+ spec.format == StreamFormat::Webm,
+ )
+ });
+
+ Ok(())
}