aboutsummaryrefslogtreecommitdiff
path: root/stream/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r--stream/src/lib.rs157
1 files changed, 69 insertions, 88 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 00338c1..6f31e6b 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -9,93 +9,57 @@ pub mod hls;
pub mod jhls;
pub mod webvtt;
-use anyhow::{anyhow, bail, Context, Result};
-use fragment::fragment_stream;
-use hls::{hls_master_stream, hls_variant_stream};
-use jellybase::{
- assetfed::AssetInner,
- common::{
- stream::{StreamFormat, StreamSpec},
- user::{PermissionSet, UserPermission},
- LocalTrack, Node, TrackSource,
- },
- permission::PermissionSetExt,
- CONF,
+use anyhow::Result;
+use ebml_struct::matroska::{Info, Tracks};
+use jellybase::common::{
+ stream::{StreamContainer, StreamSpec},
+ LocalTrack, MediaInfo, Node,
};
-use jhls::jhls_index;
-use std::{io::SeekFrom, ops::Range, sync::Arc};
+use jellymatroska::block::LacingType;
+use std::{ops::Range, sync::Arc};
use tokio::{
fs::File,
- io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
+ io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
};
use tokio_util::io::SyncIoBridge;
-use webvtt::vtt_stream;
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
}
-#[rustfmt::skip]
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" };
- match spec.format {
- StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true },
- StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
- StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
- StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false },
- StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false },
- StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false },
- StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false },
+ let cons = |ct: &'static str, rs: bool| StreamHead {
+ content_type: ct,
+ range_supported: rs,
+ };
+ let container_ct = |x: StreamContainer| match x {
+ StreamContainer::WebM => "video/webm",
+ StreamContainer::Matroska => "video/x-matroska",
+ StreamContainer::WebVTT => "text/vtt",
+ StreamContainer::JVTT => "application/jellything-vtt+json",
+ };
+ match spec {
+ StreamSpec::Whep { .. } => cons("application/x-todo", false),
+ StreamSpec::WhepControl { .. } => cons("application/x-todo", false),
+ StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
+ StreamSpec::Original { .. } => cons("video/x-matroska", true),
+ StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
+ StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
+ StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
}
}
pub async fn stream(
- node: Arc<Node>,
+ info: Arc<MediaInfo>,
spec: StreamSpec,
range: Range<usize>,
- perms: &PermissionSet,
) -> Result<DuplexStream> {
- perms.assert(&UserPermission::StreamFormat(spec.format))?;
-
let (a, b) = duplex(4096);
- // TODO remux of mixed remote and local tracks?!
- let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?;
-
- let local_tracks = spec
- .track
- .iter()
- .map(|i| {
- anyhow::Ok(
- match &track_sources
- .tracks
- .get(*i)
- .ok_or(anyhow!("track does not exist"))?
- .source
- {
- TrackSource::Local(t) => AssetInner::deser(&t.0)?
- .as_local_track()
- .ok_or(anyhow!("asset not a track"))?,
- TrackSource::Remote(_) => bail!("track is not local"),
- },
- )
- })
- .collect::<anyhow::Result<Vec<_>>>()?
- .into_iter()
- .collect::<Vec<_>>();
-
- match spec.format {
- StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?,
- StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
- StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
- StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
- StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?,
- StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?,
- StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?,
- StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?,
- }
-
Ok(a)
}
@@ -108,17 +72,17 @@ async fn remux_stream(
) -> Result<()> {
let b = SyncIoBridge::new(b);
- tokio::task::spawn_blocking(move || {
- jellyremuxer::remux_stream_into(
- b,
- range,
- CONF.media_path.to_owned(),
- &node,
- local_tracks,
- spec.track,
- spec.webm.unwrap_or(false),
- )
- });
+ // tokio::task::spawn_blocking(move || {
+ // jellyremuxer::remux_stream_into(
+ // b,
+ // range,
+ // CONF.media_path.to_owned(),
+ // &node,
+ // local_tracks,
+ // spec.track,
+ // spec.webm.unwrap_or(false),
+ // )
+ // });
Ok(())
}
@@ -129,19 +93,19 @@ async fn original_stream(
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
- }
+ // if spec.track.len() != 1 {
+ // bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
+ // }
- let source = local_tracks[spec.track[0]].clone();
- let mut file = File::open(CONF.media_path.join(source.path))
- .await
- .context("opening source")?;
- file.seek(SeekFrom::Start(range.start as u64))
- .await
- .context("seek source")?;
+ // let source = local_tracks[spec.track[0]].clone();
+ // let mut file = File::open(CONF.media_path.join(source.path))
+ // .await
+ // .context("opening source")?;
+ // file.seek(SeekFrom::Start(range.start as u64))
+ // .await
+ // .context("seek source")?;
- tokio::task::spawn(copy_stream(file, b, range.end - range.start));
+ // tokio::task::spawn(copy_stream(file, b, range.end - range.start));
Ok(())
}
@@ -157,3 +121,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) ->
amount -= size;
}
}
+
+// TODO functions to test seekability, get live status and enumate segments
+trait MediaSource {
+ /// Seeks to some position close to, but before, `time` ticks.
+ fn seek(&mut self, time: u64) -> Result<()>;
+ /// Retrieve headers (info and tracks) for some segment.
+ fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
+ /// Returns the next block and the current segment index
+ fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
+}
+pub struct AbsBlock {
+ track: u64,
+ pts: u64,
+ keyframe: bool,
+ lacing: Option<LacingType>,
+ data: Vec<u8>,
+}