aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--client/src/lib.rs3
-rw-r--r--common/src/stream.rs120
-rw-r--r--common/src/user.rs46
-rw-r--r--doc/api.md13
-rw-r--r--stream/Cargo.toml2
-rw-r--r--stream/src/lib.rs157
7 files changed, 143 insertions, 200 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ed04edd..9c9b347 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1803,7 +1803,9 @@ name = "jellystream"
version = "0.1.0"
dependencies = [
"anyhow",
+ "ebml-struct",
"jellybase",
+ "jellymatroska",
"jellyremuxer",
"jellytranscoder",
"log",
diff --git a/client/src/lib.rs b/client/src/lib.rs
index 96c39b6..1497e45 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -168,7 +168,8 @@ impl Session {
"{}/n/{}/stream?{}&{}",
self.instance.base(),
id,
- stream_spec.to_query(),
+ todo!(),
+ // stream_spec.to_query(),
self.session_param()
)
}
diff --git a/common/src/stream.rs b/common/src/stream.rs
index 3e227e1..46f6abc 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -1,82 +1,60 @@
-use bincode::{Decode, Encode};
/*
This file is part of jellything (https://codeberg.org/metamuffin/jellything)
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-#[cfg(feature = "rocket")]
-use rocket::{FromForm, FromFormField, UriDisplayQuery};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
-#[cfg_attr(feature = "rocket", derive(FromForm, UriDisplayQuery))]
-pub struct StreamSpec {
- pub track: Vec<usize>,
- pub format: StreamFormat,
- pub webm: Option<bool>,
- pub profile: Option<usize>,
- pub index: Option<usize>,
+pub enum StreamSpec {
+ Whep {
+ track: u64,
+ seek: u64,
+ },
+ WhepControl {
+ token: String,
+ },
+ Remux {
+ track: Vec<u64>,
+ container: StreamContainer,
+ },
+ Original {
+ track: u64,
+ },
+ HlsSuperMultiVariant {
+ container: StreamContainer,
+ },
+ HlsMultiVariant {
+ segment: u64,
+ container: StreamContainer,
+ },
+ HlsVariant {
+ segment: u64,
+ track: u64,
+ container: StreamContainer,
+ format: usize,
+ },
+ Info {
+ segment: Option<u64>,
+ },
+ FragmentIndex {
+ segment: u64,
+ track: u64,
+ },
+ Fragment {
+ segment: u64,
+ track: u64,
+ index: u64,
+ container: StreamContainer,
+ format: usize,
+ },
}
-#[rustfmt::skip]
-#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Hash, Encode, Decode)]
-#[serde(rename_all = "snake_case")]
-#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))]
-pub enum StreamFormat {
- #[cfg_attr(feature = "rocket", field(value = "original"))] Original,
- #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska,
- #[cfg_attr(feature = "rocket", field(value = "hlsmaster"))] HlsMaster,
- #[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant,
- #[cfg_attr(feature = "rocket", field(value = "jhlsi"))] JhlsIndex,
- #[cfg_attr(feature = "rocket", field(value = "frag"))] Fragment,
- #[cfg_attr(feature = "rocket", field(value = "webvtt"))] Webvtt,
- #[cfg_attr(feature = "rocket", field(value = "jvtt"))] Jvtt,
-}
-
-impl Default for StreamSpec {
- fn default() -> Self {
- Self {
- track: Vec::new(),
- format: StreamFormat::Matroska,
- webm: Some(true),
- profile: None,
- index: None,
- }
- }
-}
-
-impl StreamSpec {
- pub fn to_query(&self) -> String {
- use std::fmt::Write;
- let mut u = String::new();
- write!(u, "format={}", self.format.ident()).unwrap();
- for t in &self.track {
- write!(u, "&track={}", t).unwrap();
- }
- if let Some(profile) = self.profile {
- write!(u, "&profile={profile}").unwrap();
- }
- if let Some(index) = self.index {
- write!(u, "&index={index}").unwrap();
- }
- if let Some(webm) = self.webm {
- write!(u, "&webm={webm}").unwrap();
- }
- u
- }
-}
-
-impl StreamFormat {
- pub fn ident(&self) -> &'static str {
- match self {
- StreamFormat::Jvtt => "jvtt",
- StreamFormat::Original => "original",
- StreamFormat::Matroska => "matroska",
- StreamFormat::HlsMaster => "hlsmaster",
- StreamFormat::HlsVariant => "hlsvariant",
- StreamFormat::JhlsIndex => "jhlsi",
- StreamFormat::Fragment => "frag",
- StreamFormat::Webvtt => "webvtt",
- }
- }
+#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StreamContainer {
+ WebM,
+ Matroska,
+ WebVTT,
+ JVTT,
}
diff --git a/common/src/user.rs b/common/src/user.rs
index ef78eca..e0e7a0d 100644
--- a/common/src/user.rs
+++ b/common/src/user.rs
@@ -3,7 +3,6 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{stream::StreamFormat, user};
use bincode::{Decode, Encode};
#[cfg(feature = "rocket")]
use rocket::{FromFormField, UriDisplayQuery};
@@ -99,7 +98,6 @@ pub enum UserPermission {
ManageSelf,
AccessNode(String),
- StreamFormat(StreamFormat),
Transcode,
FederatedContent,
}
@@ -107,33 +105,11 @@ pub enum UserPermission {
impl UserPermission {
pub const ALL_ENUMERABLE: &'static [UserPermission] = {
use UserPermission::*;
- &[
- Admin,
- Transcode,
- ManageSelf,
- FederatedContent,
- StreamFormat(user::StreamFormat::Original),
- ]
+ &[Admin, Transcode, ManageSelf, FederatedContent]
};
pub fn default_value(&self) -> bool {
- use user::StreamFormat::*;
use UserPermission::*;
- matches!(
- self,
- Transcode
- | ManageSelf
- | FederatedContent
- | StreamFormat(
- JhlsIndex
- | Jvtt
- | HlsMaster
- | HlsVariant
- | Matroska
- | Fragment
- | Webvtt
- | Original // TODO remove later
- )
- )
+ matches!(self, Transcode | ManageSelf | FederatedContent)
}
}
@@ -143,15 +119,15 @@ impl Display for UserPermission {
UserPermission::ManageSelf => "manage self (password, display name, etc.)".to_string(),
UserPermission::FederatedContent => "access to federated content".to_string(),
UserPermission::Admin => "administrative rights".to_string(),
- UserPermission::StreamFormat(StreamFormat::Original) => {
- "downloading the original media".to_string()
- }
- UserPermission::StreamFormat(StreamFormat::Matroska) => {
- "downloading a remuxed WebM/Matroska version of the media ".to_string()
- }
- UserPermission::StreamFormat(x) => {
- format!("downloading media via {x:?}")
- }
+ // UserPermission::StreamFormat(StreamFormat::Original) => {
+ // "downloading the original media".to_string()
+ // }
+ // UserPermission::StreamFormat(StreamFormat::Matroska) => {
+ // "downloading a remuxed WebM/Matroska version of the media ".to_string()
+ // }
+ // UserPermission::StreamFormat(x) => {
+ // format!("downloading media via {x:?}")
+ // }
UserPermission::Transcode => "transcoding".to_string(),
// UserPermission::ManageUsers => format!("management of all users"),
// UserPermission::GenerateInvite => format!("inviting new users"),
diff --git a/doc/api.md b/doc/api.md
index 5713df7..b24fd46 100644
--- a/doc/api.md
+++ b/doc/api.md
@@ -111,19 +111,19 @@ federation.
by the server.
- `?whepcontrol&<token>`
- WebSocket endpoint for controlling WHEP playback. TODO schema
-- `?remux&<track...>&<webm>`
-- `?hlssupermultivariant&<webm>`
+- `?remux&<track...>&<container>`
+- `?hlssupermultivariant&<container>`
- Returns m3u8/HLS playlist of all known multi-variant playlists, one for each
segment. The plylist is updated for live media.
-- `?hlsmultivariant&<segment>&<webm>`
+- `?hlsmultivariant&<segment>&<container>`
- Returns m3u8/HLS playlist of all track formats' variant playlists.
-- `?hlsvariant&<segment>&<track>&<webm>&<format>`
+- `?hlsvariant&<segment>&<track>&<container>&<format>`
- Returns m3u8/HLS playlist of all known fragments of this track format. The
playlist is updated for live media.
- `?info&<segment?>`
- Returns JSON `SegmentInfo` if a segment index is provided, else `MediaInfo`
- `?fragmentindex&<segment>&<track>`
-- `?fragment&<segment>&<track>&<index>&<webm>&<format>`
+- `?fragment&<segment>&<track>&<index>&<container>&<format>`
```ts
interface MediaInfo {
@@ -147,10 +147,13 @@ interface TrackFormat {
bandwidth: number;
remux: bool;
title?: string;
+ containers: StreamContainer[];
a_sampling_frequency?: number;
a_channels?: number;
v_resolution_width?: number;
av_bit_depth?: number;
}
+type FragmentIndex = number[];
+type StreamContainer = "webm" | "matroska" | "webvtt" | "jvtt";
```
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
index 36979c9..21d1650 100644
--- a/stream/Cargo.toml
+++ b/stream/Cargo.toml
@@ -7,6 +7,8 @@ edition = "2021"
jellybase = { path = "../base", features = ["rocket"] }
jellytranscoder = { path = "../transcoder" }
jellyremuxer = { path = "../remuxer" }
+jellymatroska = { path = "../matroska" }
+ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" }
log = { workspace = true }
anyhow = { workspace = true }
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 00338c1..6f31e6b 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -9,93 +9,57 @@ pub mod hls;
pub mod jhls;
pub mod webvtt;
-use anyhow::{anyhow, bail, Context, Result};
-use fragment::fragment_stream;
-use hls::{hls_master_stream, hls_variant_stream};
-use jellybase::{
- assetfed::AssetInner,
- common::{
- stream::{StreamFormat, StreamSpec},
- user::{PermissionSet, UserPermission},
- LocalTrack, Node, TrackSource,
- },
- permission::PermissionSetExt,
- CONF,
+use anyhow::Result;
+use ebml_struct::matroska::{Info, Tracks};
+use jellybase::common::{
+ stream::{StreamContainer, StreamSpec},
+ LocalTrack, MediaInfo, Node,
};
-use jhls::jhls_index;
-use std::{io::SeekFrom, ops::Range, sync::Arc};
+use jellymatroska::block::LacingType;
+use std::{ops::Range, sync::Arc};
use tokio::{
fs::File,
- io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
+ io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
};
use tokio_util::io::SyncIoBridge;
-use webvtt::vtt_stream;
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
}
-#[rustfmt::skip]
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" };
- match spec.format {
- StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true },
- StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
- StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
- StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false },
- StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false },
- StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false },
- StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false },
+ let cons = |ct: &'static str, rs: bool| StreamHead {
+ content_type: ct,
+ range_supported: rs,
+ };
+ let container_ct = |x: StreamContainer| match x {
+ StreamContainer::WebM => "video/webm",
+ StreamContainer::Matroska => "video/x-matroska",
+ StreamContainer::WebVTT => "text/vtt",
+ StreamContainer::JVTT => "application/jellything-vtt+json",
+ };
+ match spec {
+ StreamSpec::Whep { .. } => cons("application/x-todo", false),
+ StreamSpec::WhepControl { .. } => cons("application/x-todo", false),
+ StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
+ StreamSpec::Original { .. } => cons("video/x-matroska", true),
+ StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
+ StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
+ StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
}
}
pub async fn stream(
- node: Arc<Node>,
+ info: Arc<MediaInfo>,
spec: StreamSpec,
range: Range<usize>,
- perms: &PermissionSet,
) -> Result<DuplexStream> {
- perms.assert(&UserPermission::StreamFormat(spec.format))?;
-
let (a, b) = duplex(4096);
- // TODO remux of mixed remote and local tracks?!
- let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?;
-
- let local_tracks = spec
- .track
- .iter()
- .map(|i| {
- anyhow::Ok(
- match &track_sources
- .tracks
- .get(*i)
- .ok_or(anyhow!("track does not exist"))?
- .source
- {
- TrackSource::Local(t) => AssetInner::deser(&t.0)?
- .as_local_track()
- .ok_or(anyhow!("asset not a track"))?,
- TrackSource::Remote(_) => bail!("track is not local"),
- },
- )
- })
- .collect::<anyhow::Result<Vec<_>>>()?
- .into_iter()
- .collect::<Vec<_>>();
-
- match spec.format {
- StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?,
- StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
- StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
- StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
- StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?,
- StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?,
- StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?,
- StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?,
- }
-
Ok(a)
}
@@ -108,17 +72,17 @@ async fn remux_stream(
) -> Result<()> {
let b = SyncIoBridge::new(b);
- tokio::task::spawn_blocking(move || {
- jellyremuxer::remux_stream_into(
- b,
- range,
- CONF.media_path.to_owned(),
- &node,
- local_tracks,
- spec.track,
- spec.webm.unwrap_or(false),
- )
- });
+ // tokio::task::spawn_blocking(move || {
+ // jellyremuxer::remux_stream_into(
+ // b,
+ // range,
+ // CONF.media_path.to_owned(),
+ // &node,
+ // local_tracks,
+ // spec.track,
+ // spec.webm.unwrap_or(false),
+ // )
+ // });
Ok(())
}
@@ -129,19 +93,19 @@ async fn original_stream(
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
- }
+ // if spec.track.len() != 1 {
+ // bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
+ // }
- let source = local_tracks[spec.track[0]].clone();
- let mut file = File::open(CONF.media_path.join(source.path))
- .await
- .context("opening source")?;
- file.seek(SeekFrom::Start(range.start as u64))
- .await
- .context("seek source")?;
+ // let source = local_tracks[spec.track[0]].clone();
+ // let mut file = File::open(CONF.media_path.join(source.path))
+ // .await
+ // .context("opening source")?;
+ // file.seek(SeekFrom::Start(range.start as u64))
+ // .await
+ // .context("seek source")?;
- tokio::task::spawn(copy_stream(file, b, range.end - range.start));
+ // tokio::task::spawn(copy_stream(file, b, range.end - range.start));
Ok(())
}
@@ -157,3 +121,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) ->
amount -= size;
}
}
+
+// TODO functions to test seekability, get live status and enumate segments
+trait MediaSource {
+ /// Seeks to some position close to, but before, `time` ticks.
+ fn seek(&mut self, time: u64) -> Result<()>;
+ /// Retrieve headers (info and tracks) for some segment.
+ fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
+ /// Returns the next block and the current segment index
+ fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
+}
+pub struct AbsBlock {
+ track: u64,
+ pts: u64,
+ keyframe: bool,
+ lacing: Option<LacingType>,
+ data: Vec<u8>,
+}