aboutsummaryrefslogtreecommitdiff
path: root/stream/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-16 20:06:01 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-16 20:06:01 +0200
commitd26849375c70c795fdf664f9dfea68c273b6d483 (patch)
tree53ad4f0eff3604e80b27ff0abf0438ea6c69d432 /stream/src
parent1cd966f7454f052fda6c6c9ae1597479f05e23d9 (diff)
parentcdf95d7b80bd2b78895671da8f462145bb5db522 (diff)
downloadjellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar
jellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar.bz2
jellything-d26849375c70c795fdf664f9dfea68c273b6d483.tar.zst
Merge branch 'rewrite-stream'
Diffstat (limited to 'stream/src')
-rw-r--r--stream/src/fragment.rs126
-rw-r--r--stream/src/fragment_index.rs32
-rw-r--r--stream/src/hls.rs110
-rw-r--r--stream/src/jhls.rs47
-rw-r--r--stream/src/lib.rs167
-rw-r--r--stream/src/stream_info.rs169
-rw-r--r--stream/src/webvtt.rs116
7 files changed, 497 insertions, 270 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index e276d29..dfe101e 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,16 +3,10 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, bail, Result};
-use jellybase::{
- common::{
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
+use jellybase::common::stream::StreamContainer;
+use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
use jellytranscoder::fragment::transcode;
use log::warn;
use std::sync::Arc;
@@ -20,40 +14,57 @@ use tokio::{fs::File, io::DuplexStream};
use tokio_util::io::SyncIoBridge;
pub async fn fragment_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
mut b: DuplexStream,
- perms: &PermissionSet,
+ info: Arc<SMediaInfo>,
+ track: usize,
+ segment: usize,
+ index: usize,
+ format_num: usize,
+ container: StreamContainer,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("unsupported number of tracks for segment, must be exactly one");
- }
- let track = spec.track[0];
- let n = spec.index.ok_or(anyhow!("segment index missing"))?;
-
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let path = iinfo.paths[file_index].clone();
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+ let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?;
+ let format = track
+ .formats
+ .get(format_num)
+ .ok_or(anyhow!("format not found"))?;
- if let Some(profile) = spec.profile {
- perms.assert(&UserPermission::Transcode)?;
+ if format.remux {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_fragment_into(
+ SyncIoBridge::new(b),
+ &path,
+ track_num,
+ container == StreamContainer::WebM,
+ &info.name.unwrap_or_default(),
+ index,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+ } else {
let location = transcode(
- &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source
- CONF.transcoding_profiles
- .get(profile)
- .ok_or(anyhow!("profile out of range"))?,
+ &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source
+ track.kind,
+ format,
move |b| {
tokio::task::spawn_blocking(move || {
if let Err(err) = jellyremuxer::write_fragment_into(
SyncIoBridge::new(b),
- &CONF.media_path,
- &node,
- &local_track,
- track,
+ &path,
+ track_num,
false,
- n,
+ &info.name.unwrap_or_default(),
+ index,
) {
warn!("segment stream error: {err}");
}
@@ -61,27 +72,36 @@ pub async fn fragment_stream(
},
)
.await?;
- let mut output = File::open(location.abs()).await?;
- tokio::task::spawn(async move {
- if let Err(err) = tokio::io::copy(&mut output, &mut b).await {
- warn!("cannot write stream: {err}")
+
+ let mut frag = File::open(location.abs()).await?;
+ match container {
+ StreamContainer::WebM => {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) =
+ matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ {
+ warn!("webm transmux failed: {err}");
+ }
+ });
}
- });
- } else {
- let b = SyncIoBridge::new(b);
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- b,
- &CONF.media_path,
- &node,
- &local_track,
- track,
- spec.webm.unwrap_or(false),
- n,
- ) {
- warn!("segment stream error: {err}");
+ StreamContainer::Matroska => {
+ tokio::task::spawn(async move {
+ if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
+ warn!("cannot write stream: {err}")
+ }
+ });
}
- });
+ StreamContainer::MPEG4 => {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) =
+ matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
+ {
+ warn!("mpeg4 transmux failed: {err}");
+ }
+ });
+ }
+ _ => bail!("unsupported"),
+ }
}
Ok(())
diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs
new file mode 100644
index 0000000..6fbddc6
--- /dev/null
+++ b/stream/src/fragment_index.rs
@@ -0,0 +1,32 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use crate::{stream_info, SMediaInfo};
+use anyhow::{anyhow, Result};
+use jellybase::common::stream::{SegmentNum, TrackNum};
+use std::sync::Arc;
+use tokio::io::{AsyncWriteExt, DuplexStream};
+
+pub async fn fragment_index_stream(
+ mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ _segment: SegmentNum,
+ track: TrackNum,
+) -> Result<()> {
+ let (iinfo, _info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+
+ let fragments = tokio::task::spawn_blocking(move || {
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
+ })
+ .await??;
+
+ let out = serde_json::to_string(&fragments)?;
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index dca1036..3dfbf01 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -4,13 +4,10 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- stream::{StreamFormat, StreamSpec},
- LocalTrack, Node, SourceTrackKind,
- },
- CONF,
+use jellybase::common::stream::{
+ FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum,
};
use std::{fmt::Write, ops::Range, sync::Arc};
use tokio::{
@@ -18,32 +15,63 @@ use tokio::{
task::spawn_blocking,
};
-pub async fn hls_master_stream(
- node: Arc<Node>,
- _local_tracks: Vec<LocalTrack>,
- _spec: StreamSpec,
+pub async fn hls_supermultivariant_stream(
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ container: StreamContainer,
) -> Result<()> {
- let media = node.media.as_ref().ok_or(anyhow!("no media"))?;
+ let (_iinfo, info) = stream_info(info).await?;
+ let mut out = String::new();
+ writeln!(out, "#EXTM3U")?;
+ writeln!(out, "#EXT-X-VERSION:4")?;
+ for (i, _seg) in info.segments.iter().enumerate() {
+ let uri = format!(
+ "stream{}",
+ StreamSpec::HlsMultiVariant {
+ segment: i,
+ container,
+ }
+ .to_query()
+ );
+ writeln!(out, "{uri}")?;
+ }
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
+
+pub async fn hls_multivariant_stream(
+ mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ container: StreamContainer,
+) -> Result<()> {
+ let (_iinfo, info) = stream_info(info).await?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-VERSION:4")?;
// writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?;
- for (i, t) in media.tracks.iter().enumerate() {
+ for (i, t) in seg.tracks.iter().enumerate() {
let uri = format!(
- "stream?{}",
- StreamSpec {
- track: vec![i],
- format: StreamFormat::HlsVariant,
- ..Default::default()
+ "stream{}",
+ StreamSpec::HlsVariant {
+ segment,
+ track: i,
+ container,
+ format: 0
}
.to_query()
);
let r#type = match t.kind {
- SourceTrackKind::Video { .. } => "VIDEO",
- SourceTrackKind::Audio { .. } => "AUDIO",
- SourceTrackKind::Subtitles => "SUBTITLES",
+ TrackKind::Video => "VIDEO",
+ TrackKind::Audio => "AUDIO",
+ TrackKind::Subtitle => "SUBTITLES",
};
+ // TODO bw
writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?;
writeln!(out, "{uri}")?;
}
@@ -52,31 +80,49 @@ pub async fn hls_master_stream(
}
pub async fn hls_variant_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- mut spec: StreamSpec,
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ track: TrackNum,
+ format: FormatNum,
+ container: StreamContainer,
) -> Result<()> {
- let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned();
- let track_index = spec.track[0];
- let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?;
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let frags = spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index)
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
})
.await??;
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?;
- writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?;
+ writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?;
writeln!(out, "#EXT-X-VERSION:4")?;
writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?;
- spec.format = StreamFormat::Fragment;
- for (i, Range { start, end }) in frags.iter().enumerate() {
+ for (index, Range { start, end }) in frags.iter().enumerate() {
writeln!(out, "#EXTINF:{:},", end - start)?;
- spec.index = Some(i);
- writeln!(out, "stream?{}", spec.to_query())?;
+ writeln!(
+ out,
+ "stream{}",
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ }
+ .to_query()
+ )?;
}
writeln!(out, "#EXT-X-ENDLIST")?;
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
deleted file mode 100644
index b222e39..0000000
--- a/stream/src/jhls.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2025 metamuffin <metamuffin.org>
-*/
-use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- jhls::JhlsTrackIndex,
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
-use std::sync::Arc;
-use tokio::io::{AsyncWriteExt, DuplexStream};
-
-pub async fn jhls_index(
- node: Arc<Node>,
- local_tracks: &[LocalTrack],
- spec: StreamSpec,
- mut b: DuplexStream,
- perms: &PermissionSet,
-) -> Result<()> {
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
-
- let fragments = tokio::task::spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0])
- })
- .await??;
-
- let out = serde_json::to_string(&JhlsTrackIndex {
- extra_profiles: if perms.check(&UserPermission::Transcode) {
- CONF.transcoding_profiles.clone()
- } else {
- vec![]
- },
- fragments,
- })?;
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
-}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 00338c1..4df87ae 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -5,136 +5,113 @@
*/
#![feature(iterator_try_collect)]
pub mod fragment;
+pub mod fragment_index;
pub mod hls;
-pub mod jhls;
+pub mod stream_info;
pub mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use fragment::fragment_stream;
-use hls::{hls_master_stream, hls_variant_stream};
-use jellybase::{
- assetfed::AssetInner,
- common::{
- stream::{StreamFormat, StreamSpec},
- user::{PermissionSet, UserPermission},
- LocalTrack, Node, TrackSource,
- },
- permission::PermissionSetExt,
- CONF,
+use fragment_index::fragment_index_stream;
+use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream};
+use jellybase::common::{
+ stream::{StreamContainer, StreamSpec},
+ Node,
};
-use jhls::jhls_index;
-use std::{io::SeekFrom, ops::Range, sync::Arc};
+use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc};
+use stream_info::{stream_info, write_stream_info};
use tokio::{
fs::File,
io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
};
-use tokio_util::io::SyncIoBridge;
-use webvtt::vtt_stream;
+
+#[derive(Debug)]
+pub struct SMediaInfo {
+ pub info: Arc<Node>,
+ pub files: BTreeSet<PathBuf>,
+}
pub struct StreamHead {
pub content_type: &'static str,
pub range_supported: bool,
}
-#[rustfmt::skip]
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let webm_or_mkv = if spec.webm.unwrap_or(false) { "video/webm" } else { "video/x-matroska" };
- match spec.format {
- StreamFormat::Original => StreamHead { content_type: "video/x-matroska", range_supported: true },
- StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
- StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
- StreamFormat::JhlsIndex => StreamHead { content_type: "application/jellything-seekindex+json", range_supported: false },
- StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false },
- StreamFormat::Fragment => StreamHead { content_type: webm_or_mkv, range_supported: false },
- StreamFormat::Jvtt => StreamHead { content_type: "application/jellything-vtt+json", range_supported: false },
+ let cons = |ct: &'static str, rs: bool| StreamHead {
+ content_type: ct,
+ range_supported: rs,
+ };
+ let container_ct = |x: StreamContainer| match x {
+ StreamContainer::WebM => "video/webm",
+ StreamContainer::Matroska => "video/x-matroska",
+ StreamContainer::WebVTT => "text/vtt",
+ StreamContainer::JVTT => "application/jellything-vtt+json",
+ StreamContainer::MPEG4 => "video/mp4",
+ };
+ match spec {
+ StreamSpec::Whep { .. } => cons("application/x-todo", false),
+ StreamSpec::WhepControl { .. } => cons("application/x-todo", false),
+ StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
+ StreamSpec::Original { .. } => cons("video/x-matroska", true),
+ StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
+ StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
+ StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
+ StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
}
}
pub async fn stream(
- node: Arc<Node>,
+ info: Arc<SMediaInfo>,
spec: StreamSpec,
range: Range<usize>,
- perms: &PermissionSet,
) -> Result<DuplexStream> {
- perms.assert(&UserPermission::StreamFormat(spec.format))?;
-
let (a, b) = duplex(4096);
- // TODO remux of mixed remote and local tracks?!
- let track_sources = node.media.to_owned().ok_or(anyhow!("node has no media"))?;
-
- let local_tracks = spec
- .track
- .iter()
- .map(|i| {
- anyhow::Ok(
- match &track_sources
- .tracks
- .get(*i)
- .ok_or(anyhow!("track does not exist"))?
- .source
- {
- TrackSource::Local(t) => AssetInner::deser(&t.0)?
- .as_local_track()
- .ok_or(anyhow!("asset not a track"))?,
- TrackSource::Remote(_) => bail!("track is not local"),
- },
- )
- })
- .collect::<anyhow::Result<Vec<_>>>()?
- .into_iter()
- .collect::<Vec<_>>();
-
- match spec.format {
- StreamFormat::Original => original_stream(local_tracks, spec, range, b).await?,
- StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
- StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
- StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
- StreamFormat::JhlsIndex => jhls_index(node, &local_tracks, spec, b, perms).await?,
- StreamFormat::Fragment => fragment_stream(node, local_tracks, spec, b, perms).await?,
- StreamFormat::Webvtt => vtt_stream(false, node, local_tracks, spec, b).await?,
- StreamFormat::Jvtt => vtt_stream(true, node, local_tracks, spec, b).await?,
+ match spec {
+ StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
+ StreamSpec::HlsSuperMultiVariant { container } => {
+ hls_supermultivariant_stream(b, info, container).await?;
+ }
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ hls_multivariant_stream(b, info, segment, container).await?
+ }
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => hls_variant_stream(b, info, segment, track, format, container).await?,
+ StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
+ StreamSpec::FragmentIndex { segment, track } => {
+ fragment_index_stream(b, info, segment, track).await?
+ }
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => fragment_stream(b, info, track, segment, index, format, container).await?,
+ _ => bail!("todo"),
}
Ok(a)
}
-async fn remux_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
- range: Range<usize>,
- b: DuplexStream,
-) -> Result<()> {
- let b = SyncIoBridge::new(b);
-
- tokio::task::spawn_blocking(move || {
- jellyremuxer::remux_stream_into(
- b,
- range,
- CONF.media_path.to_owned(),
- &node,
- local_tracks,
- spec.track,
- spec.webm.unwrap_or(false),
- )
- });
-
- Ok(())
-}
-
async fn original_stream(
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
+ info: Arc<SMediaInfo>,
+ track: usize,
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
- }
-
- let source = local_tracks[spec.track[0]].clone();
- let mut file = File::open(CONF.media_path.join(source.path))
+ let (iinfo, _info) = stream_info(info).await?;
+ let (file_index, _) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("unknown track"))?;
+ let mut file = File::open(&iinfo.paths[file_index])
.await
.context("opening source")?;
file.seek(SeekFrom::Start(range.start as u64))
diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs
new file mode 100644
index 0000000..c3746c6
--- /dev/null
+++ b/stream/src/stream_info.rs
@@ -0,0 +1,169 @@
+use anyhow::Result;
+use ebml_struct::matroska::TrackEntry;
+use jellybase::{
+ common::stream::{
+ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo,
+ TrackKind,
+ },
+ CONF,
+};
+use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
+use std::{path::PathBuf, sync::Arc};
+use tokio::{
+ io::{AsyncWriteExt, DuplexStream},
+ spawn,
+ task::spawn_blocking,
+};
+
+use crate::SMediaInfo;
+
+async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>> {
+ Ok(spawn_blocking(move || matroska_metadata(&path)).await??)
+}
+
+pub(crate) struct InternalStreamInfo {
+ pub paths: Vec<PathBuf>,
+ pub _metadata: Vec<Arc<MatroskaMetadata>>,
+ pub track_to_file: Vec<(usize, u64)>,
+}
+
+pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
+ let mut metadata = Vec::new();
+ let mut paths = Vec::new();
+ for path in &info.files {
+ metadata.push(async_matroska_metadata(path.clone()).await?);
+ paths.push(path.clone());
+ }
+ let mut tracks = Vec::new();
+ let mut track_to_file = Vec::new();
+
+ for (i, m) in metadata.iter().enumerate() {
+ if let Some(t) = &m.tracks {
+ for t in &t.entries {
+ tracks.push(StreamTrackInfo {
+ name: None,
+ kind: match t.track_type {
+ 1 => TrackKind::Video,
+ 2 => TrackKind::Audio,
+ 17 => TrackKind::Subtitle,
+ _ => todo!(),
+ },
+ formats: stream_formats(t),
+ });
+ track_to_file.push((i, t.track_number));
+ }
+ }
+ }
+
+ let segment = StreamSegmentInfo {
+ name: None,
+ duration: media_duration(&metadata[0]),
+ tracks,
+ };
+ Ok((
+ InternalStreamInfo {
+ _metadata: metadata,
+ paths,
+ track_to_file,
+ },
+ StreamInfo {
+ name: info.info.title.clone(),
+ segments: vec![segment],
+ },
+ ))
+}
+
+fn stream_formats(t: &TrackEntry) -> Vec<StreamFormatInfo> {
+ let mut formats = Vec::new();
+ formats.push(StreamFormatInfo {
+ codec: t.codec_id.to_string(),
+ remux: true,
+ bitrate: 10_000_000., // TODO
+ containers: {
+ let mut x = containers_by_codec(&t.codec_id);
+ // TODO remove this
+ x.retain_mut(|x| *x != StreamContainer::MPEG4);
+ x
+ },
+ bit_depth: t.audio.as_ref().and_then(|a| a.bit_depth.map(|e| e as u8)),
+ samplerate: t.audio.as_ref().map(|a| a.sampling_frequency),
+ channels: t.audio.as_ref().map(|a| a.channels as usize),
+ width: t.video.as_ref().map(|v| v.pixel_width),
+ height: t.video.as_ref().map(|v| v.pixel_height),
+ ..Default::default()
+ });
+
+ match t.track_type {
+ 1 => {
+ let sw = t.video.as_ref().unwrap().pixel_width;
+ let sh = t.video.as_ref().unwrap().pixel_height;
+ for (w, br) in [(3840, 8e6), (1920, 5e6), (1280, 3e6), (640, 1e6)] {
+ if w > sw {
+ continue;
+ }
+ let h = (w * sh) / sw;
+ for (cid, enable) in [
+ ("V_AV1", CONF.encoders.av1.is_some()),
+ ("V_VP8", CONF.encoders.vp8.is_some()),
+ ("V_VP9", CONF.encoders.vp9.is_some()),
+ ("V_MPEG4/ISO/AVC", CONF.encoders.avc.is_some()),
+ ("V_MPEGH/ISO/HEVC", CONF.encoders.hevc.is_some()),
+ ] {
+ if enable {
+ formats.push(StreamFormatInfo {
+ codec: cid.to_string(),
+ bitrate: br,
+ remux: false,
+ containers: containers_by_codec(cid),
+ width: Some(w),
+ height: Some(h),
+ samplerate: None,
+ channels: None,
+ bit_depth: None,
+ });
+ }
+ }
+ }
+ }
+ 2 => {
+ for br in [256e3, 128e3, 64e3] {
+ formats.push(StreamFormatInfo {
+ codec: "A_OPUS".to_string(),
+ bitrate: br,
+ remux: false,
+ containers: containers_by_codec("A_OPUS"),
+ width: None,
+ height: None,
+ samplerate: Some(48e3),
+ channels: Some(2),
+ bit_depth: Some(32),
+ });
+ }
+ }
+ 17 => {}
+ _ => {}
+ }
+
+ formats
+}
+
+fn containers_by_codec(codec: &str) -> Vec<StreamContainer> {
+ use StreamContainer::*;
+ match codec {
+ "V_VP8" | "V_VP9" | "V_AV1" | "A_OPUS" | "A_VORBIS" => vec![Matroska, WebM],
+ "V_MPEG4/ISO/AVC" | "A_AAC" => vec![Matroska, MPEG4],
+ "S_TEXT/UTF8" | "S_TEXT/WEBVTT" => vec![Matroska, WebVTT, WebM, JVTT],
+ _ => vec![Matroska],
+ }
+}
+
+pub(crate) async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> {
+ let (_, info) = stream_info(info).await?;
+ spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await });
+ Ok(())
+}
+
+fn media_duration(m: &MatroskaMetadata) -> f64 {
+ let info = m.info.as_ref().unwrap();
+ (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000.
+}
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index f78ac2f..e9f0181 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -3,61 +3,91 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use anyhow::{anyhow, Context, Result};
-use jellybase::{
- cache::async_cache_memory,
- common::{stream::StreamSpec, LocalTrack, Node},
- CONF,
-};
-use jellyremuxer::extract::extract_track;
-use jellytranscoder::subtitles::{parse_subtitles, write_webvtt};
+use anyhow::Result;
+use jellybase::common::{stream::StreamSpec, Node};
use std::sync::Arc;
-use tokio::io::{AsyncWriteExt, DuplexStream};
+use tokio::io::DuplexStream;
pub async fn vtt_stream(
json: bool,
node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
- mut b: DuplexStream,
+ b: DuplexStream,
) -> Result<()> {
+ let _ = b;
+ let _ = spec;
+ let _ = node;
+ let _ = json;
// TODO cache
// TODO should use fragments too? big films take too long...
- let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
- let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
- let track = &node.media.as_ref().unwrap().tracks[tracki];
- let cp = local_track.codec_private.clone();
+ // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
+ // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+ // let track = &node.media.as_ref().unwrap().tracks[tracki];
+ // let cp = local_track.codec_private.clone();
- let subtitles = async_cache_memory(
- &[
- "vtt",
- &format!(
- "{} {}",
- local_track.path.to_str().unwrap(),
- local_track.track
- ),
- ],
- move || async move {
- let blocks = tokio::task::spawn_blocking(move || {
- extract_track(CONF.media_path.clone(), local_track)
- })
- .await??;
- let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
- Ok(subtitles)
- },
- )
- .await?;
+ // let subtitles = async_cache_memory(
+ // &[
+ // "vtt",
+ // &format!(
+ // "{} {}",
+ // local_track.path.to_str().unwrap(),
+ // local_track.track
+ // ),
+ // ],
+ // move || async move {
+ // let blocks = tokio::task::spawn_blocking(move || {
+ // extract_track(CONF.media_path.clone(), local_track)
+ // })
+ // .await??;
+ // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+ // Ok(subtitles)
+ // },
+ // )spec.track.first().ok_or(anyhow!("no track selected"))?;
+ // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+ // let track = &node.media.as_ref().unwrap().tracks[tracki];
+ // let cp = local_track.codec_private.clone();
- let output = if json {
- serde_json::to_string(subtitles.as_ref())?
- } else {
- write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
- .context("writing webvtt")?
- };
- tokio::task::spawn(async move {
- let _ = b.write_all(output.as_bytes()).await;
- });
+ // let subtitles = async_cache_memory(
+ // &[
+ // "vtt",
+ // &format!(
+ // "{} {}",
+ // local_track.path.to_str().unwrap(),
+ // local_track.track
+ // ),
+ // ],
+ // move || async move {
+ // let blocks = tokio::task::spawn_blocking(move || {
+ // extract_track(CONF.media_path.clone(), local_track)
+ // })
+ // .await??;
+ // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+ // Ok(subtitles)
+ // },
+ // )
+ // .await?;
+
+ // let output = if json {
+ // serde_json::to_string(subtitles.as_ref())?
+ // } else {
+ // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+ // .context("writing webvtt")?
+ // };
+ // tokio::task::spawn(async move {
+ // let _ = b.write_all(output.as_bytes()).await;
+ // });
+ // .await?;
+
+ // let output = if json {
+ // serde_json::to_string(subtitles.as_ref())?
+ // } else {
+ // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+ // .context("writing webvtt")?
+ // };
+ // tokio::task::spawn(async move {
+ // let _ = b.write_all(output.as_bytes()).await;
+ // });
Ok(())
}