aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-14 13:41:42 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-14 13:41:42 +0200
commita3afc2756a52f7d6fedc928b97c8ff3eb1ade338 (patch)
tree9bb043975a6b92e45bbc2f09e19641f1109950b1 /stream
parent48a57a52d85d387efe122fb4d9fb113f577a0a98 (diff)
downloadjellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar
jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.bz2
jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.zst
lots of rewriting and removing dumb code
Diffstat (limited to 'stream')
-rw-r--r--stream/src/fragment.rs45
-rw-r--r--stream/src/fragment_index.rs32
-rw-r--r--stream/src/hls.rs72
-rw-r--r--stream/src/jhls.rs47
-rw-r--r--stream/src/lib.rs36
-rw-r--r--stream/src/webvtt.rs3
6 files changed, 112 insertions, 123 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index a34bb8d..52d32f4 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,37 +3,29 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
-use jellytranscoder::fragment::transcode;
+use jellybase::common::stream::StreamContainer;
use log::warn;
use std::sync::Arc;
-use tokio::{fs::File, io::DuplexStream};
+use tokio::io::DuplexStream;
use tokio_util::io::SyncIoBridge;
pub async fn fragment_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
mut b: DuplexStream,
- perms: &PermissionSet,
- webm: bool,
- track: u64,
- segment: u64,
+ info: Arc<SMediaInfo>,
+ track: usize,
+ segment: usize,
index: usize,
+ format: usize,
+ container: StreamContainer,
) -> Result<()> {
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let path = iinfo.paths[file_index].clone();
// if let Some(profile) = None {
// perms.assert(&UserPermission::Transcode)?;
@@ -70,11 +62,10 @@ pub async fn fragment_stream(
tokio::task::spawn_blocking(move || {
if let Err(err) = jellyremuxer::write_fragment_into(
b,
- &CONF.media_path,
- &node,
- &local_track,
- track as usize,
- webm,
+ &path,
+ track_num,
+ container == StreamContainer::WebM,
+ &info.name.unwrap_or_default(),
index,
) {
warn!("segment stream error: {err}");
diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs
new file mode 100644
index 0000000..6fbddc6
--- /dev/null
+++ b/stream/src/fragment_index.rs
@@ -0,0 +1,32 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use crate::{stream_info, SMediaInfo};
+use anyhow::{anyhow, Result};
+use jellybase::common::stream::{SegmentNum, TrackNum};
+use std::sync::Arc;
+use tokio::io::{AsyncWriteExt, DuplexStream};
+
+pub async fn fragment_index_stream(
+ mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ _segment: SegmentNum,
+ track: TrackNum,
+) -> Result<()> {
+ let (iinfo, _info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+
+ let fragments = tokio::task::spawn_blocking(move || {
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
+ })
+ .await??;
+
+ let out = serde_json::to_string(&fragments)?;
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 27630b2..f06ac72 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -4,13 +4,10 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- stream::{StreamContainer, StreamSpec},
- LocalTrack, Node, SourceTrackKind,
- },
- CONF,
+use jellybase::common::stream::{
+ FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum,
};
use std::{fmt::Write, ops::Range, sync::Arc};
use tokio::{
@@ -19,20 +16,24 @@ use tokio::{
};
pub async fn hls_master_stream(
- node: Arc<Node>,
- _local_tracks: Vec<LocalTrack>,
- segment: u64,
- container: StreamContainer,
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ container: StreamContainer,
) -> Result<()> {
- let media = node.media.as_ref().ok_or(anyhow!("no media"))?;
+ let (_iinfo, info) = stream_info(info).await?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-VERSION:4")?;
// writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?;
- for (i, t) in media.tracks.iter().enumerate() {
+ for (i, t) in seg.tracks.iter().enumerate() {
let uri = format!(
- "stream?{}",
+ "stream{}",
StreamSpec::HlsVariant {
segment,
track: i,
@@ -42,10 +43,11 @@ pub async fn hls_master_stream(
.to_query()
);
let r#type = match t.kind {
- SourceTrackKind::Video { .. } => "VIDEO",
- SourceTrackKind::Audio { .. } => "AUDIO",
- SourceTrackKind::Subtitles => "SUBTITLES",
+ TrackKind::Video => "VIDEO",
+ TrackKind::Audio => "AUDIO",
+ TrackKind::Subtitle => "SUBTITLES",
};
+ // TODO bw
writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?;
writeln!(out, "{uri}")?;
}
@@ -54,42 +56,44 @@ pub async fn hls_master_stream(
}
pub async fn hls_variant_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- segment: u64,
- track: usize,
- format: usize,
- container: StreamContainer,
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ track: TrackNum,
+ format: FormatNum,
+ container: StreamContainer,
) -> Result<()> {
- let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned();
- let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?;
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let frags = spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(
- &CONF.media_path,
- &node,
- &local_track,
- track as usize,
- )
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
})
.await??;
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?;
- writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?;
+ writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?;
writeln!(out, "#EXT-X-VERSION:4")?;
writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?;
- for (i, Range { start, end }) in frags.iter().enumerate() {
+ for (index, Range { start, end }) in frags.iter().enumerate() {
writeln!(out, "#EXTINF:{:},", end - start)?;
writeln!(
out,
- "stream?{}",
+ "stream{}",
StreamSpec::Fragment {
segment,
track,
- index: i as u64,
+ index,
container,
format,
}
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
deleted file mode 100644
index 2a2faec..0000000
--- a/stream/src/jhls.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2025 metamuffin <metamuffin.org>
-*/
-use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- jhls::JhlsTrackIndex,
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
-use std::sync::Arc;
-use tokio::io::{AsyncWriteExt, DuplexStream};
-
-pub async fn jhls_index(
- node: Arc<Node>,
- local_tracks: &[LocalTrack],
- spec: StreamSpec,
- mut b: DuplexStream,
- perms: &PermissionSet,
-) -> Result<()> {
- // let local_track = local_tracks
- // .first()
- // .ok_or(anyhow!("track missing"))?
- // .to_owned();
-
- // let fragments = tokio::task::spawn_blocking(move || {
- // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0])
- // })
- // .await??;
-
- // let out = serde_json::to_string(&JhlsTrackIndex {
- // extra_profiles: if perms.check(&UserPermission::Transcode) {
- // CONF.transcoding_profiles.clone()
- // } else {
- // vec![]
- // },
- // fragments,
- // })?;
- // tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
-}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index d09759f..a6faf54 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -5,17 +5,20 @@
*/
#![feature(iterator_try_collect)]
pub mod fragment;
+pub mod fragment_index;
pub mod hls;
-pub mod jhls;
pub mod webvtt;
use anyhow::{anyhow, Context, Result};
+use fragment::fragment_stream;
+use fragment_index::fragment_index_stream;
+use hls::{hls_master_stream, hls_variant_stream};
use jellybase::common::{
stream::{
StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec,
StreamTrackInfo, TrackKind,
},
- LocalTrack, Node,
+ Node,
};
use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc};
@@ -75,22 +78,26 @@ pub async fn stream(
StreamSpec::Remux { tracks, container } => todo!(),
StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
StreamSpec::HlsSuperMultiVariant { container } => todo!(),
- StreamSpec::HlsMultiVariant { segment, container } => todo!(),
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ hls_master_stream(b, info, segment, container).await?
+ }
StreamSpec::HlsVariant {
segment,
track,
container,
format,
- } => todo!(),
- StreamSpec::Info { segment } => write_stream_info(info, b).await?,
- StreamSpec::FragmentIndex { segment, track } => todo!(),
+ } => hls_variant_stream(b, info, segment, track, format, container).await?,
+ StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
+ StreamSpec::FragmentIndex { segment, track } => {
+ fragment_index_stream(b, info, segment, track).await?
+ }
StreamSpec::Fragment {
segment,
track,
index,
container,
format,
- } => todo!(),
+ } => fragment_stream(b, info, track, segment, index, format, container).await?,
}
Ok(a)
@@ -103,7 +110,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>>
pub(crate) struct InternalStreamInfo {
pub paths: Vec<PathBuf>,
pub metadata: Vec<Arc<MatroskaMetadata>>,
- pub track_to_file: Vec<usize>,
+ pub track_to_file: Vec<(usize, u64)>,
}
async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
@@ -142,14 +149,19 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, Strea
},
formats,
});
- track_to_file.push(i);
+ track_to_file.push((i, t.track_number));
}
}
}
let segment = StreamSegmentInfo {
name: None,
- duration: 0,
+ duration: metadata[0]
+ .info
+ .as_ref()
+ .unwrap()
+ .duration
+ .unwrap_or_default(),
tracks,
};
Ok((
@@ -173,7 +185,6 @@ async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result
async fn remux_stream(
node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
b: DuplexStream,
@@ -202,8 +213,7 @@ async fn original_stream(
b: DuplexStream,
) -> Result<()> {
let (iinfo, _info) = stream_info(info).await?;
-
- let file_index = *iinfo
+ let (file_index, _) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("unknown track"))?;
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index fbd6382..960849c 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -6,7 +6,7 @@
use anyhow::{anyhow, Context, Result};
use jellybase::{
cache::async_cache_memory,
- common::{stream::StreamSpec, LocalTrack, Node},
+ common::{stream::StreamSpec, Node},
CONF,
};
use jellyremuxer::extract::extract_track;
@@ -17,7 +17,6 @@ use tokio::io::{AsyncWriteExt, DuplexStream};
pub async fn vtt_stream(
json: bool,
node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
mut b: DuplexStream,
) -> Result<()> {