aboutsummaryrefslogtreecommitdiff
path: root/stream/src
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src')
-rw-r--r--stream/src/hls.rs8
-rw-r--r--stream/src/jhls.rs13
-rw-r--r--stream/src/lib.rs52
-rw-r--r--stream/src/segment.rs11
-rw-r--r--stream/src/webvtt.rs7
5 files changed, 54 insertions, 37 deletions
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index ea82fed..74f18b4 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -18,7 +18,7 @@ use tokio::{
pub async fn hls_master_stream(
_node: Node,
- _track_sources: Vec<LocalTrack>,
+ _local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
mut b: DuplexStream,
) -> Result<()> {
@@ -45,17 +45,15 @@ pub async fn hls_master_stream(
pub async fn hls_variant_stream(
node: Node,
- track_sources: Vec<LocalTrack>,
+ local_tracks: Vec<LocalTrack>,
mut spec: StreamSpec,
mut b: DuplexStream,
) -> Result<()> {
- let track = *spec.tracks.get(0).ok_or(anyhow!("no track"))?;
let snips = spawn_blocking(move || {
jellyremuxer::snippet::snippet_index(
&CONF.library_path,
&node.public,
- &track_sources,
- track,
+ local_tracks.get(0).ok_or(anyhow!("no track"))?,
)
})
.await??;
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
index 5fe2cab..600d945 100644
--- a/stream/src/jhls.rs
+++ b/stream/src/jhls.rs
@@ -3,23 +3,24 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use jellybase::{permission::PermissionSetExt, CONF};
use jellycommon::{
jhls::{JhlsMetadata, JhlsTrack},
stream::StreamSpec,
user::{PermissionSet, UserPermission},
- LocalTrack, Node,
+ Node, TrackSource,
};
use tokio::io::{AsyncWriteExt, DuplexStream};
pub async fn jhls_stream(
node: Node,
- track_sources: Vec<LocalTrack>,
+ track_sources: &[TrackSource],
_spec: StreamSpec,
mut b: DuplexStream,
perms: &PermissionSet,
) -> Result<()> {
+ let track_sources = track_sources.to_vec();
let media = node.public.media.clone().unwrap();
let tracks = tokio::task::spawn_blocking(move || {
media
@@ -38,8 +39,10 @@ pub async fn jhls_stream(
match jellyremuxer::snippet::snippet_index(
&CONF.library_path,
&node.public,
- &track_sources,
- i,
+ match &track_sources[i] {
+ TrackSource::Local(x) => x,
+ TrackSource::Remote => return Some(Err(anyhow!("das geht nicht"))),
+ },
) {
Ok(segments) => Some(Ok::<_, anyhow::Error>(JhlsTrack {
info: t.to_owned(),
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 1ee0690..ee5c78a 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -15,7 +15,7 @@ use jellybase::{permission::PermissionSetExt, CONF};
use jellycommon::{
stream::{StreamFormat, StreamSpec},
user::{PermissionSet, UserPermission},
- LocalTrack, Node,
+ LocalTrack, Node, TrackSource,
};
use jhls::jhls_stream;
use segment::segment_stream;
@@ -56,24 +56,38 @@ pub async fn stream(
let (a, b) = duplex(4096);
// TODO remux of mixed remote and local tracks?!
- let track_sources = match node
+ let track_sources = node
.private
.source
- .as_ref()
- .ok_or(anyhow!("node has no media"))?
- {
- // MediaSource::Local { tracks } => tracks.to_owned(),
- _ => bail!("node tracks are not local"),
- };
+ .to_owned()
+ .ok_or(anyhow!("node has no media"))?;
+
+ let local_tracks = spec
+ .tracks
+ .iter()
+ .map(|i| {
+ anyhow::Ok(
+ match track_sources
+ .get(*i)
+ .ok_or(anyhow!("track does not exist"))?
+ {
+ TrackSource::Local(t) => t.to_owned(),
+ TrackSource::Remote => bail!("track is not local"),
+ },
+ )
+ })
+ .collect::<anyhow::Result<Vec<_>>>()?
+ .into_iter()
+ .collect::<Vec<_>>();
match spec.format {
- StreamFormat::Original => original_stream(track_sources, spec, range, b).await?,
- StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?,
- StreamFormat::HlsMaster => hls_master_stream(node, track_sources, spec, b).await?,
- StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?,
- StreamFormat::Jhls => jhls_stream(node, track_sources, spec, b, perms).await?,
- StreamFormat::Segment => segment_stream(node, track_sources, spec, b, perms).await?,
- StreamFormat::Webvtt => webvtt_stream(node, track_sources, spec, b).await?,
+ StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?,
+ StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
+ StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
+ StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
+ StreamFormat::Jhls => jhls_stream(node, &track_sources, spec, b, perms).await?,
+ StreamFormat::Segment => segment_stream(node, local_tracks, spec, b, perms).await?,
+ StreamFormat::Webvtt => webvtt_stream(node, local_tracks, spec, b).await?,
}
Ok(a)
@@ -81,7 +95,7 @@ pub async fn stream(
async fn remux_stream(
node: Node,
- track_sources: Vec<LocalTrack>,
+ local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
b: DuplexStream,
@@ -94,7 +108,7 @@ async fn remux_stream(
range,
CONF.library_path.to_owned(),
node.public,
- track_sources,
+ local_tracks,
spec.tracks,
spec.webm.unwrap_or(false),
)
@@ -104,7 +118,7 @@ async fn remux_stream(
}
async fn original_stream(
- track_sources: Vec<LocalTrack>,
+ local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
b: DuplexStream,
@@ -113,7 +127,7 @@ async fn original_stream(
bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
}
- let source = track_sources[spec.tracks[0]].clone();
+ let source = local_tracks[spec.tracks[0]].clone();
let mut file = File::open(CONF.library_path.join(source.path))
.await
.context("opening source")?;
diff --git a/stream/src/segment.rs b/stream/src/segment.rs
index 309da1d..a2553bc 100644
--- a/stream/src/segment.rs
+++ b/stream/src/segment.rs
@@ -17,7 +17,7 @@ use tokio_util::io::SyncIoBridge;
pub async fn segment_stream(
node: Node,
- track_sources: Vec<LocalTrack>,
+ local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
mut b: DuplexStream,
perms: &PermissionSet,
@@ -28,6 +28,11 @@ pub async fn segment_stream(
let track = spec.tracks[0];
let n = spec.index.ok_or(anyhow!("segment index missing"))?;
+ let local_track = local_tracks
+ .get(0)
+ .ok_or(anyhow!("track missing"))?
+ .to_owned();
+
if let Some(profile) = spec.profile {
perms.assert(&UserPermission::Transcode)?;
let location = transcode(
@@ -41,7 +46,7 @@ pub async fn segment_stream(
SyncIoBridge::new(b),
&CONF.library_path,
&node.public,
- track_sources,
+ &local_track,
track,
false,
n,
@@ -65,7 +70,7 @@ pub async fn segment_stream(
b,
&CONF.library_path,
&node.public,
- track_sources,
+ &local_track,
track,
spec.webm.unwrap_or(false),
n,
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index faf0cd3..b1eed2f 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -12,7 +12,7 @@ use tokio::io::{AsyncWriteExt, DuplexStream};
pub async fn webvtt_stream(
node: Node,
- track_sources: Vec<LocalTrack>,
+ local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
mut b: DuplexStream,
) -> Result<()> {
@@ -21,10 +21,7 @@ pub async fn webvtt_stream(
// TODO should use snippets too? big films take too long...
let tracki = *spec.tracks.get(0).ok_or(anyhow!("no track selected"))?;
- let local_track = track_sources
- .get(tracki)
- .ok_or(anyhow!("track does not exist"))?
- .clone();
+ let local_track = local_tracks.get(0).ok_or(anyhow!("no tracks"))?.clone();
let track = &node.public.media.unwrap().tracks[tracki];
match track.codec.as_str() {