diff options
-rw-r--r-- | Cargo.lock | 3 | ||||
-rw-r--r-- | client/src/lib.rs | 5 | ||||
-rw-r--r-- | common/src/stream.rs | 47 | ||||
-rw-r--r-- | import/Cargo.toml | 5 | ||||
-rw-r--r-- | import/src/lib.rs | 3 | ||||
-rw-r--r-- | remuxer/Cargo.toml | 4 | ||||
-rw-r--r-- | remuxer/src/lib.rs | 1 | ||||
-rw-r--r-- | remuxer/src/metadata.rs (renamed from import/src/matroska.rs) | 6 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 25 | ||||
-rw-r--r-- | stream/src/hls.rs | 2 | ||||
-rw-r--r-- | stream/src/lib.rs | 113 |
11 files changed, 169 insertions, 45 deletions
@@ -1760,10 +1760,10 @@ dependencies = [ "base64", "bincode", "crossbeam-channel", - "ebml-struct", "futures", "jellybase", "jellyclient", + "jellyremuxer", "log", "rayon", "regex", @@ -1791,6 +1791,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "ebml-struct", "jellybase", "jellymatroska", "log", diff --git a/client/src/lib.rs b/client/src/lib.rs index 1497e45..d3172fd 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -165,11 +165,10 @@ impl Session { pub fn stream_url(&self, id: NodeIDOrSlug, stream_spec: &StreamSpec) -> String { format!( - "{}/n/{}/stream?{}&{}", + "{}/n/{}/stream{}&{}", self.instance.base(), id, - todo!(), - // stream_spec.to_query(), + stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 9a00ce0..a06dad5 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -51,6 +51,47 @@ pub enum StreamSpec { }, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamInfo { + pub name: Option<String>, + pub segments: Vec<StreamSegmentInfo>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamSegmentInfo { + pub name: Option<String>, + pub duration: u64, + pub tracks: Vec<StreamTrackInfo>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamTrackInfo { + pub name: Option<String>, + pub kind: TrackKind, + pub formats: Vec<StreamFormatInfo>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TrackKind { + Video, + Audio, + Subtitle, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct StreamFormatInfo { + pub codec: String, + pub byterate: f64, + pub remux: bool, + pub containers: Vec<StreamContainer>, + + pub pixel_count: Option<u64>, + pub samplerate: Option<f64>, + pub channels: Option<usize>, + pub bit_depth: Option<u8>, +} + #[derive(Debug, Clone, Copy, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum StreamContainer { @@ -119,7 +160,11 @@ impl StreamSpec { .ok_or("container") .and_then(|s| s.parse().map_err(|()| "unknown container")) }; - if query.contains_key("fragment") { + if query.contains_key("info") { + Ok(Self::Info { + segment: get_num("segment").ok(), + }) + } else if query.contains_key("fragment") { Ok(Self::Fragment { segment: get_num("segment")?, track: get_num("track")? as usize, diff --git a/import/Cargo.toml b/import/Cargo.toml index 645326d..37b5a77 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -6,10 +6,7 @@ edition = "2021" [dependencies] jellybase = { path = "../base" } jellyclient = { path = "../client" } - -ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ - "bincode", -] } +jellyremuxer = { path = "../remuxer" } rayon = "1.10.0" crossbeam-channel = "0.5.14" diff --git a/import/src/lib.rs b/import/src/lib.rs index 3226a0a..d7f9dd7 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -15,8 +15,8 @@ use jellybase::{ CONF, SECRETS, }; use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility}; +use jellyremuxer::metadata::matroska_metadata; use log::info; -use matroska::matroska_metadata; use rayon::iter::{ParallelBridge, ParallelIterator}; use regex::Regex; use std::{ @@ -36,7 +36,6 @@ use tokio::{ use trakt::Trakt; pub mod infojson; -pub mod matroska; pub mod tmdb; pub mod trakt; diff --git a/remuxer/Cargo.toml b/remuxer/Cargo.toml index 2313dcc..16713df 100644 --- a/remuxer/Cargo.toml +++ b/remuxer/Cargo.toml @@ -13,3 +13,7 @@ log = { workspace = true } serde = { version = "1.0.217", features = ["derive"] } bincode = { version = "2.0.0-rc.3", features = ["serde"] } + +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ + "bincode", +] } diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index a98ffad..cc4b39b 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -9,6 +9,7 @@ pub mod remux; pub mod seek_index; pub mod segment_extractor; pub mod trim_writer; +pub mod metadata; pub use fragment::write_fragment_into; pub use remux::remux_stream_into; diff --git a/import/src/matroska.rs b/remuxer/src/metadata.rs index 1593463..4ddad20 100644 --- a/import/src/matroska.rs +++ b/remuxer/src/metadata.rs @@ -23,8 +23,8 @@ use std::{ sync::Arc, }; -#[derive(Encode, Decode, Clone)] -pub(crate) struct MatroskaMetadata { +#[derive(Debug, Encode, Decode, Clone)] +pub struct MatroskaMetadata { pub info: Option<Info>, pub tracks: Option<Tracks>, pub cover: Option<Asset>, @@ -32,7 +32,7 @@ pub(crate) struct MatroskaMetadata { pub tags: Option<Tags>, pub infojson: Option<Vec<u8>>, } -pub(crate) fn matroska_metadata(path: &Path) -> Result<Arc<Option<MatroskaMetadata>>> { +pub fn matroska_metadata(path: &Path) -> Result<Arc<Option<MatroskaMetadata>>> { cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { let mut magic = [0; 4]; File::open(path)?.read_exact(&mut magic).ok(); diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 4b3d02e..8f97aec 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -6,13 +6,9 @@ use super::ui::{account::session::Session, error::MyError}; use crate::database::Database; use anyhow::{anyhow, Result}; -use jellybase::{federation::Federation, permission::PermissionSetExt, SECRETS}; -use jellycommon::{ - config::FederationAccount, - stream::StreamSpec, - user::{CreateSessionParams, UserPermission}, - TrackSource, -}; +use jellybase::{assetfed::AssetInner, federation::Federation}; +use jellycommon::{stream::StreamSpec, TrackSource}; +use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ get, head, @@ -22,7 +18,7 @@ use rocket::{ Either, Request, Response, State, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; @@ -131,6 +127,19 @@ pub async fn r_stream( let head = jellystream::stream_head(&spec); + let mut sources = BTreeSet::new(); + for t in &media.tracks { + if let TrackSource::Local(x) = &t.source { + if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { + sources.insert(m.path); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + info: node, + }); + match jellystream::stream(media, spec, urange).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 56edd2d..27630b2 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -34,8 +34,8 @@ pub async fn hls_master_stream( let uri = format!( "stream?{}", StreamSpec::HlsVariant { - track: i, segment, + track: i, container, format: 0 } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 1f32239..751ecfa 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -12,17 +12,28 @@ pub mod webvtt; use anyhow::Result; use ebml_struct::matroska::{Info, Tracks}; use jellybase::common::{ - stream::{StreamContainer, StreamSpec}, - LocalTrack, MediaInfo, Node, + stream::{ + StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, + StreamTrackInfo, TrackKind, + }, + LocalTrack, Node, }; use jellymatroska::block::LacingType; -use std::{ops::Range, sync::Arc}; +use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; +use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, + task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; +#[derive(Debug)] +pub struct SMediaInfo { + pub info: Arc<Node>, + pub files: BTreeSet<PathBuf>, +} + pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, @@ -54,7 +65,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { } pub async fn stream( - info: Arc<MediaInfo>, + info: Arc<SMediaInfo>, spec: StreamSpec, range: Range<usize>, ) -> Result<DuplexStream> { @@ -73,7 +84,7 @@ pub async fn stream( container, format, } => todo!(), - StreamSpec::Info { segment } => todo!(), + StreamSpec::Info { segment } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => todo!(), StreamSpec::Fragment { segment, @@ -87,6 +98,64 @@ pub async fn stream( Ok(a) } +async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<Option<MatroskaMetadata>>> { + Ok(spawn_blocking(move || matroska_metadata(&path)).await??) +} + +async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> { + let mut metadata = Vec::new(); + for path in &info.files { + metadata.extend((*async_matroska_metadata(path.clone()).await?).clone()); + } + + let mut tracks = Vec::new(); + + for m in &metadata { + if let Some(t) = &m.tracks { + for t in &t.entries { + let mut formats = Vec::new(); + formats.push(StreamFormatInfo { + codec: t.codec_id.to_string(), + remux: true, + byterate: 10., // TODO + containers: [StreamContainer::Matroska].to_vec(), + bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), + samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), + channels: t.audio.as_ref().map(|a| a.channels as usize), + pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height), + ..Default::default() + }); + tracks.push(StreamTrackInfo { + name: None, + kind: match t.track_type { + 1 => TrackKind::Video, + 2 => TrackKind::Audio, + 17 => TrackKind::Subtitle, + _ => todo!(), + }, + formats, + }) + } + } + } + + let segment = StreamSegmentInfo { + name: None, + duration: 0, + tracks, + }; + Ok(StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }) +} + +async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> { + let info = stream_info(info).await?; + b.write_all(&serde_json::to_vec(&info)?).await?; + Ok(()) +} + async fn remux_stream( node: Arc<Node>, local_tracks: Vec<LocalTrack>, @@ -146,20 +215,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> } } -// TODO functions to test seekability, get live status and enumate segments -trait MediaSource { - fn loaded_ranges(&self) -> Result<Vec<Range<(u64, u64)>>>; - /// Seeks to some position close to, but before, `time` ticks. - fn seek(&mut self, segment: u64, time: u64) -> Result<()>; - /// Retrieve headers (info and tracks) for some segment. - fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; - /// Returns the next block and the current segment index - fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>; -} -pub struct AbsBlock { - track: u64, - pts: u64, - keyframe: bool, - lacing: Option<LacingType>, - data: Vec<u8>, -} +// // TODO functions to test seekability, get live status and enumate segments +// trait MediaSource { +// fn loaded_range(&self) -> Result<Range<(u64, u64)>>; +// /// Seeks to some position close to, but before, `time` ticks. +// fn seek(&mut self, segment: u64, time: u64) -> Result<()>; +// /// Retrieve headers (info and tracks) for some segment. +// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; +// /// Returns the next block and the current segment index +// fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>; +// } +// pub struct AbsBlock { +// track: u64, +// pts: u64, +// keyframe: bool, +// lacing: Option<LacingType>, +// data: Vec<u8>, +// } |