aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-06 15:40:58 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-06 15:40:58 +0200
commit7acb520f552bd1edde5c29fbf5baf6643ec4b14e (patch)
tree222fa1d582d039b00da50735b62573db8bdc1f9d
parent80343d02e9e29e4bc55d790b491ce0d0c7bff201 (diff)
downloadjellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar
jellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar.bz2
jellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar.zst
a bit more progress on new streaming api
-rw-r--r--Cargo.lock3
-rw-r--r--client/src/lib.rs5
-rw-r--r--common/src/stream.rs47
-rw-r--r--import/Cargo.toml5
-rw-r--r--import/src/lib.rs3
-rw-r--r--remuxer/Cargo.toml4
-rw-r--r--remuxer/src/lib.rs1
-rw-r--r--remuxer/src/metadata.rs (renamed from import/src/matroska.rs)6
-rw-r--r--server/src/routes/stream.rs25
-rw-r--r--stream/src/hls.rs2
-rw-r--r--stream/src/lib.rs113
11 files changed, 169 insertions, 45 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9c9b347..aabeff6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1760,10 +1760,10 @@ dependencies = [
"base64",
"bincode",
"crossbeam-channel",
- "ebml-struct",
"futures",
"jellybase",
"jellyclient",
+ "jellyremuxer",
"log",
"rayon",
"regex",
@@ -1791,6 +1791,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
+ "ebml-struct",
"jellybase",
"jellymatroska",
"log",
diff --git a/client/src/lib.rs b/client/src/lib.rs
index 1497e45..d3172fd 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -165,11 +165,10 @@ impl Session {
pub fn stream_url(&self, id: NodeIDOrSlug, stream_spec: &StreamSpec) -> String {
format!(
- "{}/n/{}/stream?{}&{}",
+ "{}/n/{}/stream{}&{}",
self.instance.base(),
id,
- todo!(),
- // stream_spec.to_query(),
+ stream_spec.to_query(),
self.session_param()
)
}
diff --git a/common/src/stream.rs b/common/src/stream.rs
index 9a00ce0..a06dad5 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -51,6 +51,47 @@ pub enum StreamSpec {
},
}
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamInfo {
+ pub name: Option<String>,
+ pub segments: Vec<StreamSegmentInfo>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamSegmentInfo {
+ pub name: Option<String>,
+ pub duration: u64,
+ pub tracks: Vec<StreamTrackInfo>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamTrackInfo {
+ pub name: Option<String>,
+ pub kind: TrackKind,
+ pub formats: Vec<StreamFormatInfo>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum TrackKind {
+ Video,
+ Audio,
+ Subtitle,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub struct StreamFormatInfo {
+ pub codec: String,
+ pub byterate: f64,
+ pub remux: bool,
+ pub containers: Vec<StreamContainer>,
+
+ pub pixel_count: Option<u64>,
+ pub samplerate: Option<f64>,
+ pub channels: Option<usize>,
+ pub bit_depth: Option<u8>,
+}
+
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamContainer {
@@ -119,7 +160,11 @@ impl StreamSpec {
.ok_or("container")
.and_then(|s| s.parse().map_err(|()| "unknown container"))
};
- if query.contains_key("fragment") {
+ if query.contains_key("info") {
+ Ok(Self::Info {
+ segment: get_num("segment").ok(),
+ })
+ } else if query.contains_key("fragment") {
Ok(Self::Fragment {
segment: get_num("segment")?,
track: get_num("track")? as usize,
diff --git a/import/Cargo.toml b/import/Cargo.toml
index 645326d..37b5a77 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -6,10 +6,7 @@ edition = "2021"
[dependencies]
jellybase = { path = "../base" }
jellyclient = { path = "../client" }
-
-ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [
- "bincode",
-] }
+jellyremuxer = { path = "../remuxer" }
rayon = "1.10.0"
crossbeam-channel = "0.5.14"
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 3226a0a..d7f9dd7 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -15,8 +15,8 @@ use jellybase::{
CONF, SECRETS,
};
use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility};
+use jellyremuxer::metadata::matroska_metadata;
use log::info;
-use matroska::matroska_metadata;
use rayon::iter::{ParallelBridge, ParallelIterator};
use regex::Regex;
use std::{
@@ -36,7 +36,6 @@ use tokio::{
use trakt::Trakt;
pub mod infojson;
-pub mod matroska;
pub mod tmdb;
pub mod trakt;
diff --git a/remuxer/Cargo.toml b/remuxer/Cargo.toml
index 2313dcc..16713df 100644
--- a/remuxer/Cargo.toml
+++ b/remuxer/Cargo.toml
@@ -13,3 +13,7 @@ log = { workspace = true }
serde = { version = "1.0.217", features = ["derive"] }
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
+
+ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [
+ "bincode",
+] }
diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs
index a98ffad..cc4b39b 100644
--- a/remuxer/src/lib.rs
+++ b/remuxer/src/lib.rs
@@ -9,6 +9,7 @@ pub mod remux;
pub mod seek_index;
pub mod segment_extractor;
pub mod trim_writer;
+pub mod metadata;
pub use fragment::write_fragment_into;
pub use remux::remux_stream_into;
diff --git a/import/src/matroska.rs b/remuxer/src/metadata.rs
index 1593463..4ddad20 100644
--- a/import/src/matroska.rs
+++ b/remuxer/src/metadata.rs
@@ -23,8 +23,8 @@ use std::{
sync::Arc,
};
-#[derive(Encode, Decode, Clone)]
-pub(crate) struct MatroskaMetadata {
+#[derive(Debug, Encode, Decode, Clone)]
+pub struct MatroskaMetadata {
pub info: Option<Info>,
pub tracks: Option<Tracks>,
pub cover: Option<Asset>,
@@ -32,7 +32,7 @@ pub(crate) struct MatroskaMetadata {
pub tags: Option<Tags>,
pub infojson: Option<Vec<u8>>,
}
-pub(crate) fn matroska_metadata(path: &Path) -> Result<Arc<Option<MatroskaMetadata>>> {
+pub fn matroska_metadata(path: &Path) -> Result<Arc<Option<MatroskaMetadata>>> {
cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || {
let mut magic = [0; 4];
File::open(path)?.read_exact(&mut magic).ok();
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 4b3d02e..8f97aec 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -6,13 +6,9 @@
use super::ui::{account::session::Session, error::MyError};
use crate::database::Database;
use anyhow::{anyhow, Result};
-use jellybase::{federation::Federation, permission::PermissionSetExt, SECRETS};
-use jellycommon::{
- config::FederationAccount,
- stream::StreamSpec,
- user::{CreateSessionParams, UserPermission},
- TrackSource,
-};
+use jellybase::{assetfed::AssetInner, federation::Federation};
+use jellycommon::{stream::StreamSpec, TrackSource};
+use jellystream::SMediaInfo;
use log::{info, warn};
use rocket::{
get, head,
@@ -22,7 +18,7 @@ use rocket::{
Either, Request, Response, State,
};
use std::{
- collections::{BTreeMap, HashSet},
+ collections::{BTreeMap, BTreeSet},
ops::Range,
sync::Arc,
};
@@ -131,6 +127,19 @@ pub async fn r_stream(
let head = jellystream::stream_head(&spec);
+ let mut sources = BTreeSet::new();
+ for t in &media.tracks {
+ if let TrackSource::Local(x) = &t.source {
+ if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? {
+ sources.insert(m.path);
+ }
+ }
+ }
+ let media = Arc::new(SMediaInfo {
+ files: sources,
+ info: node,
+ });
+
match jellystream::stream(media, spec, urange).await {
Ok(stream) => Ok(Either::Left(StreamResponse {
stream,
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 56edd2d..27630b2 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -34,8 +34,8 @@ pub async fn hls_master_stream(
let uri = format!(
"stream?{}",
StreamSpec::HlsVariant {
- track: i,
segment,
+ track: i,
container,
format: 0
}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 1f32239..751ecfa 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -12,17 +12,28 @@ pub mod webvtt;
use anyhow::Result;
use ebml_struct::matroska::{Info, Tracks};
use jellybase::common::{
- stream::{StreamContainer, StreamSpec},
- LocalTrack, MediaInfo, Node,
+ stream::{
+ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec,
+ StreamTrackInfo, TrackKind,
+ },
+ LocalTrack, Node,
};
use jellymatroska::block::LacingType;
-use std::{ops::Range, sync::Arc};
+use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
+use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc};
use tokio::{
fs::File,
io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
+ task::spawn_blocking,
};
use tokio_util::io::SyncIoBridge;
+#[derive(Debug)]
+pub struct SMediaInfo {
+ pub info: Arc<Node>,
+ pub files: BTreeSet<PathBuf>,
+}
+
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
@@ -54,7 +65,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead {
}
pub async fn stream(
- info: Arc<MediaInfo>,
+ info: Arc<SMediaInfo>,
spec: StreamSpec,
range: Range<usize>,
) -> Result<DuplexStream> {
@@ -73,7 +84,7 @@ pub async fn stream(
container,
format,
} => todo!(),
- StreamSpec::Info { segment } => todo!(),
+ StreamSpec::Info { segment } => write_stream_info(info, b).await?,
StreamSpec::FragmentIndex { segment, track } => todo!(),
StreamSpec::Fragment {
segment,
@@ -87,6 +98,64 @@ pub async fn stream(
Ok(a)
}
+async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<Option<MatroskaMetadata>>> {
+ Ok(spawn_blocking(move || matroska_metadata(&path)).await??)
+}
+
+async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> {
+ let mut metadata = Vec::new();
+ for path in &info.files {
+ metadata.extend((*async_matroska_metadata(path.clone()).await?).clone());
+ }
+
+ let mut tracks = Vec::new();
+
+ for m in &metadata {
+ if let Some(t) = &m.tracks {
+ for t in &t.entries {
+ let mut formats = Vec::new();
+ formats.push(StreamFormatInfo {
+ codec: t.codec_id.to_string(),
+ remux: true,
+ byterate: 10., // TODO
+ containers: [StreamContainer::Matroska].to_vec(),
+ bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)),
+ samplerate: t.audio.as_ref().map(|a| a.sampling_frequency),
+ channels: t.audio.as_ref().map(|a| a.channels as usize),
+ pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height),
+ ..Default::default()
+ });
+ tracks.push(StreamTrackInfo {
+ name: None,
+ kind: match t.track_type {
+ 1 => TrackKind::Video,
+ 2 => TrackKind::Audio,
+ 17 => TrackKind::Subtitle,
+ _ => todo!(),
+ },
+ formats,
+ })
+ }
+ }
+ }
+
+ let segment = StreamSegmentInfo {
+ name: None,
+ duration: 0,
+ tracks,
+ };
+ Ok(StreamInfo {
+ name: info.info.title.clone(),
+ segments: vec![segment],
+ })
+}
+
+async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> {
+ let info = stream_info(info).await?;
+ b.write_all(&serde_json::to_vec(&info)?).await?;
+ Ok(())
+}
+
async fn remux_stream(
node: Arc<Node>,
local_tracks: Vec<LocalTrack>,
@@ -146,20 +215,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) ->
}
}
-// TODO functions to test seekability, get live status and enumate segments
-trait MediaSource {
- fn loaded_ranges(&self) -> Result<Vec<Range<(u64, u64)>>>;
- /// Seeks to some position close to, but before, `time` ticks.
- fn seek(&mut self, segment: u64, time: u64) -> Result<()>;
- /// Retrieve headers (info and tracks) for some segment.
- fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
- /// Returns the next block and the current segment index
- fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
-}
-pub struct AbsBlock {
- track: u64,
- pts: u64,
- keyframe: bool,
- lacing: Option<LacingType>,
- data: Vec<u8>,
-}
+// // TODO functions to test seekability, get live status and enumate segments
+// trait MediaSource {
+// fn loaded_range(&self) -> Result<Range<(u64, u64)>>;
+// /// Seeks to some position close to, but before, `time` ticks.
+// fn seek(&mut self, segment: u64, time: u64) -> Result<()>;
+// /// Retrieve headers (info and tracks) for some segment.
+// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
+// /// Returns the next block and the current segment index
+// fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
+// }
+// pub struct AbsBlock {
+// track: u64,
+// pts: u64,
+// keyframe: bool,
+// lacing: Option<LacingType>,
+// data: Vec<u8>,
+// }