From 7acb520f552bd1edde5c29fbf5baf6643ec4b14e Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 6 Apr 2025 15:40:58 +0200 Subject: a bit more progress on new streaming api --- Cargo.lock | 3 +- client/src/lib.rs | 5 +- common/src/stream.rs | 47 +++++++++++++++++- import/Cargo.toml | 5 +- import/src/lib.rs | 3 +- import/src/matroska.rs | 116 -------------------------------------------- remuxer/Cargo.toml | 4 ++ remuxer/src/lib.rs | 1 + remuxer/src/metadata.rs | 116 ++++++++++++++++++++++++++++++++++++++++++++ server/src/routes/stream.rs | 25 +++++++--- stream/src/hls.rs | 2 +- stream/src/lib.rs | 113 +++++++++++++++++++++++++++++++++--------- 12 files changed, 282 insertions(+), 158 deletions(-) delete mode 100644 import/src/matroska.rs create mode 100644 remuxer/src/metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 9c9b347..aabeff6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1760,10 +1760,10 @@ dependencies = [ "base64", "bincode", "crossbeam-channel", - "ebml-struct", "futures", "jellybase", "jellyclient", + "jellyremuxer", "log", "rayon", "regex", @@ -1791,6 +1791,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "ebml-struct", "jellybase", "jellymatroska", "log", diff --git a/client/src/lib.rs b/client/src/lib.rs index 1497e45..d3172fd 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -165,11 +165,10 @@ impl Session { pub fn stream_url(&self, id: NodeIDOrSlug, stream_spec: &StreamSpec) -> String { format!( - "{}/n/{}/stream?{}&{}", + "{}/n/{}/stream{}&{}", self.instance.base(), id, - todo!(), - // stream_spec.to_query(), + stream_spec.to_query(), self.session_param() ) } diff --git a/common/src/stream.rs b/common/src/stream.rs index 9a00ce0..a06dad5 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -51,6 +51,47 @@ pub enum StreamSpec { }, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamInfo { + pub name: Option, + pub segments: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamSegmentInfo { + pub name: Option, + pub duration: u64, + pub tracks: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamTrackInfo { + pub name: Option, + pub kind: TrackKind, + pub formats: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TrackKind { + Video, + Audio, + Subtitle, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct StreamFormatInfo { + pub codec: String, + pub byterate: f64, + pub remux: bool, + pub containers: Vec, + + pub pixel_count: Option, + pub samplerate: Option, + pub channels: Option, + pub bit_depth: Option, +} + #[derive(Debug, Clone, Copy, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum StreamContainer { @@ -119,7 +160,11 @@ impl StreamSpec { .ok_or("container") .and_then(|s| s.parse().map_err(|()| "unknown container")) }; - if query.contains_key("fragment") { + if query.contains_key("info") { + Ok(Self::Info { + segment: get_num("segment").ok(), + }) + } else if query.contains_key("fragment") { Ok(Self::Fragment { segment: get_num("segment")?, track: get_num("track")? as usize, diff --git a/import/Cargo.toml b/import/Cargo.toml index 645326d..37b5a77 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -6,10 +6,7 @@ edition = "2021" [dependencies] jellybase = { path = "../base" } jellyclient = { path = "../client" } - -ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ - "bincode", -] } +jellyremuxer = { path = "../remuxer" } rayon = "1.10.0" crossbeam-channel = "0.5.14" diff --git a/import/src/lib.rs b/import/src/lib.rs index 3226a0a..d7f9dd7 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -15,8 +15,8 @@ use jellybase::{ CONF, SECRETS, }; use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility}; +use jellyremuxer::metadata::matroska_metadata; use log::info; -use matroska::matroska_metadata; use rayon::iter::{ParallelBridge, ParallelIterator}; use regex::Regex; use std::{ @@ -36,7 +36,6 @@ use tokio::{ use trakt::Trakt; pub mod infojson; -pub mod matroska; pub mod tmdb; pub mod trakt; diff --git a/import/src/matroska.rs b/import/src/matroska.rs deleted file mode 100644 index 1593463..0000000 --- a/import/src/matroska.rs +++ /dev/null @@ -1,116 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin -*/ -use anyhow::{Context, Result}; -use bincode::{Decode, Encode}; -use ebml_struct::{ - ids::*, - matroska::*, - read::{EbmlReadExt, TagRead}, -}; -use jellybase::{ - assetfed::AssetInner, - cache::{cache_file, cache_memory}, - common::Asset, -}; -use log::{info, warn}; -use std::{ - fs::File, - io::{BufReader, ErrorKind, Read, Write}, - path::Path, - sync::Arc, -}; - -#[derive(Encode, Decode, Clone)] -pub(crate) struct MatroskaMetadata { - pub info: Option, - pub tracks: Option, - pub cover: Option, - pub chapters: Option, - pub tags: Option, - pub infojson: Option>, -} -pub(crate) fn matroska_metadata(path: &Path) -> Result>> { - cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { - let mut magic = [0; 4]; - File::open(path)?.read_exact(&mut magic).ok(); - if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { - return Ok(None); - } - - info!("reading {path:?}"); - let mut file = BufReader::new(File::open(path)?); - let mut file = file.by_ref().take(u64::MAX); - - let (x, mut ebml) = file.read_tag()?; - assert_eq!(x, EL_EBML); - let ebml = Ebml::read(&mut ebml).unwrap(); - assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); - let (x, mut segment) = file.read_tag()?; - assert_eq!(x, EL_SEGMENT); - - let mut info = None; - let mut infojson = None; - let mut tracks = None; - let mut cover = None; - let mut chapters = None; - let mut tags = None; - loop { - let (x, mut seg) = match segment.read_tag() { - Ok(o) => o, - Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, - Err(e) => return Err(e.into()), - }; - match x { - EL_INFO => info = Some(Info::read(&mut seg).context("info")?), - EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?), - EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?), - EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?), - EL_ATTACHMENTS => { - let attachments = Attachments::read(&mut seg).context("attachments")?; - for f in attachments.files { - match f.name.as_str() { - "info.json" => { - infojson = Some(f.data); - } - "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" - | "cover.avif" => { - cover = Some( - AssetInner::Cache(cache_file( - &["att-cover", path.to_string_lossy().as_ref()], - move |mut file| { - file.write_all(&f.data)?; - Ok(()) - }, - )?) - .ser(), - ) - } - _ => (), - } - } - } - EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => { - seg.consume()?; - } - EL_CLUSTER => { - break; - } - id => { - warn!("unknown top-level element {id:x}"); - seg.consume()?; - } - } - } - Ok(Some(MatroskaMetadata { - chapters, - cover, - info, - infojson, - tags, - tracks, - })) - }) -} diff --git a/remuxer/Cargo.toml b/remuxer/Cargo.toml index 2313dcc..16713df 100644 --- a/remuxer/Cargo.toml +++ b/remuxer/Cargo.toml @@ -13,3 +13,7 @@ log = { workspace = true } serde = { version = "1.0.217", features = ["derive"] } bincode = { version = "2.0.0-rc.3", features = ["serde"] } + +ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct", features = [ + "bincode", +] } diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index a98ffad..cc4b39b 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -9,6 +9,7 @@ pub mod remux; pub mod seek_index; pub mod segment_extractor; pub mod trim_writer; +pub mod metadata; pub use fragment::write_fragment_into; pub use remux::remux_stream_into; diff --git a/remuxer/src/metadata.rs b/remuxer/src/metadata.rs new file mode 100644 index 0000000..4ddad20 --- /dev/null +++ b/remuxer/src/metadata.rs @@ -0,0 +1,116 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ +use anyhow::{Context, Result}; +use bincode::{Decode, Encode}; +use ebml_struct::{ + ids::*, + matroska::*, + read::{EbmlReadExt, TagRead}, +}; +use jellybase::{ + assetfed::AssetInner, + cache::{cache_file, cache_memory}, + common::Asset, +}; +use log::{info, warn}; +use std::{ + fs::File, + io::{BufReader, ErrorKind, Read, Write}, + path::Path, + sync::Arc, +}; + +#[derive(Debug, Encode, Decode, Clone)] +pub struct MatroskaMetadata { + pub info: Option, + pub tracks: Option, + pub cover: Option, + pub chapters: Option, + pub tags: Option, + pub infojson: Option>, +} +pub fn matroska_metadata(path: &Path) -> Result>> { + cache_memory(&["mkmeta-v2", path.to_string_lossy().as_ref()], || { + let mut magic = [0; 4]; + File::open(path)?.read_exact(&mut magic).ok(); + if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { + return Ok(None); + } + + info!("reading {path:?}"); + let mut file = BufReader::new(File::open(path)?); + let mut file = file.by_ref().take(u64::MAX); + + let (x, mut ebml) = file.read_tag()?; + assert_eq!(x, EL_EBML); + let ebml = Ebml::read(&mut ebml).unwrap(); + assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm"); + let (x, mut segment) = file.read_tag()?; + assert_eq!(x, EL_SEGMENT); + + let mut info = None; + let mut infojson = None; + let mut tracks = None; + let mut cover = None; + let mut chapters = None; + let mut tags = None; + loop { + let (x, mut seg) = match segment.read_tag() { + Ok(o) => o, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + }; + match x { + EL_INFO => info = Some(Info::read(&mut seg).context("info")?), + EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?), + EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?), + EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?), + EL_ATTACHMENTS => { + let attachments = Attachments::read(&mut seg).context("attachments")?; + for f in attachments.files { + match f.name.as_str() { + "info.json" => { + infojson = Some(f.data); + } + "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" + | "cover.avif" => { + cover = Some( + AssetInner::Cache(cache_file( + &["att-cover", path.to_string_lossy().as_ref()], + move |mut file| { + file.write_all(&f.data)?; + Ok(()) + }, + )?) + .ser(), + ) + } + _ => (), + } + } + } + EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => { + seg.consume()?; + } + EL_CLUSTER => { + break; + } + id => { + warn!("unknown top-level element {id:x}"); + seg.consume()?; + } + } + } + Ok(Some(MatroskaMetadata { + chapters, + cover, + info, + infojson, + tags, + tracks, + })) + }) +} diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 4b3d02e..8f97aec 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -6,13 +6,9 @@ use super::ui::{account::session::Session, error::MyError}; use crate::database::Database; use anyhow::{anyhow, Result}; -use jellybase::{federation::Federation, permission::PermissionSetExt, SECRETS}; -use jellycommon::{ - config::FederationAccount, - stream::StreamSpec, - user::{CreateSessionParams, UserPermission}, - TrackSource, -}; +use jellybase::{assetfed::AssetInner, federation::Federation}; +use jellycommon::{stream::StreamSpec, TrackSource}; +use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ get, head, @@ -22,7 +18,7 @@ use rocket::{ Either, Request, Response, State, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; @@ -131,6 +127,19 @@ pub async fn r_stream( let head = jellystream::stream_head(&spec); + let mut sources = BTreeSet::new(); + for t in &media.tracks { + if let TrackSource::Local(x) = &t.source { + if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { + sources.insert(m.path); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + info: node, + }); + match jellystream::stream(media, spec, urange).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 56edd2d..27630b2 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -34,8 +34,8 @@ pub async fn hls_master_stream( let uri = format!( "stream?{}", StreamSpec::HlsVariant { - track: i, segment, + track: i, container, format: 0 } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 1f32239..751ecfa 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -12,17 +12,28 @@ pub mod webvtt; use anyhow::Result; use ebml_struct::matroska::{Info, Tracks}; use jellybase::common::{ - stream::{StreamContainer, StreamSpec}, - LocalTrack, MediaInfo, Node, + stream::{ + StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, + StreamTrackInfo, TrackKind, + }, + LocalTrack, Node, }; use jellymatroska::block::LacingType; -use std::{ops::Range, sync::Arc}; +use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; +use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, + task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; +#[derive(Debug)] +pub struct SMediaInfo { + pub info: Arc, + pub files: BTreeSet, +} + pub struct StreamHead { pub content_type: &'static str, pub range_supported: bool, @@ -54,7 +65,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead { } pub async fn stream( - info: Arc, + info: Arc, spec: StreamSpec, range: Range, ) -> Result { @@ -73,7 +84,7 @@ pub async fn stream( container, format, } => todo!(), - StreamSpec::Info { segment } => todo!(), + StreamSpec::Info { segment } => write_stream_info(info, b).await?, StreamSpec::FragmentIndex { segment, track } => todo!(), StreamSpec::Fragment { segment, @@ -87,6 +98,64 @@ pub async fn stream( Ok(a) } +async fn async_matroska_metadata(path: PathBuf) -> Result>> { + Ok(spawn_blocking(move || matroska_metadata(&path)).await??) +} + +async fn stream_info(info: Arc) -> Result { + let mut metadata = Vec::new(); + for path in &info.files { + metadata.extend((*async_matroska_metadata(path.clone()).await?).clone()); + } + + let mut tracks = Vec::new(); + + for m in &metadata { + if let Some(t) = &m.tracks { + for t in &t.entries { + let mut formats = Vec::new(); + formats.push(StreamFormatInfo { + codec: t.codec_id.to_string(), + remux: true, + byterate: 10., // TODO + containers: [StreamContainer::Matroska].to_vec(), + bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)), + samplerate: t.audio.as_ref().map(|a| a.sampling_frequency), + channels: t.audio.as_ref().map(|a| a.channels as usize), + pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height), + ..Default::default() + }); + tracks.push(StreamTrackInfo { + name: None, + kind: match t.track_type { + 1 => TrackKind::Video, + 2 => TrackKind::Audio, + 17 => TrackKind::Subtitle, + _ => todo!(), + }, + formats, + }) + } + } + } + + let segment = StreamSegmentInfo { + name: None, + duration: 0, + tracks, + }; + Ok(StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }) +} + +async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { + let info = stream_info(info).await?; + b.write_all(&serde_json::to_vec(&info)?).await?; + Ok(()) +} + async fn remux_stream( node: Arc, local_tracks: Vec, @@ -146,20 +215,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> } } -// TODO functions to test seekability, get live status and enumate segments -trait MediaSource { - fn loaded_ranges(&self) -> Result>>; - /// Seeks to some position close to, but before, `time` ticks. - fn seek(&mut self, segment: u64, time: u64) -> Result<()>; - /// Retrieve headers (info and tracks) for some segment. - fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; - /// Returns the next block and the current segment index - fn next(&mut self) -> Result>; -} -pub struct AbsBlock { - track: u64, - pts: u64, - keyframe: bool, - lacing: Option, - data: Vec, -} +// // TODO functions to test seekability, get live status and enumate segments +// trait MediaSource { +// fn loaded_range(&self) -> Result>; +// /// Seeks to some position close to, but before, `time` ticks. +// fn seek(&mut self, segment: u64, time: u64) -> Result<()>; +// /// Retrieve headers (info and tracks) for some segment. +// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>; +// /// Returns the next block and the current segment index +// fn next(&mut self) -> Result>; +// } +// pub struct AbsBlock { +// track: u64, +// pts: u64, +// keyframe: bool, +// lacing: Option, +// data: Vec, +// } -- cgit v1.2.3-70-g09d2