aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-06 15:40:58 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-06 15:40:58 +0200
commit7acb520f552bd1edde5c29fbf5baf6643ec4b14e (patch)
tree222fa1d582d039b00da50735b62573db8bdc1f9d /stream
parent80343d02e9e29e4bc55d790b491ce0d0c7bff201 (diff)
downloadjellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar
jellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar.bz2
jellything-7acb520f552bd1edde5c29fbf5baf6643ec4b14e.tar.zst
a bit more progress on new streaming api
Diffstat (limited to 'stream')
-rw-r--r--stream/src/hls.rs2
-rw-r--r--stream/src/lib.rs113
2 files changed, 92 insertions, 23 deletions
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 56edd2d..27630b2 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -34,8 +34,8 @@ pub async fn hls_master_stream(
let uri = format!(
"stream?{}",
StreamSpec::HlsVariant {
- track: i,
segment,
+ track: i,
container,
format: 0
}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 1f32239..751ecfa 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -12,17 +12,28 @@ pub mod webvtt;
use anyhow::Result;
use ebml_struct::matroska::{Info, Tracks};
use jellybase::common::{
- stream::{StreamContainer, StreamSpec},
- LocalTrack, MediaInfo, Node,
+ stream::{
+ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec,
+ StreamTrackInfo, TrackKind,
+ },
+ LocalTrack, Node,
};
use jellymatroska::block::LacingType;
-use std::{ops::Range, sync::Arc};
+use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
+use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc};
use tokio::{
fs::File,
io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
+ task::spawn_blocking,
};
use tokio_util::io::SyncIoBridge;
+#[derive(Debug)]
+pub struct SMediaInfo {
+ pub info: Arc<Node>,
+ pub files: BTreeSet<PathBuf>,
+}
+
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
@@ -54,7 +65,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead {
}
pub async fn stream(
- info: Arc<MediaInfo>,
+ info: Arc<SMediaInfo>,
spec: StreamSpec,
range: Range<usize>,
) -> Result<DuplexStream> {
@@ -73,7 +84,7 @@ pub async fn stream(
container,
format,
} => todo!(),
- StreamSpec::Info { segment } => todo!(),
+ StreamSpec::Info { segment } => write_stream_info(info, b).await?,
StreamSpec::FragmentIndex { segment, track } => todo!(),
StreamSpec::Fragment {
segment,
@@ -87,6 +98,64 @@ pub async fn stream(
Ok(a)
}
+async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<Option<MatroskaMetadata>>> {
+ Ok(spawn_blocking(move || matroska_metadata(&path)).await??)
+}
+
+async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> {
+ let mut metadata = Vec::new();
+ for path in &info.files {
+ metadata.extend((*async_matroska_metadata(path.clone()).await?).clone());
+ }
+
+ let mut tracks = Vec::new();
+
+ for m in &metadata {
+ if let Some(t) = &m.tracks {
+ for t in &t.entries {
+ let mut formats = Vec::new();
+ formats.push(StreamFormatInfo {
+ codec: t.codec_id.to_string(),
+ remux: true,
+ byterate: 10., // TODO
+ containers: [StreamContainer::Matroska].to_vec(),
+ bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)),
+ samplerate: t.audio.as_ref().map(|a| a.sampling_frequency),
+ channels: t.audio.as_ref().map(|a| a.channels as usize),
+ pixel_count: t.video.as_ref().map(|v| v.pixel_width * v.pixel_height),
+ ..Default::default()
+ });
+ tracks.push(StreamTrackInfo {
+ name: None,
+ kind: match t.track_type {
+ 1 => TrackKind::Video,
+ 2 => TrackKind::Audio,
+ 17 => TrackKind::Subtitle,
+ _ => todo!(),
+ },
+ formats,
+ })
+ }
+ }
+ }
+
+ let segment = StreamSegmentInfo {
+ name: None,
+ duration: 0,
+ tracks,
+ };
+ Ok(StreamInfo {
+ name: info.info.title.clone(),
+ segments: vec![segment],
+ })
+}
+
+async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> {
+ let info = stream_info(info).await?;
+ b.write_all(&serde_json::to_vec(&info)?).await?;
+ Ok(())
+}
+
async fn remux_stream(
node: Arc<Node>,
local_tracks: Vec<LocalTrack>,
@@ -146,20 +215,20 @@ async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) ->
}
}
-// TODO functions to test seekability, get live status and enumate segments
-trait MediaSource {
- fn loaded_ranges(&self) -> Result<Vec<Range<(u64, u64)>>>;
- /// Seeks to some position close to, but before, `time` ticks.
- fn seek(&mut self, segment: u64, time: u64) -> Result<()>;
- /// Retrieve headers (info and tracks) for some segment.
- fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
- /// Returns the next block and the current segment index
- fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
-}
-pub struct AbsBlock {
- track: u64,
- pts: u64,
- keyframe: bool,
- lacing: Option<LacingType>,
- data: Vec<u8>,
-}
+// // TODO functions to test seekability, get live status and enumate segments
+// trait MediaSource {
+// fn loaded_range(&self) -> Result<Range<(u64, u64)>>;
+// /// Seeks to some position close to, but before, `time` ticks.
+// fn seek(&mut self, segment: u64, time: u64) -> Result<()>;
+// /// Retrieve headers (info and tracks) for some segment.
+// fn segment_headers(&mut self, seg: u64) -> Result<(Info, Tracks)>;
+// /// Returns the next block and the current segment index
+// fn next(&mut self) -> Result<Option<(u64, AbsBlock)>>;
+// }
+// pub struct AbsBlock {
+// track: u64,
+// pts: u64,
+// keyframe: bool,
+// lacing: Option<LacingType>,
+// data: Vec<u8>,
+// }