aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
Diffstat (limited to 'stream')
-rw-r--r--stream/Cargo.toml2
-rw-r--r--stream/src/cues.rs57
-rw-r--r--stream/src/fragment.rs262
-rw-r--r--stream/src/fragment_index.rs53
-rw-r--r--stream/src/hls.rs86
-rw-r--r--stream/src/lib.rs110
-rw-r--r--stream/src/metadata.rs26
-rw-r--r--stream/src/stream_info.rs79
-rw-r--r--stream/types/src/lib.rs78
9 files changed, 376 insertions, 377 deletions
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
index fb8cfe2..8e71e1c 100644
--- a/stream/Cargo.toml
+++ b/stream/Cargo.toml
@@ -12,7 +12,5 @@ jellystream-types = { path = "types" }
log = { workspace = true }
anyhow = { workspace = true }
-tokio = { version = "1.43.0", features = ["io-util"] }
-tokio-util = { version = "0.7.13", features = ["io", "io-util"] }
serde_json = "1.0.138"
serde = { version = "1.0.217", features = ["derive"] }
diff --git a/stream/src/cues.rs b/stream/src/cues.rs
new file mode 100644
index 0000000..b486a6f
--- /dev/null
+++ b/stream/src/cues.rs
@@ -0,0 +1,57 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+
+use anyhow::{anyhow, Result};
+use jellycache::cache_memory;
+use jellyremuxer::demuxers::create_demuxer_autodetect;
+use serde::{Deserialize, Serialize};
+use std::{collections::BTreeMap, fs::File, path::Path, sync::Arc};
+
+#[derive(Serialize, Deserialize, Default)]
+pub struct TrackStat {
+ pub num_blocks: usize,
+ pub total_size: u64,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct GeneratedCue {
+ pub position: u64,
+ pub time: u64,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct StatsAndCues {
+ pub stats: BTreeMap<u64, TrackStat>,
+ pub cues: Vec<GeneratedCue>,
+}
+
+pub fn generate_cues(path: &Path) -> Result<Arc<StatsAndCues>> {
+ cache_memory("generated-cues", path, move || {
+ let media = File::open(path)?;
+ let mut media =
+ create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?;
+
+ let info = media.info()?;
+ media.seek_cluster(None)?;
+
+ let mut stats = BTreeMap::<u64, TrackStat>::new();
+ let mut cues = Vec::new();
+
+ while let Some((position, cluster)) = media.read_cluster()? {
+ cues.push(GeneratedCue {
+ position,
+ time: cluster.timestamp * info.timestamp_scale,
+ });
+ for block in cluster.simple_blocks {
+ let e = stats.entry(block.track).or_default();
+ e.num_blocks += 1;
+ e.total_size += block.data.len() as u64;
+ }
+ }
+
+ Ok(StatsAndCues { stats, cues })
+ })
+}
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index 89ce94f..3b4bb0f 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,44 +3,39 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{stream_info, SMediaInfo};
-use anyhow::{anyhow, bail, Result};
-use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
-use jellystream_types::StreamContainer;
-use jellytranscoder::fragment::transcode;
-use log::warn;
+use crate::{cues::generate_cues, stream_info, SMediaInfo};
+use anyhow::{anyhow, Result};
+use jellyremuxer::{
+ demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat,
+};
+use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum};
use std::{
- io::{Cursor, Seek, SeekFrom},
+ fs::File,
+ io::{Cursor, Read},
sync::Arc,
};
-use tokio::{fs::File, io::DuplexStream};
-use tokio_util::io::SyncIoBridge;
-pub async fn fragment_stream(
- mut b: DuplexStream,
+pub fn fragment_stream(
info: Arc<SMediaInfo>,
- track: usize,
- segment: usize,
- index: usize,
- format_num: usize,
+ track: TrackNum,
+ index: IndexNum,
+ format_num: FormatNum,
container: StreamContainer,
-) -> Result<()> {
- let (iinfo, info) = stream_info(info).await?;
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let (iinfo, info) = stream_info(info)?;
+
let (file_index, track_num) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("track not found"))?;
- let path = iinfo.paths[file_index].clone();
- let seg = info
- .segments
- .get(segment)
- .ok_or(anyhow!("segment not found"))?;
- let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?;
+ let media_path = iinfo.paths[file_index].clone();
+ let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?;
let format = track
.formats
.get(format_num)
.ok_or(anyhow!("format not found"))?;
- let orig_track = iinfo.metadata[file_index]
+
+ let mk_track = iinfo.metadata[file_index]
.tracks
.as_ref()
.unwrap()
@@ -49,97 +44,138 @@ pub async fn fragment_stream(
.find(|t| t.track_number == track_num)
.unwrap();
+ let mk_info = matroska::Info {
+ duration: Some(info.duration),
+ timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale,
+ ..Default::default()
+ };
+ let mk_tracks = matroska::Tracks {
+ entries: vec![mk_track.to_owned()],
+ };
+
+ let cue_stat = generate_cues(&media_path)?;
+ let cluster_offset = cue_stat
+ .cues
+ .get(index)
+ .ok_or(anyhow!("fragment index out of range"))?
+ .position;
+
+ let cluster = {
+ let media_file = File::open(media_path)?;
+ let mut media = create_demuxer_autodetect(Box::new(media_file))?
+ .ok_or(anyhow!("media container unknown"))?;
+ media.seek_cluster(Some(cluster_offset))?;
+ media
+ .read_cluster()?
+ .ok_or(anyhow!("cluster unexpectedly missing"))?
+ .1
+ };
+
+ let jr_container = match container {
+ StreamContainer::WebM => ContainerFormat::Webm,
+ StreamContainer::Matroska => ContainerFormat::Matroska,
+ StreamContainer::WebVTT => todo!(),
+ StreamContainer::MPEG4 => todo!(),
+ StreamContainer::JVTT => todo!(),
+ };
+
if format.remux {
- match container {
- StreamContainer::WebM | StreamContainer::Matroska => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- SyncIoBridge::new(b),
- &path,
- track_num,
- container == StreamContainer::WebM,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- }
- StreamContainer::MPEG4 => {
- tokio::task::spawn_blocking(move || {
- let mut buf = Cursor::new(Vec::new());
- if let Err(err) = jellyremuxer::write_fragment_into(
- &mut buf,
- &path,
- track_num,
- false,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- buf.seek(SeekFrom::Start(0)).unwrap();
- if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) {
- warn!("mpeg4 transmux failed: {err}");
- }
- });
- }
- StreamContainer::JVTT => {}
- _ => bail!("not yet supported"),
- }
+ let mut out = Vec::new();
+ write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?;
+ Ok(Box::new(Cursor::new(out)))
} else {
- let location = transcode(
- track.kind,
- orig_track,
- format,
- &format!("{path:?} {track_num} {index}"),
- move |b| {
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- SyncIoBridge::new(b),
- &path,
- track_num,
- false,
- &info.name.unwrap_or_default(),
- index,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- },
- )
- .await?;
-
- let mut frag = File::open(location.abs()).await?;
- match container {
- StreamContainer::WebM => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) =
- matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- {
- warn!("webm transmux failed: {err}");
- }
- });
- }
- StreamContainer::Matroska => {
- tokio::task::spawn(async move {
- if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
- warn!("cannot write stream: {err}")
- }
- });
- }
- StreamContainer::MPEG4 => {
- tokio::task::spawn_blocking(move || {
- if let Err(err) =
- matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
- {
- warn!("mpeg4 transmux failed: {err}");
- }
- });
- }
- _ => bail!("unsupported"),
- }
+ todo!()
}
- Ok(())
+ // if format.remux {
+ // match container {
+ // StreamContainer::WebM | StreamContainer::Matroska => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // SyncIoBridge::new(b),
+ // &path,
+ // track_num,
+ // container == StreamContainer::WebM,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::MPEG4 => {
+ // tokio::task::spawn_blocking(move || {
+ // let mut buf = Cursor::new(Vec::new());
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // &mut buf,
+ // &path,
+ // track_num,
+ // false,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // buf.seek(SeekFrom::Start(0)).unwrap();
+ // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) {
+ // warn!("mpeg4 transmux failed: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::JVTT => {}
+ // _ => bail!("not yet supported"),
+ // }
+ // } else {
+ // let location = transcode(
+ // track.kind,
+ // orig_track,
+ // format,
+ // &format!("{path:?} {track_num} {index}"),
+ // move |b| {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // SyncIoBridge::new(b),
+ // &path,
+ // track_num,
+ // false,
+ // &info.name.unwrap_or_default(),
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // });
+ // },
+ // )
+ // .await?;
+
+ // let mut frag = File::open(location.abs()).await?;
+ // match container {
+ // StreamContainer::WebM => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) =
+ // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ // {
+ // warn!("webm transmux failed: {err}");
+ // }
+ // });
+ // }
+ // StreamContainer::Matroska => {
+ // tokio::task::spawn(async move {
+ // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
+ // warn!("cannot write stream: {err}")
+ // }
+ // });
+ // }
+ // StreamContainer::MPEG4 => {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) =
+ // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ // {
+ // warn!("mpeg4 transmux failed: {err}");
+ // }
+ // });
+ // }
+ // _ => bail!("unsupported"),
+ // }
+ // }
}
diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs
index cb54948..9d82cd7 100644
--- a/stream/src/fragment_index.rs
+++ b/stream/src/fragment_index.rs
@@ -3,30 +3,45 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{stream_info, SMediaInfo};
+use crate::{cues::generate_cues, stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellystream_types::{SegmentNum, TrackNum};
-use std::sync::Arc;
-use tokio::io::{AsyncWriteExt, DuplexStream};
+use jellystream_types::TrackNum;
+use std::{
+ io::{Cursor, Read},
+ ops::Range,
+ sync::Arc,
+};
-pub async fn fragment_index_stream(
- mut b: DuplexStream,
- info: Arc<SMediaInfo>,
- _segment: SegmentNum,
- track: TrackNum,
-) -> Result<()> {
- let (iinfo, _info) = stream_info(info).await?;
- let (file_index, track_num) = *iinfo
+pub fn fragment_index(info: Arc<SMediaInfo>, track: TrackNum) -> Result<Vec<Range<f64>>> {
+ let (iinfo, info) = stream_info(info)?;
+ let (file_index, _) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("track not found"))?;
- let fragments = tokio::task::spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
- })
- .await??;
+ let cue_stat = generate_cues(&iinfo.paths[file_index])?;
+
+ Ok(cue_stat
+ .cues
+ .iter()
+ .map(|c| c.time as f64 / 1_000_000_000.)
+ .zip(
+ cue_stat
+ .cues
+ .iter()
+ .skip(1)
+ .map(|c| c.time as f64 / 1_000_000_000.)
+ .chain([info.duration]),
+ )
+ .map(|(start, end)| start..end)
+ .collect())
+}
- let out = serde_json::to_string(&fragments)?;
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
+pub fn fragment_index_stream(
+ info: Arc<SMediaInfo>,
+ track: TrackNum,
+) -> Result<Box<dyn Read + Send + Sync>> {
+ Ok(Box::new(Cursor::new(serde_json::to_string(
+ &fragment_index(info, track)?,
+ )?)))
}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 949ddb4..2c91365 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -4,60 +4,30 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{stream_info, SMediaInfo};
-use anyhow::{anyhow, Result};
-use jellystream_types::{FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum};
-use std::{fmt::Write, ops::Range, sync::Arc};
-use tokio::{
- io::{AsyncWriteExt, DuplexStream},
- task::spawn_blocking,
+use crate::{fragment_index::fragment_index, stream_info, SMediaInfo};
+use anyhow::Result;
+use jellystream_types::{FormatNum, StreamContainer, StreamSpec, TrackKind, TrackNum};
+use std::{
+ fmt::Write,
+ io::{Cursor, Read},
+ ops::Range,
+ sync::Arc,
};
-pub async fn hls_supermultivariant_stream(
- mut b: DuplexStream,
+pub fn hls_multivariant_stream(
info: Arc<SMediaInfo>,
container: StreamContainer,
-) -> Result<()> {
- let (_iinfo, info) = stream_info(info).await?;
- let mut out = String::new();
- writeln!(out, "#EXTM3U")?;
- writeln!(out, "#EXT-X-VERSION:4")?;
- for (i, _seg) in info.segments.iter().enumerate() {
- let uri = format!(
- "stream{}",
- StreamSpec::HlsMultiVariant {
- segment: i,
- container,
- }
- .to_query()
- );
- writeln!(out, "{uri}")?;
- }
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
-}
-
-pub async fn hls_multivariant_stream(
- mut b: DuplexStream,
- info: Arc<SMediaInfo>,
- segment: SegmentNum,
- container: StreamContainer,
-) -> Result<()> {
- let (_iinfo, info) = stream_info(info).await?;
- let seg = info
- .segments
- .get(segment)
- .ok_or(anyhow!("segment not found"))?;
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let (_iinfo, info) = stream_info(info)?;
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-VERSION:4")?;
// writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?;
- for (i, t) in seg.tracks.iter().enumerate() {
+ for (i, t) in info.tracks.iter().enumerate() {
let uri = format!(
"stream{}",
StreamSpec::HlsVariant {
- segment,
track: i,
container,
format: 0
@@ -73,37 +43,23 @@ pub async fn hls_multivariant_stream(
writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?;
writeln!(out, "{uri}")?;
}
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
+
+ Ok(Box::new(Cursor::new(out)))
}
-pub async fn hls_variant_stream(
- mut b: DuplexStream,
+pub fn hls_variant_stream(
info: Arc<SMediaInfo>,
- segment: SegmentNum,
track: TrackNum,
format: FormatNum,
container: StreamContainer,
-) -> Result<()> {
- let (iinfo, info) = stream_info(info).await?;
- let (file_index, track_num) = *iinfo
- .track_to_file
- .get(track)
- .ok_or(anyhow!("track not found"))?;
- let seg = info
- .segments
- .get(segment)
- .ok_or(anyhow!("segment not found"))?;
-
- let frags = spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
- })
- .await??;
+) -> Result<Box<dyn Read + Send + Sync>> {
+ let frags = fragment_index(info.clone(), track)?;
+ let (_, info) = stream_info(info)?;
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?;
- writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?;
+ writeln!(out, "#EXT-X-TARGETDURATION:{}", info.duration)?;
writeln!(out, "#EXT-X-VERSION:4")?;
writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?;
@@ -113,7 +69,6 @@ pub async fn hls_variant_stream(
out,
"stream{}",
StreamSpec::Fragment {
- segment,
track,
index,
container,
@@ -125,6 +80,5 @@ pub async fn hls_variant_stream(
writeln!(out, "#EXT-X-ENDLIST")?;
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
+ Ok(Box::new(Cursor::new(out)))
}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 5b4e8ed..60c283c 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -4,30 +4,29 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
#![feature(iterator_try_collect)]
+pub mod cues;
mod fragment;
mod fragment_index;
mod hls;
+pub mod metadata;
mod stream_info;
mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use fragment::fragment_stream;
use fragment_index::fragment_index_stream;
-use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream};
+use hls::{hls_multivariant_stream, hls_variant_stream};
use jellystream_types::{StreamContainer, StreamSpec};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
- io::SeekFrom,
+ fs::File,
+ io::{Read, Seek, SeekFrom},
ops::Range,
path::PathBuf,
sync::{Arc, LazyLock, Mutex},
};
use stream_info::{stream_info, write_stream_info};
-use tokio::{
- fs::File,
- io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
-};
#[rustfmt::skip]
#[derive(Debug, Deserialize, Serialize, Default)]
@@ -60,98 +59,69 @@ pub struct StreamHead {
}
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let cons = |ct: &'static str, rs: bool| StreamHead {
- content_type: ct,
- range_supported: rs,
- };
+ use StreamContainer::*;
+ use StreamSpec::*;
let container_ct = |x: StreamContainer| match x {
- StreamContainer::WebM => "video/webm",
- StreamContainer::Matroska => "video/x-matroska",
- StreamContainer::WebVTT => "text/vtt",
- StreamContainer::JVTT => "application/jellything-vtt+json",
- StreamContainer::MPEG4 => "video/mp4",
+ WebM => "video/webm",
+ Matroska => "video/x-matroska",
+ WebVTT => "text/vtt",
+ JVTT => "application/jellything-vtt+json",
+ MPEG4 => "video/mp4",
};
- match spec {
- StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
- StreamSpec::Original { .. } => cons("video/x-matroska", true),
- StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
- StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
- StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
+ let range_supported = matches!(spec, Remux { .. } | Original { .. });
+ let content_type = match spec {
+ Original { .. } => "video/x-matroska",
+ HlsMultiVariant { .. } => "application/vnd.apple.mpegurl",
+ HlsVariant { .. } => "application/vnd.apple.mpegurl",
+ Info { .. } => "application/jellything-stream-info+json",
+ FragmentIndex { .. } => "application/jellything-frag-index+json",
+ Fragment { container, .. } => container_ct(*container),
+ Remux { container, .. } => container_ct(*container),
+ };
+ StreamHead {
+ content_type,
+ range_supported,
}
}
-pub async fn stream(
+pub fn stream(
info: Arc<SMediaInfo>,
spec: StreamSpec,
- range: Range<usize>,
-) -> Result<DuplexStream> {
- let (a, b) = duplex(4096);
-
+ range: Range<u64>,
+) -> Result<Box<dyn Read + Send + Sync>> {
match spec {
- StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
- StreamSpec::HlsSuperMultiVariant { container } => {
- hls_supermultivariant_stream(b, info, container).await?;
- }
- StreamSpec::HlsMultiVariant { segment, container } => {
- hls_multivariant_stream(b, info, segment, container).await?
- }
+ StreamSpec::Original { track } => original_stream(info, track, range),
+ StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(info, container),
StreamSpec::HlsVariant {
- segment,
track,
container,
format,
- } => hls_variant_stream(b, info, segment, track, format, container).await?,
- StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
- StreamSpec::FragmentIndex { segment, track } => {
- fragment_index_stream(b, info, segment, track).await?
- }
+ } => hls_variant_stream(info, track, format, container),
+ StreamSpec::Info => write_stream_info(info),
+ StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track),
StreamSpec::Fragment {
- segment,
track,
index,
container,
format,
- } => fragment_stream(b, info, track, segment, index, format, container).await?,
+ } => fragment_stream(info, track, index, format, container),
_ => bail!("todo"),
}
-
- Ok(a)
}
-async fn original_stream(
+fn original_stream(
info: Arc<SMediaInfo>,
track: usize,
- range: Range<usize>,
- b: DuplexStream,
-) -> Result<()> {
- let (iinfo, _info) = stream_info(info).await?;
+ range: Range<u64>,
+) -> Result<Box<dyn Read+ Send + Sync>> {
+ let (iinfo, _info) = stream_info(info)?;
let (file_index, _) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("unknown track"))?;
- let mut file = File::open(&iinfo.paths[file_index])
- .await
- .context("opening source")?;
+ let mut file = File::open(&iinfo.paths[file_index]).context("opening source")?;
file.seek(SeekFrom::Start(range.start as u64))
- .await
.context("seek source")?;
- tokio::task::spawn(copy_stream(file, b, range.end - range.start));
-
- Ok(())
-}
-
-async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> {
- let mut buf = [0u8; 4096];
- loop {
- let size = inp.read(&mut buf[..amount.min(4096)]).await?;
- if size == 0 {
- break Ok(());
- }
- out.write_all(&buf[..size]).await?;
- amount -= size;
- }
+ Ok(Box::new(file.take(range.end - range.start)))
}
diff --git a/stream/src/metadata.rs b/stream/src/metadata.rs
new file mode 100644
index 0000000..9bfa3aa
--- /dev/null
+++ b/stream/src/metadata.rs
@@ -0,0 +1,26 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+
+use anyhow::{anyhow, Result};
+use jellycache::cache_memory;
+use jellyremuxer::{demuxers::create_demuxer_autodetect, matroska::Segment};
+use std::{fs::File, path::Path, sync::Arc};
+
+pub fn read_metadata(path: &Path) -> Result<Arc<Segment>> {
+ cache_memory("mkmeta-v4", path, move || {
+ let media = File::open(path)?;
+ let mut media =
+ create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?;
+
+ let info = media.info()?;
+ let tracks = media.tracks()?;
+ Ok(Segment {
+ info,
+ tracks,
+ ..Default::default()
+ })
+ })
+}
diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs
index 560ec9b..7ebc399 100644
--- a/stream/src/stream_info.rs
+++ b/stream/src/stream_info.rs
@@ -3,59 +3,51 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{SMediaInfo, CONF};
+use crate::{cues::generate_cues, metadata::read_metadata, SMediaInfo, CONF};
use anyhow::Result;
-use jellyremuxer::{
- metadata::{matroska_metadata, MatroskaMetadata, MatroskaTrackEntry},
- seek_index::get_track_sizes,
-};
+use jellyremuxer::matroska::{self, Segment, TrackEntry, TrackType};
use jellystream_types::{
- StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, TrackKind,
+ StreamContainer, StreamFormatInfo, StreamInfo, StreamTrackInfo, TrackKind,
};
-use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
-use tokio::{
- io::{AsyncWriteExt, DuplexStream},
- spawn,
- task::spawn_blocking,
+use std::{
+ io::{Cursor, Read},
+ path::PathBuf,
+ sync::Arc,
};
-async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>> {
- spawn_blocking(move || matroska_metadata(&path)).await?
-}
-
-async fn async_get_track_sizes(path: PathBuf) -> Result<BTreeMap<u64, usize>> {
- spawn_blocking(move || get_track_sizes(&path)).await?
-}
-
pub(crate) struct InternalStreamInfo {
pub paths: Vec<PathBuf>,
- pub metadata: Vec<Arc<MatroskaMetadata>>,
+ pub metadata: Vec<Arc<Segment>>,
pub track_to_file: Vec<(usize, u64)>,
}
// TODO cache mem
-pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
+pub(crate) fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
let mut tracks = Vec::new();
let mut track_to_file = Vec::new();
let mut metadata_arr = Vec::new();
let mut paths = Vec::new();
for (i, path) in info.files.iter().enumerate() {
- let metadata = async_matroska_metadata(path.clone()).await?;
- let sizes = async_get_track_sizes(path.clone()).await?;
+ let metadata = read_metadata(&path)?;
+ let cue_stat = generate_cues(&path)?;
if let Some(t) = &metadata.tracks {
- let duration = media_duration(&metadata);
+ let duration = media_duration(&metadata.info);
for t in &t.entries {
- let bitrate =
- sizes.get(&t.track_number).copied().unwrap_or_default() as f64 / duration * 8.;
+ let byterate = cue_stat
+ .stats
+ .get(&t.track_number)
+ .map(|e| e.total_size)
+ .unwrap_or_default() as f64
+ / duration;
tracks.push(StreamTrackInfo {
name: None,
kind: match t.track_type {
- 1 => TrackKind::Video,
- 2 => TrackKind::Audio,
- 17 => TrackKind::Subtitle,
+ matroska::TrackType::Video => TrackKind::Video,
+ matroska::TrackType::Audio => TrackKind::Audio,
+ matroska::TrackType::Subtitle => TrackKind::Subtitle,
_ => todo!(),
},
- formats: stream_formats(t, bitrate),
+ formats: stream_formats(t, byterate * 8.),
});
track_to_file.push((i, t.track_number));
}
@@ -64,11 +56,7 @@ pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStream
paths.push(path.to_owned());
}
- let segment = StreamSegmentInfo {
- name: None,
- duration: media_duration(&metadata_arr[0]),
- tracks,
- };
+ let duration = media_duration(&metadata_arr[0].info); // TODO different durations?!
Ok((
InternalStreamInfo {
metadata: metadata_arr,
@@ -77,12 +65,13 @@ pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStream
},
StreamInfo {
name: info.title.clone(),
- segments: vec![segment],
+ duration,
+ tracks,
},
))
}
-fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> {
+fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> {
let mut formats = Vec::new();
formats.push(StreamFormatInfo {
codec: t.codec_id.to_string(),
@@ -97,7 +86,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma
});
match t.track_type {
- 1 => {
+ TrackType::Video => {
let sw = t.video.as_ref().unwrap().pixel_width;
let sh = t.video.as_ref().unwrap().pixel_height;
for (w, br) in [
@@ -136,7 +125,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma
}
}
}
- 2 => {
+ TrackType::Audio => {
for br in [256e3, 128e3, 64e3] {
formats.push(StreamFormatInfo {
codec: "A_OPUS".to_string(),
@@ -151,7 +140,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma
});
}
}
- 17 => {}
+ TrackType::Subtitle => {}
_ => {}
}
@@ -168,13 +157,11 @@ fn containers_by_codec(codec: &str) -> Vec<StreamContainer> {
}
}
-pub(crate) async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> {
- let (_, info) = stream_info(info).await?;
- spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await });
- Ok(())
+pub(crate) fn write_stream_info(info: Arc<SMediaInfo>) -> Result<Box<dyn Read + Send + Sync>> {
+ let (_, info) = stream_info(info)?;
+ Ok(Box::new(Cursor::new(serde_json::to_vec(&info)?)))
}
-fn media_duration(m: &MatroskaMetadata) -> f64 {
- let info = m.info.as_ref().unwrap();
+fn media_duration(info: &matroska::Info) -> f64 {
(info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000.
}
diff --git a/stream/types/src/lib.rs b/stream/types/src/lib.rs
index a90db03..a031f3a 100644
--- a/stream/types/src/lib.rs
+++ b/stream/types/src/lib.rs
@@ -6,7 +6,6 @@
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, fmt::Display, str::FromStr};
-pub type SegmentNum = usize;
pub type TrackNum = usize;
pub type FormatNum = usize;
pub type IndexNum = usize;
@@ -27,28 +26,19 @@ pub enum StreamSpec {
Original {
track: TrackNum,
},
- HlsSuperMultiVariant {
- container: StreamContainer,
- },
HlsMultiVariant {
- segment: SegmentNum,
container: StreamContainer,
},
HlsVariant {
- segment: SegmentNum,
track: TrackNum,
container: StreamContainer,
format: FormatNum,
},
- Info {
- segment: Option<u64>,
- },
+ Info,
FragmentIndex {
- segment: SegmentNum,
track: TrackNum,
},
Fragment {
- segment: SegmentNum,
track: TrackNum,
index: IndexNum,
container: StreamContainer,
@@ -65,12 +55,6 @@ pub enum StreamSpec {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamInfo {
pub name: Option<String>,
- pub segments: Vec<StreamSegmentInfo>,
-}
-
-#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct StreamSegmentInfo {
- pub name: Option<String>,
pub duration: f64,
pub tracks: Vec<StreamTrackInfo>,
}
@@ -128,35 +112,25 @@ impl StreamSpec {
)
}
StreamSpec::Original { track } => format!("?original&track={track}"),
- StreamSpec::HlsSuperMultiVariant { container } => {
- format!("?hlssupermultivariant&container={container}")
- }
- StreamSpec::HlsMultiVariant { segment, container } => {
- format!("?hlsmultivariant&segment={segment}&container={container}")
+ StreamSpec::HlsMultiVariant { container } => {
+ format!("?hlsmultivariant&container={container}")
}
StreamSpec::HlsVariant {
- segment,
track,
container,
format,
- } => format!(
- "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}"
- ),
- StreamSpec::Info {
- segment: Some(segment),
- } => format!("?info&segment={segment}"),
- StreamSpec::Info { segment: None } => "?info".to_string(),
- StreamSpec::FragmentIndex { segment, track } => {
- format!("?fragmentindex&segment={segment}&track={track}")
+ } => format!("?hlsvariant&track={track}&container={container}&format={format}"),
+ StreamSpec::Info => "?info".to_string(),
+ StreamSpec::FragmentIndex { track } => {
+ format!("?fragmentindex&track={track}")
}
StreamSpec::Fragment {
- segment,
track,
index,
container,
format,
} => format!(
- "?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"
+ "?fragment&track={track}&index={index}&container={container}&format={format}"
),
}
}
@@ -173,32 +147,24 @@ impl StreamSpec {
)
}
StreamSpec::Original { track } => format!("?original&t={track}"),
- StreamSpec::HlsSuperMultiVariant { container } => {
- format!("?hlssupermultivariant&c={container}")
- }
- StreamSpec::HlsMultiVariant { segment, container } => {
- format!("?hlsmultivariant&s={segment}&c={container}")
+ StreamSpec::HlsMultiVariant { container } => {
+ format!("?hlsmultivariant&c={container}")
}
StreamSpec::HlsVariant {
- segment,
track,
container,
format,
- } => format!("?hlsvariant&s={segment}&t={track}&c={container}&f={format}"),
- StreamSpec::Info {
- segment: Some(segment),
- } => format!("?info&s={segment}"),
- StreamSpec::Info { segment: None } => "?info".to_string(),
- StreamSpec::FragmentIndex { segment, track } => {
- format!("?fragmentindex&s={segment}&t={track}")
+ } => format!("?hlsvariant&t={track}&c={container}&f={format}"),
+ StreamSpec::Info => "?info".to_string(),
+ StreamSpec::FragmentIndex { track } => {
+ format!("?fragmentindex&t={track}")
}
StreamSpec::Fragment {
- segment,
track,
index,
container,
format,
- } => format!("?fragment&s={segment}&t={track}&i={index}&c={container}&f={format}"),
+ } => format!("?fragment&t={track}&i={index}&c={container}&f={format}"),
}
}
pub fn from_query_kv(query: &BTreeMap<String, String>) -> Result<Self, &'static str> {
@@ -207,7 +173,7 @@ impl StreamSpec {
.get(k)
.or(query.get(ks))
.ok_or(k)
- .and_then(|a| a.parse().map_err(|_| "invalid number"))
+ .and_then(|a| a.parse::<usize>().map_err(|_| "invalid number"))
};
let get_container = || {
query
@@ -217,28 +183,19 @@ impl StreamSpec {
.and_then(|s| s.parse().map_err(|()| "unknown container"))
};
if query.contains_key("info") {
- Ok(Self::Info {
- segment: get_num("segment", "s").ok(),
- })
- } else if query.contains_key("hlssupermultivariant") {
- Ok(Self::HlsSuperMultiVariant {
- container: get_container().ok().unwrap_or(StreamContainer::Matroska),
- })
+ Ok(Self::Info)
} else if query.contains_key("hlsmultivariant") {
Ok(Self::HlsMultiVariant {
- segment: get_num("segment", "s")? as SegmentNum,
container: get_container()?,
})
} else if query.contains_key("hlsvariant") {
Ok(Self::HlsVariant {
- segment: get_num("segment", "s")? as SegmentNum,
track: get_num("track", "t")? as TrackNum,
format: get_num("format", "f")? as FormatNum,
container: get_container()?,
})
} else if query.contains_key("fragment") {
Ok(Self::Fragment {
- segment: get_num("segment", "s")? as SegmentNum,
track: get_num("track", "t")? as TrackNum,
format: get_num("format", "f")? as FormatNum,
index: get_num("index", "i")? as IndexNum,
@@ -246,7 +203,6 @@ impl StreamSpec {
})
} else if query.contains_key("fragmentindex") {
Ok(Self::FragmentIndex {
- segment: get_num("segment", "s")? as SegmentNum,
track: get_num("track", "t")? as TrackNum,
})
} else {