aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
Diffstat (limited to 'stream')
-rw-r--r--stream/Cargo.toml1
-rw-r--r--stream/src/fragment.rs6
-rw-r--r--stream/src/fragment_index.rs2
-rw-r--r--stream/src/hls.rs4
-rw-r--r--stream/src/lib.rs17
-rw-r--r--stream/src/stream_info.rs8
-rw-r--r--stream/src/webvtt.rs150
-rw-r--r--stream/types/Cargo.toml7
-rw-r--r--stream/types/src/lib.rs281
9 files changed, 372 insertions, 104 deletions
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
index 33741d7..fb8cfe2 100644
--- a/stream/Cargo.toml
+++ b/stream/Cargo.toml
@@ -8,6 +8,7 @@ jellycommon = { path = "../common" }
jellycache = { path = "../cache" }
jellytranscoder = { path = "../transcoder" }
jellyremuxer = { path = "../remuxer" }
+jellystream-types = { path = "types" }
log = { workspace = true }
anyhow = { workspace = true }
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index 49817ca..89ce94f 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -5,8 +5,8 @@
*/
use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, bail, Result};
-use jellycommon::stream::StreamContainer;
use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
+use jellystream_types::StreamContainer;
use jellytranscoder::fragment::transcode;
use log::warn;
use std::{
@@ -84,9 +84,7 @@ pub async fn fragment_stream(
}
});
}
- StreamContainer::JVTT => {
-
- }
+ StreamContainer::JVTT => {}
_ => bail!("not yet supported"),
}
} else {
diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs
index 0632f0a..cb54948 100644
--- a/stream/src/fragment_index.rs
+++ b/stream/src/fragment_index.rs
@@ -5,7 +5,7 @@
*/
use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellycommon::stream::{SegmentNum, TrackNum};
+use jellystream_types::{SegmentNum, TrackNum};
use std::sync::Arc;
use tokio::io::{AsyncWriteExt, DuplexStream};
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 41b896b..949ddb4 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -6,9 +6,7 @@
use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellycommon::stream::{
- FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum,
-};
+use jellystream_types::{FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum};
use std::{fmt::Write, ops::Range, sync::Arc};
use tokio::{
io::{AsyncWriteExt, DuplexStream},
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 9c62ac8..5b4e8ed 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -4,20 +4,17 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
#![feature(iterator_try_collect)]
-pub mod fragment;
-pub mod fragment_index;
-pub mod hls;
-pub mod stream_info;
-pub mod webvtt;
+mod fragment;
+mod fragment_index;
+mod hls;
+mod stream_info;
+mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use fragment::fragment_stream;
use fragment_index::fragment_index_stream;
use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream};
-use jellycommon::{
- stream::{StreamContainer, StreamSpec},
- Node,
-};
+use jellystream_types::{StreamContainer, StreamSpec};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
@@ -53,7 +50,7 @@ static CONF: LazyLock<Config> = LazyLock::new(|| {
#[derive(Debug)]
pub struct SMediaInfo {
- pub info: Arc<Node>,
+ pub title: Option<String>,
pub files: BTreeSet<PathBuf>,
}
diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs
index 920ce69..560ec9b 100644
--- a/stream/src/stream_info.rs
+++ b/stream/src/stream_info.rs
@@ -5,13 +5,13 @@
*/
use crate::{SMediaInfo, CONF};
use anyhow::Result;
-use jellycommon::stream::{
- StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, TrackKind,
-};
use jellyremuxer::{
metadata::{matroska_metadata, MatroskaMetadata, MatroskaTrackEntry},
seek_index::get_track_sizes,
};
+use jellystream_types::{
+ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, TrackKind,
+};
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
use tokio::{
io::{AsyncWriteExt, DuplexStream},
@@ -76,7 +76,7 @@ pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStream
track_to_file,
},
StreamInfo {
- name: info.info.title.clone(),
+ name: info.title.clone(),
segments: vec![segment],
},
))
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index c0bc466..d99111e 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -3,91 +3,77 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use anyhow::Result;
-use jellycommon::{stream::StreamSpec, Node};
-use std::sync::Arc;
-use tokio::io::DuplexStream;
-pub async fn vtt_stream(
- json: bool,
- node: Arc<Node>,
- spec: StreamSpec,
- b: DuplexStream,
-) -> Result<()> {
- let _ = b;
- let _ = spec;
- let _ = node;
- let _ = json;
- // TODO cache
+// async fn vtt_stream(json: bool, node: Arc<Node>, spec: StreamSpec, b: DuplexStream) -> Result<()> {
+// TODO cache
+// TODO should use fragments too? big films take too long...
- // TODO should use fragments too? big films take too long...
+// let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
+// let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+// let track = &node.media.as_ref().unwrap().tracks[tracki];
+// let cp = local_track.codec_private.clone();
- // let tracki = *spec.track.first().ok_or(anyhow!("no track selected"))?;
- // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
- // let track = &node.media.as_ref().unwrap().tracks[tracki];
- // let cp = local_track.codec_private.clone();
+// let subtitles = async_cache_memory(
+// &[
+// "vtt",
+// &format!(
+// "{} {}",
+// local_track.path.to_str().unwrap(),
+// local_track.track
+// ),
+// ],
+// move || async move {
+// let blocks = tokio::task::spawn_blocking(move || {
+// extract_track(CONF.media_path.clone(), local_track)
+// })
+// .await??;
+// let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+// Ok(subtitles)
+// },
+// )spec.track.first().ok_or(anyhow!("no track selected"))?;
+// let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
+// let track = &node.media.as_ref().unwrap().tracks[tracki];
+// let cp = local_track.codec_private.clone();
- // let subtitles = async_cache_memory(
- // &[
- // "vtt",
- // &format!(
- // "{} {}",
- // local_track.path.to_str().unwrap(),
- // local_track.track
- // ),
- // ],
- // move || async move {
- // let blocks = tokio::task::spawn_blocking(move || {
- // extract_track(CONF.media_path.clone(), local_track)
- // })
- // .await??;
- // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
- // Ok(subtitles)
- // },
- // )spec.track.first().ok_or(anyhow!("no track selected"))?;
- // let local_track = local_tracks.first().ok_or(anyhow!("no tracks"))?.clone();
- // let track = &node.media.as_ref().unwrap().tracks[tracki];
- // let cp = local_track.codec_private.clone();
+// let subtitles = async_cache_memory(
+// &[
+// "vtt",
+// &format!(
+// "{} {}",
+// local_track.path.to_str().unwrap(),
+// local_track.track
+// ),
+// ],
+// move || async move {
+// let blocks = tokio::task::spawn_blocking(move || {
+// extract_track(CONF.media_path.clone(), local_track)
+// })
+// .await??;
+// let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
+// Ok(subtitles)
+// },
+// )
+// .await?;
- // let subtitles = async_cache_memory(
- // &[
- // "vtt",
- // &format!(
- // "{} {}",
- // local_track.path.to_str().unwrap(),
- // local_track.track
- // ),
- // ],
- // move || async move {
- // let blocks = tokio::task::spawn_blocking(move || {
- // extract_track(CONF.media_path.clone(), local_track)
- // })
- // .await??;
- // let subtitles = parse_subtitles(&track.codec, cp, blocks)?;
- // Ok(subtitles)
- // },
- // )
- // .await?;
+// let output = if json {
+// serde_json::to_string(subtitles.as_ref())?
+// } else {
+// write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+// .context("writing webvtt")?
+// };
+// tokio::task::spawn(async move {
+// let _ = b.write_all(output.as_bytes()).await;
+// });
+// .await?;
- // let output = if json {
- // serde_json::to_string(subtitles.as_ref())?
- // } else {
- // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
- // .context("writing webvtt")?
- // };
- // tokio::task::spawn(async move {
- // let _ = b.write_all(output.as_bytes()).await;
- // });
- // .await?;
-
- // let output = if json {
- // serde_json::to_string(subtitles.as_ref())?
- // } else {
- // write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
- // .context("writing webvtt")?
- // };
- // tokio::task::spawn(async move {
- // let _ = b.write_all(output.as_bytes()).await;
- // });
- Ok(())
-}
+// let output = if json {
+// serde_json::to_string(subtitles.as_ref())?
+// } else {
+// write_webvtt(node.title.clone().unwrap_or_default(), subtitles.as_ref())
+// .context("writing webvtt")?
+// };
+// tokio::task::spawn(async move {
+// let _ = b.write_all(output.as_bytes()).await;
+// });
+// Ok(())
+// }
diff --git a/stream/types/Cargo.toml b/stream/types/Cargo.toml
new file mode 100644
index 0000000..f441730
--- /dev/null
+++ b/stream/types/Cargo.toml
@@ -0,0 +1,7 @@
+[package]
+name = "jellystream-types"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+serde = { version = "1.0.219", features = ["derive"] }
diff --git a/stream/types/src/lib.rs b/stream/types/src/lib.rs
new file mode 100644
index 0000000..a90db03
--- /dev/null
+++ b/stream/types/src/lib.rs
@@ -0,0 +1,281 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use serde::{Deserialize, Serialize};
+use std::{collections::BTreeMap, fmt::Display, str::FromStr};
+
+pub type SegmentNum = usize;
+pub type TrackNum = usize;
+pub type FormatNum = usize;
+pub type IndexNum = usize;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub enum StreamSpec {
+ // Whep {
+ // track: TrackNum,
+ // seek: u64,
+ // },
+ // WhepControl {
+ // token: String,
+ // },
+ Remux {
+ tracks: Vec<usize>,
+ container: StreamContainer,
+ },
+ Original {
+ track: TrackNum,
+ },
+ HlsSuperMultiVariant {
+ container: StreamContainer,
+ },
+ HlsMultiVariant {
+ segment: SegmentNum,
+ container: StreamContainer,
+ },
+ HlsVariant {
+ segment: SegmentNum,
+ track: TrackNum,
+ container: StreamContainer,
+ format: FormatNum,
+ },
+ Info {
+ segment: Option<u64>,
+ },
+ FragmentIndex {
+ segment: SegmentNum,
+ track: TrackNum,
+ },
+ Fragment {
+ segment: SegmentNum,
+ track: TrackNum,
+ index: IndexNum,
+ container: StreamContainer,
+ format: FormatNum,
+ },
+ // Track {
+ // segment: SegmentNum,
+ // track: TrackNum,
+ // container: StreamContainer,
+ // foramt: FormatNum,
+ // },
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamInfo {
+ pub name: Option<String>,
+ pub segments: Vec<StreamSegmentInfo>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamSegmentInfo {
+ pub name: Option<String>,
+ pub duration: f64,
+ pub tracks: Vec<StreamTrackInfo>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct StreamTrackInfo {
+ pub name: Option<String>,
+ pub kind: TrackKind,
+ pub formats: Vec<StreamFormatInfo>,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum TrackKind {
+ Video,
+ Audio,
+ Subtitle,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub struct StreamFormatInfo {
+ pub codec: String,
+ pub bitrate: f64,
+ pub remux: bool,
+ pub containers: Vec<StreamContainer>,
+
+ pub width: Option<u64>,
+ pub height: Option<u64>,
+ pub samplerate: Option<f64>,
+ pub channels: Option<usize>,
+ pub bit_depth: Option<u8>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StreamContainer {
+ WebM,
+ Matroska,
+ WebVTT,
+ MPEG4,
+ JVTT,
+}
+
+impl StreamSpec {
+ pub fn to_query(&self) -> String {
+ match self {
+ StreamSpec::Remux { tracks, container } => {
+ format!(
+ "?remux&tracks={}&container={container}",
+ tracks
+ .iter()
+ .map(|t| t.to_string())
+ .collect::<Vec<String>>()
+ .join(",")
+ )
+ }
+ StreamSpec::Original { track } => format!("?original&track={track}"),
+ StreamSpec::HlsSuperMultiVariant { container } => {
+ format!("?hlssupermultivariant&container={container}")
+ }
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ format!("?hlsmultivariant&segment={segment}&container={container}")
+ }
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => format!(
+ "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}"
+ ),
+ StreamSpec::Info {
+ segment: Some(segment),
+ } => format!("?info&segment={segment}"),
+ StreamSpec::Info { segment: None } => "?info".to_string(),
+ StreamSpec::FragmentIndex { segment, track } => {
+ format!("?fragmentindex&segment={segment}&track={track}")
+ }
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => format!(
+ "?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}"
+ ),
+ }
+ }
+ pub fn to_query_short(&self) -> String {
+ match self {
+ StreamSpec::Remux { tracks, container } => {
+ format!(
+ "?remux&ts={}&c={container}",
+ tracks
+ .iter()
+ .map(|t| t.to_string())
+ .collect::<Vec<String>>()
+ .join(",")
+ )
+ }
+ StreamSpec::Original { track } => format!("?original&t={track}"),
+ StreamSpec::HlsSuperMultiVariant { container } => {
+ format!("?hlssupermultivariant&c={container}")
+ }
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ format!("?hlsmultivariant&s={segment}&c={container}")
+ }
+ StreamSpec::HlsVariant {
+ segment,
+ track,
+ container,
+ format,
+ } => format!("?hlsvariant&s={segment}&t={track}&c={container}&f={format}"),
+ StreamSpec::Info {
+ segment: Some(segment),
+ } => format!("?info&s={segment}"),
+ StreamSpec::Info { segment: None } => "?info".to_string(),
+ StreamSpec::FragmentIndex { segment, track } => {
+ format!("?fragmentindex&s={segment}&t={track}")
+ }
+ StreamSpec::Fragment {
+ segment,
+ track,
+ index,
+ container,
+ format,
+ } => format!("?fragment&s={segment}&t={track}&i={index}&c={container}&f={format}"),
+ }
+ }
+ pub fn from_query_kv(query: &BTreeMap<String, String>) -> Result<Self, &'static str> {
+ let get_num = |k: &'static str, ks: &'static str| {
+ query
+ .get(k)
+ .or(query.get(ks))
+ .ok_or(k)
+ .and_then(|a| a.parse().map_err(|_| "invalid number"))
+ };
+ let get_container = || {
+ query
+ .get("container")
+ .or(query.get("c"))
+ .ok_or("container")
+ .and_then(|s| s.parse().map_err(|()| "unknown container"))
+ };
+ if query.contains_key("info") {
+ Ok(Self::Info {
+ segment: get_num("segment", "s").ok(),
+ })
+ } else if query.contains_key("hlssupermultivariant") {
+ Ok(Self::HlsSuperMultiVariant {
+ container: get_container().ok().unwrap_or(StreamContainer::Matroska),
+ })
+ } else if query.contains_key("hlsmultivariant") {
+ Ok(Self::HlsMultiVariant {
+ segment: get_num("segment", "s")? as SegmentNum,
+ container: get_container()?,
+ })
+ } else if query.contains_key("hlsvariant") {
+ Ok(Self::HlsVariant {
+ segment: get_num("segment", "s")? as SegmentNum,
+ track: get_num("track", "t")? as TrackNum,
+ format: get_num("format", "f")? as FormatNum,
+ container: get_container()?,
+ })
+ } else if query.contains_key("fragment") {
+ Ok(Self::Fragment {
+ segment: get_num("segment", "s")? as SegmentNum,
+ track: get_num("track", "t")? as TrackNum,
+ format: get_num("format", "f")? as FormatNum,
+ index: get_num("index", "i")? as IndexNum,
+ container: get_container()?,
+ })
+ } else if query.contains_key("fragmentindex") {
+ Ok(Self::FragmentIndex {
+ segment: get_num("segment", "s")? as SegmentNum,
+ track: get_num("track", "t")? as TrackNum,
+ })
+ } else {
+ Err("invalid stream spec")
+ }
+ }
+}
+
+impl Display for StreamContainer {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(match self {
+ StreamContainer::WebM => "webm",
+ StreamContainer::Matroska => "matroska",
+ StreamContainer::WebVTT => "webvtt",
+ StreamContainer::JVTT => "jvtt",
+ StreamContainer::MPEG4 => "mpeg4",
+ })
+ }
+}
+impl FromStr for StreamContainer {
+ type Err = ();
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(match s {
+ "webm" => StreamContainer::WebM,
+ "matroska" => StreamContainer::Matroska,
+ "webvtt" => StreamContainer::WebVTT,
+ "jvtt" => StreamContainer::JVTT,
+ "mpeg4" => StreamContainer::MPEG4,
+ _ => return Err(()),
+ })
+ }
+}