aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-03-02 13:43:59 +0100
committermetamuffin <metamuffin@disroot.org>2025-03-02 13:43:59 +0100
commitb9539ee3afbf1440b8628bf0609dc0e24aed116c (patch)
tree5fa67c5db5e3c266385da36116f7723bf1a72c96
parentf0dbf139d8708194d1ff7e887b1dff48ccc166fa (diff)
downloadjellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar
jellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar.bz2
jellything-b9539ee3afbf1440b8628bf0609dc0e24aed116c.tar.zst
change things
-rw-r--r--common/src/stream.rs72
-rw-r--r--server/src/routes/compat/jellyfin/mod.rs27
-rw-r--r--stream/src/fragment.rs104
-rw-r--r--stream/src/hls.rs42
-rw-r--r--stream/src/jhls.rs34
-rw-r--r--stream/src/lib.rs24
-rw-r--r--stream/src/webvtt.rs97
7 files changed, 265 insertions, 135 deletions
diff --git a/common/src/stream.rs b/common/src/stream.rs
index 46f6abc..0e8f810 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -4,22 +4,23 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
use serde::{Deserialize, Serialize};
+use std::fmt::Display;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum StreamSpec {
Whep {
- track: u64,
+ track: usize,
seek: u64,
},
WhepControl {
token: String,
},
Remux {
- track: Vec<u64>,
+ tracks: Vec<usize>,
container: StreamContainer,
},
Original {
- track: u64,
+ track: usize,
},
HlsSuperMultiVariant {
container: StreamContainer,
@@ -30,7 +31,7 @@ pub enum StreamSpec {
},
HlsVariant {
segment: u64,
- track: u64,
+ track: usize,
container: StreamContainer,
format: usize,
},
@@ -39,11 +40,11 @@ pub enum StreamSpec {
},
FragmentIndex {
segment: u64,
- track: u64,
+ track: usize,
},
Fragment {
segment: u64,
- track: u64,
+ track: usize,
index: u64,
container: StreamContainer,
format: usize,
@@ -58,3 +59,62 @@ pub enum StreamContainer {
WebVTT,
JVTT,
}
+
+impl StreamSpec {
+ pub fn to_query(&self) -> String {
+ match self {
+ StreamSpec::Whep { track, seek } => format!("?whep&track={track}&seek={seek}"),
+ StreamSpec::WhepControl { token } => format!("?whepcontrol&token={token}"),
+ StreamSpec::Remux { tracks, container } => {
+ format!(
+ "?remux&tracks={}&container={container}",
+ tracks
+ .iter()
+ .map(|t| t.to_string())
+ .collect::<Vec<String>>()
+ .join(",")
+ )
+ }
+ StreamSpec::Original { track } => format!("?original&track={track}"),
+ StreamSpec::HlsSuperMultiVariant { container } => {
+ format!("?hlssupermultivariant&container={container}")
+ }
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ format!("?hlsmultivariant&segment={segment}&container={container}")
+ }
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => format!(
+ "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}"
+ ),
+ StreamSpec::Info {
+ segment: Some(segment),
+ } => format!("?info&segment={segment}"),
+ StreamSpec::Info { segment: None } => format!("?info"),
+ StreamSpec::FragmentIndex { segment, track } => {
+ format!("?fragmentindex&segment={segment}&track={track}")
+ }
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => format!("?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"),
+ }
+ }
+}
+
+impl Display for StreamContainer {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(match self {
+ StreamContainer::WebM => "webm",
+ StreamContainer::Matroska => "matroska",
+ StreamContainer::WebVTT => "webvtt",
+ StreamContainer::JVTT => "jvtt",
+ })
+ }
+}
diff --git a/server/src/routes/compat/jellyfin/mod.rs b/server/src/routes/compat/jellyfin/mod.rs
index ab36a8c..6066760 100644
--- a/server/src/routes/compat/jellyfin/mod.rs
+++ b/server/src/routes/compat/jellyfin/mod.rs
@@ -21,7 +21,7 @@ use crate::routes::{
use anyhow::{anyhow, Context};
use jellybase::{database::Database, CONF};
use jellycommon::{
- stream::{StreamFormat, StreamSpec},
+ stream::{StreamContainer, StreamSpec},
user::{NodeUserData, WatchedState},
MediaInfo, Node, NodeID, NodeKind, SourceTrack, SourceTrackKind, Visibility,
};
@@ -446,16 +446,12 @@ pub fn r_jellyfin_video_stream(
.get_node_slug(id)?
.ok_or(anyhow!("node does not exist"))?;
let media = node.media.as_ref().ok_or(anyhow!("node has no media"))?;
- Ok(Redirect::temporary(rocket::uri!(r_stream(
- id,
- StreamSpec {
- format: StreamFormat::Matroska,
- webm: Some(true),
- track: (0..media.tracks.len()).collect(),
- index: None,
- profile: None,
- }
- ))))
+ let params = StreamSpec::Remux {
+ tracks: (0..media.tracks.len()).collect(),
+ container: StreamContainer::WebM,
+ }
+ .to_query();
+ Ok(Redirect::temporary(format!("/n/{id}/stream{params}")))
}
#[derive(Deserialize)]
@@ -498,9 +494,7 @@ pub fn r_jellyfin_playback_bitratetest(_session: Session, Size: usize) -> Vec<u8
}
#[post("/Sessions/Capabilities/Full")]
-pub fn r_jellyfin_sessions_capabilities_full(_session: Session) {
-
-}
+pub fn r_jellyfin_sessions_capabilities_full(_session: Session) {}
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
@@ -796,7 +790,10 @@ fn item_object(node: &Node, userdata: &NodeUserData) -> JellyfinItem {
location_type: node.media.as_ref().map(|_| "FileSystem".to_owned()),
play_access: node.media.as_ref().map(|_| "Full".to_owned()),
container: node.media.as_ref().map(|_| "webm".to_owned()),
- run_time_ticks: node.media.as_ref().map(|m| (m.duration * 10_000_000.) as i64),
+ run_time_ticks: node
+ .media
+ .as_ref()
+ .map(|m| (m.duration * 10_000_000.) as i64),
media_sources: media_source.as_ref().map(|s| vec![s.clone()]),
media_streams: media_source.as_ref().map(|s| s.media_streams.clone()),
path: node
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index e276d29..a34bb8d 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,7 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use anyhow::{anyhow, bail, Result};
+use anyhow::{anyhow, Result};
use jellybase::{
common::{
stream::StreamSpec,
@@ -25,64 +25,62 @@ pub async fn fragment_stream(
spec: StreamSpec,
mut b: DuplexStream,
perms: &PermissionSet,
+ webm: bool,
+ track: u64,
+ segment: u64,
+ index: usize,
) -> Result<()> {
- if spec.track.len() != 1 {
- bail!("unsupported number of tracks for segment, must be exactly one");
- }
- let track = spec.track[0];
- let n = spec.index.ok_or(anyhow!("segment index missing"))?;
-
let local_track = local_tracks
.first()
.ok_or(anyhow!("track missing"))?
.to_owned();
- if let Some(profile) = spec.profile {
- perms.assert(&UserPermission::Transcode)?;
- let location = transcode(
- &format!("{track} {n} {:?}", node), // TODO maybe not use the entire source
- CONF.transcoding_profiles
- .get(profile)
- .ok_or(anyhow!("profile out of range"))?,
- move |b| {
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- SyncIoBridge::new(b),
- &CONF.media_path,
- &node,
- &local_track,
- track,
- false,
- n,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- },
- )
- .await?;
- let mut output = File::open(location.abs()).await?;
- tokio::task::spawn(async move {
- if let Err(err) = tokio::io::copy(&mut output, &mut b).await {
- warn!("cannot write stream: {err}")
- }
- });
- } else {
- let b = SyncIoBridge::new(b);
- tokio::task::spawn_blocking(move || {
- if let Err(err) = jellyremuxer::write_fragment_into(
- b,
- &CONF.media_path,
- &node,
- &local_track,
- track,
- spec.webm.unwrap_or(false),
- n,
- ) {
- warn!("segment stream error: {err}");
- }
- });
- }
+ // if let Some(profile) = None {
+ // perms.assert(&UserPermission::Transcode)?;
+ // let location = transcode(
+ // &format!("{track} {index} {:?}", node), // TODO maybe not use the entire source
+ // CONF.transcoding_profiles
+ // .get(profile)
+ // .ok_or(anyhow!("profile out of range"))?,
+ // move |b| {
+ // tokio::task::spawn_blocking(move || {
+ // if let Err(err) = jellyremuxer::write_fragment_into(
+ // SyncIoBridge::new(b),
+ // &CONF.media_path,
+ // &node,
+ // &local_track,
+ // track as usize,
+ // false,
+ // index,
+ // ) {
+ // warn!("segment stream error: {err}");
+ // }
+ // });
+ // },
+ // )
+ // .await?;
+ // let mut output = File::open(location.abs()).await?;
+ // tokio::task::spawn(async move {
+ // if let Err(err) = tokio::io::copy(&mut output, &mut b).await {
+ // warn!("cannot write stream: {err}")
+ // }
+ // });
+ // } else {
+ let b = SyncIoBridge::new(b);
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_fragment_into(
+ b,
+ &CONF.media_path,
+ &node,
+ &local_track,
+ track as usize,
+ webm,
+ index,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+ // }
Ok(())
}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index dca1036..56edd2d 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, Result};
use jellybase::{
common::{
- stream::{StreamFormat, StreamSpec},
+ stream::{StreamContainer, StreamSpec},
LocalTrack, Node, SourceTrackKind,
},
CONF,
@@ -21,7 +21,8 @@ use tokio::{
pub async fn hls_master_stream(
node: Arc<Node>,
_local_tracks: Vec<LocalTrack>,
- _spec: StreamSpec,
+ segment: u64,
+ container: StreamContainer,
mut b: DuplexStream,
) -> Result<()> {
let media = node.media.as_ref().ok_or(anyhow!("no media"))?;
@@ -32,10 +33,11 @@ pub async fn hls_master_stream(
for (i, t) in media.tracks.iter().enumerate() {
let uri = format!(
"stream?{}",
- StreamSpec {
- track: vec![i],
- format: StreamFormat::HlsVariant,
- ..Default::default()
+ StreamSpec::HlsVariant {
+ track: i,
+ segment,
+ container,
+ format: 0
}
.to_query()
);
@@ -54,14 +56,21 @@ pub async fn hls_master_stream(
pub async fn hls_variant_stream(
node: Arc<Node>,
local_tracks: Vec<LocalTrack>,
- mut spec: StreamSpec,
+ segment: u64,
+ track: usize,
+ format: usize,
+ container: StreamContainer,
mut b: DuplexStream,
) -> Result<()> {
let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned();
- let track_index = spec.track[0];
let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?;
let frags = spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, track_index)
+ jellyremuxer::fragment::fragment_index(
+ &CONF.media_path,
+ &node,
+ &local_track,
+ track as usize,
+ )
})
.await??;
@@ -72,11 +81,20 @@ pub async fn hls_variant_stream(
writeln!(out, "#EXT-X-VERSION:4")?;
writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?;
- spec.format = StreamFormat::Fragment;
for (i, Range { start, end }) in frags.iter().enumerate() {
writeln!(out, "#EXTINF:{:},", end - start)?;
- spec.index = Some(i);
- writeln!(out, "stream?{}", spec.to_query())?;
+ writeln!(
+ out,
+ "stream?{}",
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index: i as u64,
+ container,
+ format,
+ }
+ .to_query()
+ )?;
}
writeln!(out, "#EXT-X-ENDLIST")?;
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
index b222e39..2a2faec 100644
--- a/stream/src/jhls.rs
+++ b/stream/src/jhls.rs
@@ -24,24 +24,24 @@ pub async fn jhls_index(
mut b: DuplexStream,
perms: &PermissionSet,
) -> Result<()> {
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
+ // let local_track = local_tracks
+ // .first()
+ // .ok_or(anyhow!("track missing"))?
+ // .to_owned();
- let fragments = tokio::task::spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0])
- })
- .await??;
+ // let fragments = tokio::task::spawn_blocking(move || {
+ // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0])
+ // })
+ // .await??;
- let out = serde_json::to_string(&JhlsTrackIndex {
- extra_profiles: if perms.check(&UserPermission::Transcode) {
- CONF.transcoding_profiles.clone()
- } else {
- vec![]
- },
- fragments,
- })?;
- tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ // let out = serde_json::to_string(&JhlsTrackIndex {
+ // extra_profiles: if perms.check(&UserPermission::Transcode) {
+ // CONF.transcoding_profiles.clone()
+ // } else {
+ // vec![]
+ // },
+ // fragments,
+ // })?;
+ // tokio::spawn(async move { b.write_all(out.as_bytes()).await });
Ok(())
}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 6f31e6b..68b7e44 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -60,6 +60,30 @@ pub async fn stream(
) -> Result<DuplexStream> {
let (a, b) = duplex(4096);
+ match spec {
+ StreamSpec::Whep { track, seek } => todo!(),
+ StreamSpec::WhepControl { token } => todo!(),
+ StreamSpec::Remux { tracks, container } => todo!(),
+ StreamSpec::Original { track } => todo!(),
+ StreamSpec::HlsSuperMultiVariant { container } => todo!(),
+ StreamSpec::HlsMultiVariant { segment, container } => todo!(),
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => todo!(),
+ StreamSpec::Info { segment } => todo!(),
+ StreamSpec::FragmentIndex { segment, track } => todo!(),
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => todo!(),
+ }
+
Ok(a)
}
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index f78ac2f..fbd6382 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -25,39 +25,72 @@ pub async fn vtt_stream(
// TODO should use fragments too? big films take too long...
- let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
- let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
- let track = &node.media.as_ref().unwrap().tracks[tracki];
- let cp = local_track.codec_private.clone();
+ // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
+ // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+ // let track = &node.media.as_ref().unwrap().tracks[tracki];
+ // let cp = local_track.codec_private.clone();
- let subtitles = async_cache_memory(
- &[
- "vtt",
- &format!(
- "{} {}",
- local_track.path.to_str().unwrap(),
- local_track.track
- ),
- ],
- move || async move {
- let blocks = tokio::task::spawn_blocking(move || {
- extract_track(CONF.media_path.clone(), local_track)
- })
- .await??;
- let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
- Ok(subtitles)
- },
- )
- .await?;
+ // let subtitles = async_cache_memory(
+ // &[
+ // "vtt",
+ // &format!(
+ // "{} {}",
+ // local_track.path.to_str().unwrap(),
+ // local_track.track
+ // ),
+ // ],
+ // move || async move {
+ // let blocks = tokio::task::spawn_blocking(move || {
+ // extract_track(CONF.media_path.clone(), local_track)
+ // })
+ // .await??;
+ // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+ // Ok(subtitles)
+ // },
+ // )spec.track.first().ok_or(anyhow!("no track selected"))?;
+ // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+ // let track = &node.media.as_ref().unwrap().tracks[tracki];
+ // let cp = local_track.codec_private.clone();
- let output = if json {
- serde_json::to_string(subtitles.as_ref())?
- } else {
- write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
- .context("writing webvtt")?
- };
- tokio::task::spawn(async move {
- let _ = b.write_all(output.as_bytes()).await;
- });
+ // let subtitles = async_cache_memory(
+ // &[
+ // "vtt",
+ // &format!(
+ // "{} {}",
+ // local_track.path.to_str().unwrap(),
+ // local_track.track
+ // ),
+ // ],
+ // move || async move {
+ // let blocks = tokio::task::spawn_blocking(move || {
+ // extract_track(CONF.media_path.clone(), local_track)
+ // })
+ // .await??;
+ // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+ // Ok(subtitles)
+ // },
+ // )
+ // .await?;
+
+ // let output = if json {
+ // serde_json::to_string(subtitles.as_ref())?
+ // } else {
+ // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+ // .context("writing webvtt")?
+ // };
+ // tokio::task::spawn(async move {
+ // let _ = b.write_all(output.as_bytes()).await;
+ // });
+ // .await?;
+
+ // let output = if json {
+ // serde_json::to_string(subtitles.as_ref())?
+ // } else {
+ // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+ // .context("writing webvtt")?
+ // };
+ // tokio::task::spawn(async move {
+ // let _ = b.write_all(output.as_bytes()).await;
+ // });
Ok(())
}