aboutsummaryrefslogtreecommitdiff
path: root/stream/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r--stream/src/lib.rs167
1 files changed, 72 insertions, 95 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 00338c1..4df87ae 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -5,136 +5,113 @@
*/
#![feature(iterator_try_collect)]
pub mod fragment;
+pub mod fragment_index;
pub mod hls;
-pub mod jhls;
+pub mod stream_info;
pub mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use fragment::fragment_stream;
-use hls::{hls_master_stream, hls_variant_stream};
-use jellybase::{
- assetfed::AssetInner,
- common::{
- stream::{StreamFormat, StreamSpec},
- user::{PermissionSet, UserPermission},
- LocalTrack, Node, TrackSource,
- },
- permission::PermissionSetExt,
- CONF,
+use fragment_index::fragment_index_stream;
+use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream};
+use jellybase::common::{
+ stream::{StreamContainer, StreamSpec},
+ Node,
};
-use jhls::jhls_index;
-use std::{io::SeekFrom, ops::Range, sync::Arc};
+use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc};
+use stream_info::{stream_info, write_stream_info};
use tokio::{
fs::File,
io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
};
-use tokio_util::io::SyncIoBridge;
-use webvtt::vtt_stream;
+
+#[derive(Debug)]
+pub struct SMediaInfo {
+ pub info: Arc<Node>,
+ pub files: BTreeSet<PathBuf>,
+}
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
}
-#[rustfmt::skip]
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" };
- match spec.format {
- StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true },
- StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
- StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
- StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false },
- StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false },
- StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false },
- StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false },
+ let cons = |ct: &'static str, rs: bool| StreamHead {
+ content_type: ct,
+ range_supported: rs,
+ };
+ let container_ct = |x: StreamContainer| match x {
+ StreamContainer::WebM => "video/webm",
+ StreamContainer::Matroska => "video/x-matroska",
+ StreamContainer::WebVTT => "text/vtt",
+ StreamContainer::JVTT => "application/jellything-vtt+json",
+ StreamContainer::MPEG4 => "video/mp4",
+ };
+ match spec {
+ StreamSpec::Whep { .. } => cons("application/x-todo", false),
+ StreamSpec::WhepControl { .. } => cons("application/x-todo", false),
+ StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
+ StreamSpec::Original { .. } => cons("video/x-matroska", true),
+ StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
+ StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
+ StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
}
}
pub async fn stream(
- node: Arc<Node>,
+ info: Arc<SMediaInfo>,
spec: StreamSpec,
range: Range<usize>,
- perms: &PermissionSet,
) -> Result<DuplexStream> {
- perms.assert(&UserPermission::StreamFormat(spec.format))?;
-
let (a, b) = duplex(4096);
- // TODO remux of mixed remote and local tracks?!
- let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?;
-
- let local_tracks = spec
- .track
- .iter()
- .map(|i| {
- anyhow::Ok(
- match &track_sources
- .tracks
- .get(*i)
- .ok_or(anyhow!("track does not exist"))?
- .source
- {
- TrackSource::Local(t) => AssetInner::deser(&t.0)?
- .as_local_track()
- .ok_or(anyhow!("asset not a track"))?,
- TrackSource::Remote(_) => bail!("track is not local"),
- },
- )
- })
- .collect::<anyhow::Result<Vec<_>>>()?
- .into_iter()
- .collect::<Vec<_>>();
-
- match spec.format {
- StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?,
- StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
- StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
- StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
- StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?,
- StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?,
- StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?,
- StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?,
+ match spec {
+ StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
+ StreamSpec::HlsSuperMultiVariant { container } => {
+ hls_supermultivariant_stream(b, info, container).await?;
+ }
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ hls_multivariant_stream(b, info, segment, container).await?
+ }
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => hls_variant_stream(b, info, segment, track, format, container).await?,
+ StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
+ StreamSpec::FragmentIndex { segment, track } => {
+ fragment_index_stream(b, info, segment, track).await?
+ }
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => fragment_stream(b, info, track, segment, index, format, container).await?,
+ _ => bail!("todo"),
}
Ok(a)
}
-async fn remux_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
- range: Range<usize>,
- b: DuplexStream,
-) -> Result<()> {
- let b = SyncIoBridge::new(b);
-
- tokio::task::spawn_blocking(move || {
- jellyremuxer::remux_stream_into(
- b,
- range,
- CONF.media_path.to_owned(),
- &node,
- local_tracks,
- spec.track,
- spec.webm.unwrap_or(false),
- )
- });
-
- Ok(())
-}
-
async fn original_stream(
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
+ info: Arc<SMediaInfo>,
+ track: usize,
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
- }
-
- let source = local_tracks[spec.track[0]].clone();
- let mut file = File::open(CONF.media_path.join(source.path))
+ let (iinfo, _info) = stream_info(info).await?;
+ let (file_index, _) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("unknown track"))?;
+ let mut file = File::open(&iinfo.paths[file_index])
.await
.context("opening source")?;
file.seek(SeekFrom::Start(range.start as u64))