aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-14 13:41:42 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-14 13:41:42 +0200
commita3afc2756a52f7d6fedc928b97c8ff3eb1ade338 (patch)
tree9bb043975a6b92e45bbc2f09e19641f1109950b1
parent48a57a52d85d387efe122fb4d9fb113f577a0a98 (diff)
downloadjellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar
jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.bz2
jellything-a3afc2756a52f7d6fedc928b97c8ff3eb1ade338.tar.zst
lots of rewriting and removing dumb code
-rw-r--r--base/src/assetfed.rs7
-rw-r--r--base/src/database.rs27
-rw-r--r--common/src/lib.rs1
-rw-r--r--common/src/stream.rs53
-rw-r--r--import/src/lib.rs10
-rw-r--r--remuxer/src/extract.rs17
-rw-r--r--remuxer/src/fragment.rs101
-rw-r--r--remuxer/src/lib.rs63
-rw-r--r--remuxer/src/remux.rs572
-rw-r--r--server/src/routes/ui/player.rs22
-rw-r--r--stream/src/fragment.rs45
-rw-r--r--stream/src/fragment_index.rs32
-rw-r--r--stream/src/hls.rs72
-rw-r--r--stream/src/jhls.rs47
-rw-r--r--stream/src/lib.rs36
-rw-r--r--stream/src/webvtt.rs3
16 files changed, 542 insertions, 566 deletions
diff --git a/base/src/assetfed.rs b/base/src/assetfed.rs
index 575188d..697cacb 100644
--- a/base/src/assetfed.rs
+++ b/base/src/assetfed.rs
@@ -78,11 +78,4 @@ impl AssetInner {
pub fn is_federated(&self) -> bool {
matches!(self, Self::Federated { .. })
}
-
- pub fn as_local_track(self) -> Option<LocalTrack> {
- match self {
- AssetInner::LocalTrack(x) => Some(x),
- _ => None,
- }
- }
}
diff --git a/base/src/database.rs b/base/src/database.rs
index 407db29..32f1464 100644
--- a/base/src/database.rs
+++ b/base/src/database.rs
@@ -14,7 +14,8 @@ use redb::{Durability, ReadableTable, StorageError, TableDefinition};
use std::{
fs::create_dir_all,
hash::{DefaultHasher, Hasher},
- path::Path,
+ path::{Path, PathBuf},
+ str::FromStr,
sync::{Arc, RwLock},
time::SystemTime,
};
@@ -38,6 +39,8 @@ const T_NODE_EXTERNAL_ID: TableDefinition<(&str, &str), [u8; 32]> =
TableDefinition::new("node_external_id");
const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime");
const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime");
+const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> =
+ TableDefinition::new("node_media_paths");
#[derive(Clone)]
pub struct Database {
@@ -67,6 +70,7 @@ impl Database {
txn.open_table(T_NODE_MTIME)?;
txn.open_table(T_NODE_CHILDREN)?;
txn.open_table(T_NODE_EXTERNAL_ID)?;
+ txn.open_table(T_NODE_MEDIA_PATHS)?;
txn.open_table(T_IMPORT_FILE_MTIME)?;
txn.commit()?;
}
@@ -123,17 +127,20 @@ impl Database {
let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?;
let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?;
+ let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?;
t_node.retain(|_, _| false)?;
t_node_mtime.retain(|_, _| false)?;
t_node_children.retain(|_, _| false)?;
t_node_external_id.retain(|_, _| false)?;
t_import_file_mtime.retain(|_, _| false)?;
+ t_node_media_paths.retain(|_, _| false)?;
drop((
t_node,
t_node_mtime,
t_node_children,
t_node_external_id,
t_import_file_mtime,
+ t_node_media_paths,
));
txn.set_durability(Durability::Eventual);
txn.commit()?;
@@ -189,6 +196,24 @@ impl Database {
txn.commit()?;
Ok(())
}
+ pub fn get_node_media_paths(&self, id: NodeID) -> Result<Vec<PathBuf>> {
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+ let mut paths = Vec::new();
+ // TODO fix this
+ for p in table.range((id.0, "\0")..(id.0, "\x7f"))? {
+ paths.push(PathBuf::from_str(p?.0.value().1)?);
+ }
+ Ok(paths)
+ }
+ pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+ table.insert((id.0, path.to_str().unwrap()), ())?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
pub fn update_node_udata(
&self,
diff --git a/common/src/lib.rs b/common/src/lib.rs
index ce333eb..00f07b6 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -171,7 +171,6 @@ pub type TrackID = usize;
pub struct LocalTrack {
pub path: PathBuf,
pub track: TrackID,
- pub codec_private: Option<Vec<u8>>,
}
#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
diff --git a/common/src/stream.rs b/common/src/stream.rs
index a06dad5..75349cc 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -6,10 +6,15 @@
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, fmt::Display, str::FromStr};
+pub type SegmentNum = usize;
+pub type TrackNum = usize;
+pub type FormatNum = usize;
+pub type IndexNum = usize;
+
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum StreamSpec {
Whep {
- track: usize,
+ track: TrackNum,
seek: u64,
},
WhepControl {
@@ -20,34 +25,34 @@ pub enum StreamSpec {
container: StreamContainer,
},
Original {
- track: usize,
+ track: TrackNum,
},
HlsSuperMultiVariant {
container: StreamContainer,
},
HlsMultiVariant {
- segment: u64,
+ segment: SegmentNum,
container: StreamContainer,
},
HlsVariant {
- segment: u64,
- track: usize,
+ segment: SegmentNum,
+ track: TrackNum,
container: StreamContainer,
- format: usize,
+ format: FormatNum,
},
Info {
segment: Option<u64>,
},
FragmentIndex {
- segment: u64,
- track: usize,
+ segment: SegmentNum,
+ track: TrackNum,
},
Fragment {
- segment: u64,
- track: usize,
- index: u64,
+ segment: SegmentNum,
+ track: TrackNum,
+ index: IndexNum,
container: StreamContainer,
- format: usize,
+ format: FormatNum,
},
}
@@ -60,7 +65,7 @@ pub struct StreamInfo {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamSegmentInfo {
pub name: Option<String>,
- pub duration: u64,
+ pub duration: f64,
pub tracks: Vec<StreamTrackInfo>,
}
@@ -92,7 +97,7 @@ pub struct StreamFormatInfo {
pub bit_depth: Option<u8>,
}
-#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamContainer {
WebM,
@@ -164,13 +169,25 @@ impl StreamSpec {
Ok(Self::Info {
segment: get_num("segment").ok(),
})
+ } else if query.contains_key("hlsmultivariant") {
+ Ok(Self::HlsMultiVariant {
+ segment: get_num("segment")? as SegmentNum,
+ container: get_container()?,
+ })
+ } else if query.contains_key("hlsvariant") {
+ Ok(Self::HlsVariant {
+ segment: get_num("segment")? as SegmentNum,
+ track: get_num("track")? as TrackNum,
+ format: get_num("format")? as FormatNum,
+ container: get_container()?,
+ })
} else if query.contains_key("fragment") {
Ok(Self::Fragment {
- segment: get_num("segment")?,
- track: get_num("track")? as usize,
- index: get_num("index")?,
+ segment: get_num("segment")? as SegmentNum,
+ track: get_num("track")? as TrackNum,
+ format: get_num("format")? as FormatNum,
+ index: get_num("index")? as IndexNum,
container: get_container()?,
- format: get_num("format")? as usize,
})
} else {
Err("invalid stream spec")
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 5607450..3ea42f1 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -7,14 +7,13 @@ use anyhow::{anyhow, bail, Context, Result};
use infojson::YVideo;
use jellybase::{
assetfed::AssetInner,
- common::{
- Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack,
- SourceTrackKind, TrackSource,
- },
+ common::{Chapter, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind},
database::Database,
CONF, SECRETS,
};
-use jellyclient::{Appearance, PeopleGroup, TmdbKind, TraktKind, Visibility};
+use jellyclient::{
+ Appearance, LocalTrack, PeopleGroup, TmdbKind, TrackSource, TraktKind, Visibility,
+};
use jellyremuxer::metadata::checked_matroska_metadata;
use log::info;
use rayon::iter::{ParallelBridge, ParallelIterator};
@@ -397,7 +396,6 @@ fn import_media_file(
},
source: TrackSource::Local(
AssetInner::LocalTrack(LocalTrack {
- codec_private: track.codec_private,
path: path.to_owned(),
track: track.track_number as usize,
})
diff --git a/remuxer/src/extract.rs b/remuxer/src/extract.rs
index 12e4003..15c1e9d 100644
--- a/remuxer/src/extract.rs
+++ b/remuxer/src/extract.rs
@@ -5,29 +5,22 @@
*/
use crate::seek_index::get_seek_index;
use anyhow::{anyhow, bail};
-use jellybase::common::LocalTrack;
use jellymatroska::{block::Block, read::EbmlReader, Master, MatroskaTag};
use log::debug;
use std::{fs::File, io::BufReader, path::PathBuf};
pub type TrackExtract = Vec<(u64, Option<u64>, Vec<u8>)>;
-pub fn extract_track(path_base: PathBuf, track_info: LocalTrack) -> anyhow::Result<TrackExtract> {
- let source_path = path_base.join(track_info.path);
- let file = File::open(&source_path)?;
+pub fn extract_track(path: PathBuf, track: u64) -> anyhow::Result<TrackExtract> {
+ let file = File::open(&path)?;
let mut reader = EbmlReader::new(BufReader::new(file));
- let index = get_seek_index(&source_path)?;
- let index = index
- .get(&(track_info.track as u64))
- .ok_or(anyhow!("track missing"))?;
+ let index = get_seek_index(&path)?;
+ let index = index.get(&track).ok_or(anyhow!("track missing"))?;
let mut out = Vec::new();
for b in &index.blocks {
reader.seek(b.source_off, MatroskaTag::BlockGroup(Master::Start))?;
let (duration, block) = read_group(&mut reader)?;
- assert_eq!(
- track_info.track, block.track as usize,
- "seek index is wrong"
- );
+ assert_eq!(track, block.track, "seek index is wrong");
out.push((b.pts, duration, block.data))
}
Ok(out)
diff --git a/remuxer/src/fragment.rs b/remuxer/src/fragment.rs
index 9fa68f3..73fe046 100644
--- a/remuxer/src/fragment.rs
+++ b/remuxer/src/fragment.rs
@@ -5,11 +5,10 @@
*/
use crate::{
- ebml_header, ebml_segment_info, ebml_track_entry, seek_index::get_seek_index,
- segment_extractor::SegmentExtractIter,
+ ebml_header, ebml_segment_info, ebml_track_entry, metadata::matroska_metadata,
+ seek_index::get_seek_index, segment_extractor::SegmentExtractIter,
};
use anyhow::{anyhow, Context, Result};
-use jellybase::common::{LocalTrack, Node, SourceTrackKind};
use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag};
use log::{debug, info};
use std::{
@@ -21,32 +20,33 @@ use std::{
const FRAGMENT_LENGTH: f64 = 2.;
-pub fn fragment_index(
- path_base: &Path,
- item: &Node,
- local_track: &LocalTrack,
- track_index: usize,
-) -> Result<Vec<Range<f64>>> {
- let media_info = item.media.as_ref().unwrap();
- let source_path = path_base.join(&local_track.path);
- let index = get_seek_index(&source_path)?;
+pub fn fragment_index(path: &Path, track: u64) -> Result<Vec<Range<f64>>> {
+ let meta = matroska_metadata(path)?;
+ let duration = meta.info.as_ref().unwrap().duration.unwrap();
+ let force_kf = meta
+ .as_ref()
+ .tracks
+ .as_ref()
+ .unwrap()
+ .entries
+ .iter()
+ .find(|t| t.track_number == track)
+ .unwrap()
+ .track_type
+ == 17;
+
+ let index = get_seek_index(&path)?;
let index = index
- .get(&(local_track.track as u64))
+ .get(&track)
.ok_or(anyhow!("seek index track missing"))?;
- // everything is a keyframe (even though nothing is...)
- let force_kf = matches!(
- media_info.tracks[track_index].kind,
- SourceTrackKind::Subtitles { .. }
- );
-
let n_kf = if force_kf {
index.blocks.len()
} else {
index.keyframes.len()
};
- let average_kf_interval = media_info.duration / n_kf as f64;
+ let average_kf_interval = duration / n_kf as f64;
let kf_per_frag = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize;
debug!("average keyframe interval: {average_kf_interval}");
debug!(" => keyframes per frag {kf_per_frag}");
@@ -72,7 +72,7 @@ pub fn fragment_index(
index.keyframes.get((i + 1) * kf_per_frag).copied()
}
.map(|i| index.blocks[i].pts as f64 / 1000.)
- .unwrap_or(media_info.duration);
+ .unwrap_or(duration);
start..end
})
.collect())
@@ -80,45 +80,45 @@ pub fn fragment_index(
pub fn write_fragment_into(
writer: impl Write,
- path_base: &Path,
- item: &Node,
- local_track: &LocalTrack,
- track: usize,
+ path: &Path,
+ track: u64,
webm: bool,
+ title: &str,
n: usize,
) -> anyhow::Result<()> {
- info!("writing fragment {n} of {:?} (track {track})", item.title);
- let mut output = EbmlWriter::new(BufWriter::new(writer), 0);
- let media_info = item.media.as_ref().unwrap();
- let info = media_info
+ let meta = matroska_metadata(path)?;
+ let duration = meta.info.as_ref().unwrap().duration.unwrap();
+ let track_meta = meta
+ .as_ref()
.tracks
- .get(track)
- .ok_or(anyhow!("track not available"))?
- .to_owned();
- let source_path = path_base.join(&local_track.path);
+ .as_ref()
+ .unwrap()
+ .entries
+ .iter()
+ .find(|t| t.track_number == track)
+ .unwrap();
+ let force_kf = track_meta.track_type == 17;
+
+ info!("writing fragment {n} of {:?} (track {track})", title);
+ let mut output = EbmlWriter::new(BufWriter::new(writer), 0);
let mapped = 1;
- info!(
- "\t- {track} {source_path:?} ({} => {mapped})",
- local_track.track
- );
- info!("\t {}", info);
- let file = File::open(&source_path).context("opening source file")?;
- let index = get_seek_index(&source_path)?;
+ info!("\t- {track} {path:?} ({} => {mapped})", track);
+ // info!("\t {}", info);
+ let file = File::open(&path).context("opening source file")?;
+ let index = get_seek_index(&path)?;
let index = index
- .get(&(local_track.track as u64))
+ .get(&track)
.ok_or(anyhow!("track missing 2"))?
.to_owned();
debug!("\t seek index: {} blocks loaded", index.blocks.len());
let mut reader = EbmlReader::new(BufReader::new(file));
- let force_kf = matches!(info.kind, SourceTrackKind::Subtitles { .. });
let n_kf = if force_kf {
index.blocks.len()
} else {
index.keyframes.len()
};
-
- let average_kf_interval = media_info.duration / n_kf as f64;
+ let average_kf_interval = duration / n_kf as f64;
let kf_per_frag = (FRAGMENT_LENGTH / average_kf_interval).ceil() as usize;
debug!("average keyframe interval: {average_kf_interval}");
debug!(" => keyframes per frag {kf_per_frag}");
@@ -144,25 +144,20 @@ pub fn write_fragment_into(
.blocks
.get(end_block_index)
.map(|b| b.pts)
- .unwrap_or((media_info.duration * 1000.) as u64);
+ .unwrap_or((duration * 1000.) as u64);
output.write_tag(&ebml_header(webm))?;
output.write_tag(&MatroskaTag::Segment(Master::Start))?;
output.write_tag(&ebml_segment_info(
- format!("{}: {info}", item.title.clone().unwrap_or_default()),
+ title.to_string(),
(last_block_pts - start_block.pts) as f64 / 1000.,
))?;
output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![
- ebml_track_entry(
- mapped,
- local_track.track as u64 * 100, // TODO something else that is unique to the track
- &info,
- local_track.codec_private.clone(),
- ),
+ ebml_track_entry(mapped, track_meta),
])))?;
reader.seek(start_block.source_off, MatroskaTag::Cluster(Master::Start))?;
- let mut reader = SegmentExtractIter::new(&mut reader, local_track.track as u64);
+ let mut reader = SegmentExtractIter::new(&mut reader, track);
{
// TODO this one caused fragments to get dropped by MSE for no reason
diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs
index cc4b39b..9ddf7c1 100644
--- a/remuxer/src/lib.rs
+++ b/remuxer/src/lib.rs
@@ -5,17 +5,16 @@
*/
pub mod extract;
pub mod fragment;
+pub mod metadata;
pub mod remux;
pub mod seek_index;
pub mod segment_extractor;
pub mod trim_writer;
-pub mod metadata;
+use ebml_struct::matroska::TrackEntry;
pub use fragment::write_fragment_into;
-pub use remux::remux_stream_into;
-
-use jellybase::common::{SourceTrack, SourceTrackKind};
use jellymatroska::{Master, MatroskaTag};
+pub use remux::remux_stream_into;
pub fn ebml_header(webm: bool) -> MatroskaTag {
MatroskaTag::Ebml(Master::Collected(vec![
@@ -42,66 +41,56 @@ pub fn ebml_segment_info(title: String, duration: f64) -> MatroskaTag {
]))
}
-pub fn ebml_track_entry(
- number: u64,
- uid: u64,
- track: &SourceTrack,
- codec_private: Option<Vec<u8>>,
-) -> MatroskaTag {
+pub fn ebml_track_entry(number: u64, track: &TrackEntry) -> MatroskaTag {
let mut els = vec![
MatroskaTag::TrackNumber(number),
- MatroskaTag::TrackUID(uid),
MatroskaTag::FlagLacing(track.flag_lacing),
MatroskaTag::Language(track.language.clone()),
- MatroskaTag::CodecID(track.codec.clone()),
+ MatroskaTag::CodecID(track.codec_id.clone()),
MatroskaTag::CodecDelay(track.codec_delay),
MatroskaTag::SeekPreRoll(track.seek_pre_roll),
];
if let Some(d) = &track.default_duration {
els.push(MatroskaTag::DefaultDuration(*d));
}
- match track.kind {
- SourceTrackKind::Video {
- width,
- height,
- display_height,
- display_width,
- display_unit,
- fps,
- } => {
+ match track.track_type {
+ 1 => {
+ let video = track.video.as_ref().unwrap();
els.push(MatroskaTag::TrackType(1));
let mut props = vec![
- MatroskaTag::PixelWidth(width),
- MatroskaTag::PixelHeight(height),
+ MatroskaTag::PixelWidth(video.pixel_width),
+ MatroskaTag::PixelHeight(video.pixel_height),
];
- props.push(MatroskaTag::DisplayWidth(display_width.unwrap_or(width)));
- props.push(MatroskaTag::DisplayHeight(display_height.unwrap_or(height)));
- props.push(MatroskaTag::DisplayUnit(display_unit));
- if let Some(fps) = fps {
+ props.push(MatroskaTag::DisplayWidth(
+ video.display_width.unwrap_or(video.pixel_width),
+ ));
+ props.push(MatroskaTag::DisplayHeight(
+ video.display_height.unwrap_or(video.pixel_height),
+ ));
+ props.push(MatroskaTag::DisplayUnit(video.display_unit));
+ if let Some(fps) = video.frame_rate {
props.push(MatroskaTag::FrameRate(fps))
}
els.push(MatroskaTag::Video(Master::Collected(props)))
}
- SourceTrackKind::Audio {
- channels,
- sample_rate,
- bit_depth,
- } => {
+ 2 => {
+ let audio = track.audio.as_ref().unwrap();
els.push(MatroskaTag::TrackType(2));
let mut props = vec![
- MatroskaTag::SamplingFrequency(sample_rate),
- MatroskaTag::Channels(channels.try_into().unwrap()),
+ MatroskaTag::SamplingFrequency(audio.sampling_frequency),
+ MatroskaTag::Channels(audio.channels),
];
- if let Some(bit_depth) = bit_depth {
+ if let Some(bit_depth) = audio.bit_depth {
props.push(MatroskaTag::BitDepth(bit_depth.try_into().unwrap()));
}
els.push(MatroskaTag::Audio(Master::Collected(props)));
}
- SourceTrackKind::Subtitles => {
+ 17 => {
els.push(MatroskaTag::TrackType(17));
}
+ _ => unreachable!(),
}
- if let Some(d) = &codec_private {
+ if let Some(d) = &track.codec_private {
els.push(MatroskaTag::CodecPrivate(d.clone()));
}
MatroskaTag::TrackEntry(Master::Collected(els))
diff --git a/remuxer/src/remux.rs b/remuxer/src/remux.rs
index 0507f1e..a44c58b 100644
--- a/remuxer/src/remux.rs
+++ b/remuxer/src/remux.rs
@@ -3,333 +3,311 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
-use crate::{
- ebml_header, ebml_track_entry, seek_index::get_seek_index,
- segment_extractor::SegmentExtractIter, trim_writer::TrimWriter,
-};
-use anyhow::{anyhow, Context};
-use jellybase::common::{
- seek_index::{BlockIndex, SeekIndex},
- LocalTrack, Node, SourceTrack,
-};
-use jellymatroska::{
- read::EbmlReader,
- write::{bad_vint_length, vint_length, EbmlWriter},
- Master, MatroskaTag,
-};
-use log::{debug, info, trace, warn};
-use std::{
- fs::File,
- io::{BufReader, BufWriter, Seek, SeekFrom, Write},
- ops::Range,
- path::PathBuf,
- sync::Arc,
- time::Instant,
-};
+use jellybase::common::Node;
+use std::{io::Write, ops::Range, path::PathBuf};
-struct ClusterLayout {
- position: usize,
- timestamp: u64,
- source_offsets: Vec<Option<u64>>,
- blocks: Vec<(usize, BlockIndex)>,
-}
+// struct ClusterLayout {
+// position: usize,
+// timestamp: u64,
+// source_offsets: Vec<Option<u64>>,
+// blocks: Vec<(usize, BlockIndex)>,
+// }
pub fn remux_stream_into(
- writer: impl Write,
- range: Range<usize>,
- path_base: PathBuf,
- item: &Node,
- track_sources: Vec<LocalTrack>,
- selection: Vec<usize>,
- webm: bool,
+ _writer: impl Write,
+ _range: Range<usize>,
+ _path_base: PathBuf,
+ _item: &Node,
+ _selection: Vec<usize>,
+ _webm: bool,
) -> anyhow::Result<()> {
- info!("remuxing {:?} to have tracks {selection:?}", item.title);
- let writer = TrimWriter::new(BufWriter::new(writer), range.clone());
- let mut output = EbmlWriter::new(writer, 0);
+ // info!("remuxing {:?} to have tracks {selection:?}", item.title);
+ // let writer = TrimWriter::new(BufWriter::new(writer), range.clone());
+ // let mut output = EbmlWriter::new(writer, 0);
- struct ReaderC {
- info: SourceTrack,
- reader: EbmlReader,
- mapped: u64,
- index: Arc<SeekIndex>,
- source_track_index: usize,
- codec_private: Option<Vec<u8>>,
- layouting_progress_index: usize,
- }
+ // struct ReaderC {
+ // info: SourceTrack,
+ // reader: EbmlReader,
+ // mapped: u64,
+ // index: Arc<SeekIndex>,
+ // source_track_index: usize,
+ // codec_private: Option<Vec<u8>>,
+ // layouting_progress_index: usize,
+ // }
- let timing_cp = Instant::now();
+ // let timing_cp = Instant::now();
- let mut inputs = selection
- .iter()
- .enumerate()
- .map(|(index, sel)| {
- let info = item
- .media
- .as_ref()
- .unwrap()
- .tracks
- .get(*sel)
- .ok_or(anyhow!("track not available"))?
- .to_owned();
- let private = &track_sources[index];
- let source_path = path_base.join(&private.path);
- let mapped = index as u64 + 1;
- info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track);
- info!("\t {}", info);
- let file = File::open(&source_path).context("opening source file")?;
- let index = get_seek_index(&source_path)?;
- let index = index
- .get(&(private.track as u64))
- .ok_or(anyhow!("track missing 3"))?
- .to_owned();
- debug!("\t seek index: {} blocks loaded", index.blocks.len());
- let reader = EbmlReader::new(BufReader::new(file));
- Ok(ReaderC {
- index,
- reader,
- info,
- mapped,
- source_track_index: private.track,
- codec_private: private.codec_private.clone(),
- layouting_progress_index: 0,
- })
- })
- .collect::<anyhow::Result<Vec<_>>>()?;
+ // let mut inputs = selection
+ // .iter()
+ // .enumerate()
+ // .map(|(index, sel)| {
+ // let info = item
+ // .media
+ // .as_ref()
+ // .unwrap()
+ // .tracks
+ // .get(*sel)
+ // .ok_or(anyhow!("track not available"))?
+ // .to_owned();
+ // let source_path = path_base.join(&private.path);
+ // let mapped = index as u64 + 1;
+ // info!("\t- {sel} {source_path:?} ({} => {mapped})", private.track);
+ // info!("\t {}", info);
+ // let file = File::open(&source_path).context("opening source file")?;
+ // let index = get_seek_index(&source_path)?;
+ // let index = index
+ // .get(&(private.track as u64))
+ // .ok_or(anyhow!("track missing 3"))?
+ // .to_owned();
+ // debug!("\t seek index: {} blocks loaded", index.blocks.len());
+ // let reader = EbmlReader::new(BufReader::new(file));
+ // Ok(ReaderC {
+ // index,
+ // reader,
+ // info,
+ // mapped,
+ // source_track_index: private.track,
+ // codec_private: private.codec_private.clone(),
+ // layouting_progress_index: 0,
+ // })
+ // })
+ // .collect::<anyhow::Result<Vec<_>>>()?;
- info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp);
- let timing_cp = Instant::now();
+ // info!("(perf) prepare inputs: {:?}", Instant::now() - timing_cp);
+ // let timing_cp = Instant::now();
- output.write_tag(&ebml_header(webm))?;
+ // output.write_tag(&ebml_header(webm))?;
- output.write_tag(&MatroskaTag::Segment(Master::Start))?;
- let segment_offset = output.position();
+ // output.write_tag(&MatroskaTag::Segment(Master::Start))?;
+ // let segment_offset = output.position();
- output.write_tag(&MatroskaTag::Info(Master::Collected(vec![
- MatroskaTag::TimestampScale(1_000_000),
- MatroskaTag::Duration(item.media.as_ref().unwrap().duration * 1000.0),
- MatroskaTag::Title(item.title.clone().unwrap_or_default()),
- MatroskaTag::MuxingApp("jellyremux".to_string()),
- MatroskaTag::WritingApp("jellything".to_string()),
- ])))?;
+ // output.write_tag(&MatroskaTag::Info(Master::Collected(vec![
+ // MatroskaTag::TimestampScale(1_000_000),
+ // MatroskaTag::Duration(item.media.as_ref().unwrap().duration * 1000.0),
+ // MatroskaTag::Title(item.title.clone().unwrap_or_default()),
+ // MatroskaTag::MuxingApp("jellyremux".to_string()),
+ // MatroskaTag::WritingApp("jellything".to_string()),
+ // ])))?;
- let tracks_header = inputs
- .iter_mut()
- .map(|rc| ebml_track_entry(rc.mapped, rc.mapped, &rc.info, rc.codec_private.take()))
- .collect();
- output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?;
+ // let tracks_header = inputs
+ // .iter_mut()
+ // .map(|rc| ebml_track_entry(rc.mapped, rc.mapped, &rc.info, rc.codec_private.take()))
+ // .collect();
+ // output.write_tag(&MatroskaTag::Tracks(Master::Collected(tracks_header)))?;
- let mut segment_layout: Vec<ClusterLayout> = {
- let mut cluster_pts = 0;
- let mut clusters = vec![];
- let mut cluster = vec![];
- let mut source_offsets = vec![None; inputs.len()];
- let mut gp = 0usize; // cluster position (in the segment)
- let mut p = 0usize; // block position (in the cluster)
- loop {
- let (track, block) = {
- let mut best_block = BlockIndex {
- pts: u64::MAX,
- size: 0,
- source_off: 0,
- };
- let mut best_track = 0;
- for (i, r) in inputs.iter().enumerate() {
- if let Some(v) = r.index.blocks.get(r.layouting_progress_index) {
- if v.pts < best_block.pts {
- best_block = v.to_owned();
- best_track = i;
- }
- };
- }
- (best_track, best_block)
- };
- inputs[track].layouting_progress_index += 1;
- source_offsets[track].get_or_insert(block.source_off);
- if block.pts > cluster_pts + 1_000 {
- let cluster_content_size = 1 + 1 // timestamp {tag, size}
- + bad_vint_length(cluster_pts) // timestamp tag value
- + p;
- let cluster_size = 4 // tag length
- + vint_length(cluster_content_size as u64) // size varint
- + cluster_content_size;
- clusters.push(ClusterLayout {
- position: gp, // relative to the first cluster
- timestamp: cluster_pts,
- source_offsets,
- blocks: std::mem::take(&mut cluster),
- });
+ // let mut segment_layout: Vec<ClusterLayout> = {
+ // let mut cluster_pts = 0;
+ // let mut clusters = vec![];
+ // let mut cluster = vec![];
+ // let mut source_offsets = vec![None; inputs.len()];
+ // let mut gp = 0usize; // cluster position (in the segment)
+ // let mut p = 0usize; // block position (in the cluster)
+ // loop {
+ // let (track, block) = {
+ // let mut best_block = BlockIndex {
+ // pts: u64::MAX,
+ // size: 0,
+ // source_off: 0,
+ // };
+ // let mut best_track = 0;
+ // for (i, r) in inputs.iter().enumerate() {
+ // if let Some(v) = r.index.blocks.get(r.layouting_progress_index) {
+ // if v.pts < best_block.pts {
+ // best_block = v.to_owned();
+ // best_track = i;
+ // }
+ // };
+ // }
+ // (best_track, best_block)
+ // };
+ // inputs[track].layouting_progress_index += 1;
+ // source_offsets[track].get_or_insert(block.source_off);
+ // if block.pts > cluster_pts + 1_000 {
+ // let cluster_content_size = 1 + 1 // timestamp {tag, size}
+ // + bad_vint_length(cluster_pts) // timestamp tag value
+ // + p;
+ // let cluster_size = 4 // tag length
+ // + vint_length(cluster_content_size as u64) // size varint
+ // + cluster_content_size;
+ // clusters.push(ClusterLayout {
+ // position: gp, // relative to the first cluster
+ // timestamp: cluster_pts,
+ // source_offsets,
+ // blocks: std::mem::take(&mut cluster),
+ // });
- cluster_pts = block.pts;
- source_offsets = vec![None; inputs.len()];
- gp += cluster_size;
- p = 0;
- }
- if block.pts == u64::MAX {
- break;
- }
+ // cluster_pts = block.pts;
+ // source_offsets = vec![None; inputs.len()];
+ // gp += cluster_size;
+ // p = 0;
+ // }
+ // if block.pts == u64::MAX {
+ // break;
+ // }
- let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags}
- // TODO does not work, if more than 127 tracks are present
- + block.size; // block payload
- p += 1; // simpleblock tag
- p += vint_length(simpleblock_size as u64); // simpleblock size vint
- p += simpleblock_size;
+ // let simpleblock_size = 1 + 2 + 1 // block {tracknum, pts_off, flags}
+ // // TODO does not work, if more than 127 tracks are present
+ // + block.size; // block payload
+ // p += 1; // simpleblock tag
+ // p += vint_length(simpleblock_size as u64); // simpleblock size vint
+ // p += simpleblock_size;
- cluster.push((track, block))
- }
- info!("segment layout computed ({} clusters)", clusters.len());
- clusters
- };
- info!(
- "(perf) compute segment layout: {:?}",
- Instant::now() - timing_cp
- );
- let timing_cp = Instant::now();
+ // cluster.push((track, block))
+ // }
+ // info!("segment layout computed ({} clusters)", clusters.len());
+ // clusters
+ // };
+ // info!(
+ // "(perf) compute segment layout: {:?}",
+ // Instant::now() - timing_cp
+ // );
+ // let timing_cp = Instant::now();
- let max_cue_size = 4 // cues id
- + 8 // cues len
- + ( // cues content
- 1 // cp id
- + 1 // cp len
- + ( // cp content
- 1 // ctime id,
- + 1 // ctime len
- + 8 // ctime content uint
- + ( // ctps
- 1 // ctp id
- + 8 // ctp len
- + (// ctp content
- 1 // ctrack id
- + 1 // ctrack size
- + 1 // ctrack content int
- // TODO this breaks if inputs.len() >= 127
- + 1 // ccp id
- + 1 // ccp len
- + 8 // ccp content offset
- )
- )
- ) * inputs.len()
- ) * segment_layout.len()
- + 1 // void id
- + 8; // void len
+ // let max_cue_size = 4 // cues id
+ // + 8 // cues len
+ // + ( // cues content
+ // 1 // cp id
+ // + 1 // cp len
+ // + ( // cp content
+ // 1 // ctime id,
+ // + 1 // ctime len
+ // + 8 // ctime content uint
+ // + ( // ctps
+ // 1 // ctp id
+ // + 8 // ctp len
+ // + (// ctp content
+ // 1 // ctrack id
+ // + 1 // ctrack size
+ // + 1 // ctrack content int
+ // // TODO this breaks if inputs.len() >= 127
+ // + 1 // ccp id
+ // + 1 // ccp len
+ // + 8 // ccp content offset
+ // )
+ // )
+ // ) * inputs.len()
+ // ) * segment_layout.len()
+ // + 1 // void id
+ // + 8; // void len
- let first_cluster_offset_predict = max_cue_size + output.position();
+ // let first_cluster_offset_predict = max_cue_size + output.position();
- // make the cluster position relative to the segment start as they should
- segment_layout
- .iter_mut()
- .for_each(|e| e.position += first_cluster_offset_predict - segment_offset);
+ // // make the cluster position relative to the segment start as they should
+ // segment_layout
+ // .iter_mut()
+ // .for_each(|e| e.position += first_cluster_offset_predict - segment_offset);
- output.write_tag(&MatroskaTag::Cues(Master::Collected(
- segment_layout
- .iter()
- .map(|cluster| {
- MatroskaTag::CuePoint(Master::Collected(
- Some(MatroskaTag::CueTime(cluster.timestamp))
- .into_iter()
- // TODO: Subtitles should not have cues for every cluster
- .chain(inputs.iter().map(|i| {
- MatroskaTag::CueTrackPositions(Master::Collected(vec![
- MatroskaTag::CueTrack(i.mapped),
- MatroskaTag::CueClusterPosition(cluster.position as u64),
- ]))
- }))
- .collect(),
- ))
- })
- .collect(),
- )))?;
- output.write_padding(first_cluster_offset_predict)?;
- let first_cluster_offset = output.position();
- assert_eq!(first_cluster_offset, first_cluster_offset_predict);
+ // output.write_tag(&MatroskaTag::Cues(Master::Collected(
+ // segment_layout
+ // .iter()
+ // .map(|cluster| {
+ // MatroskaTag::CuePoint(Master::Collected(
+ // Some(MatroskaTag::CueTime(cluster.timestamp))
+ // .into_iter()
+ // // TODO: Subtitles should not have cues for every cluster
+ // .chain(inputs.iter().map(|i| {
+ // MatroskaTag::CueTrackPositions(Master::Collected(vec![
+ // MatroskaTag::CueTrack(i.mapped),
+ // MatroskaTag::CueClusterPosition(cluster.position as u64),
+ // ]))
+ // }))
+ // .collect(),
+ // ))
+ // })
+ // .collect(),
+ // )))?;
+ // output.write_padding(first_cluster_offset_predict)?;
+ // let first_cluster_offset = output.position();
+ // assert_eq!(first_cluster_offset, first_cluster_offset_predict);
- let mut skip = 0;
- // TODO binary search
- for (i, cluster) in segment_layout.iter().enumerate() {
- if (cluster.position + segment_offset) >= range.start {
- break;
- }
- skip = i;
- }
+ // let mut skip = 0;
+ // // TODO binary search
+ // for (i, cluster) in segment_layout.iter().enumerate() {
+ // if (cluster.position + segment_offset) >= range.start {
+ // break;
+ // }
+ // skip = i;
+ // }
- if skip != 0 {
- info!("skipping {skip} clusters");
- output.seek(SeekFrom::Start(
- (segment_layout[skip].position + segment_offset) as u64,
- ))?;
- }
+ // if skip != 0 {
+ // info!("skipping {skip} clusters");
+ // output.seek(SeekFrom::Start(
+ // (segment_layout[skip].position + segment_offset) as u64,
+ // ))?;
+ // }
- struct ReaderD<'a> {
- stream: SegmentExtractIter<'a>,
- mapped: u64,
- }
+ // struct ReaderD<'a> {
+ // stream: SegmentExtractIter<'a>,
+ // mapped: u64,
+ // }
- let mut track_readers = inputs
- .iter_mut()
- .enumerate()
- .map(|(i, inp)| {
- inp.reader
- .seek(
- // the seek target might be a hole; we continue until the next cluster of that track.
- // this should be fine since tracks are only read according to segment_layout
- find_first_cluster_with_off(&segment_layout, skip, i)
- .ok_or(anyhow!("cluster hole at eof"))?,
- MatroskaTag::Cluster(Master::Start), // TODO shouldn't this be a child of cluster?
- )
- .context("seeking in input")?;
- let stream = SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64);
+ // let mut track_readers = inputs
+ // .iter_mut()
+ // .enumerate()
+ // .map(|(i, inp)| {
+ // inp.reader
+ // .seek(
+ // // the seek target might be a hole; we continue until the next cluster of that track.
+ // // this should be fine since tracks are only read according to segment_layout
+ // find_first_cluster_with_off(&segment_layout, skip, i)
+ // .ok_or(anyhow!("cluster hole at eof"))?,
+ // MatroskaTag::Cluster(Master::Start), // TODO shouldn't this be a child of cluster?
+ // )
+ // .context("seeking in input")?;
+ // let stream = SegmentExtractIter::new(&mut inp.reader, inp.source_track_index as u64);
- Ok(ReaderD {
- mapped: inp.mapped,
- stream,
- })
- })
- .collect::<anyhow::Result<Vec<_>>>()?;
+ // Ok(ReaderD {
+ // mapped: inp.mapped,
+ // stream,
+ // })
+ // })
+ // .collect::<anyhow::Result<Vec<_>>>()?;
- info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp);
+ // info!("(perf) seek inputs: {:?}", Instant::now() - timing_cp);
- for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) {
- debug!(
- "writing cluster {cluster_index} (pts_base={}) with {} blocks",
- cluster.timestamp,
- cluster.blocks.len()
- );
- {
- let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64;
- if cue_error != 0 {
- warn!("calculation was {} bytes off", cue_error);
- }
- }
+ // for (cluster_index, cluster) in segment_layout.into_iter().enumerate().skip(skip) {
+ // debug!(
+ // "writing cluster {cluster_index} (pts_base={}) with {} blocks",
+ // cluster.timestamp,
+ // cluster.blocks.len()
+ // );
+ // {
+ // let cue_error = cluster.position as i64 - (output.position() - segment_offset) as i64;
+ // if cue_error != 0 {
+ // warn!("calculation was {} bytes off", cue_error);
+ // }
+ // }
- let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)];
- for (block_track, index_block) in cluster.blocks {
- let track_reader = &mut track_readers[block_track];
- // TODO handle duration
- let mut block = track_reader.stream.next_block()?.0;
+ // let mut cluster_blocks = vec![MatroskaTag::Timestamp(cluster.timestamp)];
+ // for (block_track, index_block) in cluster.blocks {
+ // let track_reader = &mut track_readers[block_track];
+ // // TODO handle duration
+ // let mut block = track_reader.stream.next_block()?.0;
- assert_eq!(index_block.size, block.data.len(), "seek index is wrong");
+ // assert_eq!(index_block.size, block.data.len(), "seek index is wrong");
- block.track = track_reader.mapped;
- block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap();
- trace!("n={} tso={}", block.track, block.timestamp_off);
+ // block.track = track_reader.mapped;
+ // block.timestamp_off = (index_block.pts - cluster.timestamp).try_into().unwrap();
+ // trace!("n={} tso={}", block.track, block.timestamp_off);
- cluster_blocks.push(MatroskaTag::SimpleBlock(block))
- }
- output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?;
- }
- // output.write_tag(&MatroskaTag::Segment(Master::End))?;
- Ok(())
+ // cluster_blocks.push(MatroskaTag::SimpleBlock(block))
+ // }
+ // output.write_tag(&MatroskaTag::Cluster(Master::Collected(cluster_blocks)))?;
+ // }
+ // // output.write_tag(&MatroskaTag::Segment(Master::End))?;
+ // Ok(())
+ todo!()
}
-fn find_first_cluster_with_off(
- segment_layout: &[ClusterLayout],
- skip: usize,
- track: usize,
-) -> Option<u64> {
- for cluster in segment_layout.iter().skip(skip) {
- if let Some(off) = cluster.source_offsets[track] {
- return Some(off);
- }
- }
- None
-}
+// fn find_first_cluster_with_off(
+// segment_layout: &[ClusterLayout],
+// skip: usize,
+// track: usize,
+// ) -> Option<u64> {
+// for cluster in segment_layout.iter().skip(skip) {
+// if let Some(off) = cluster.source_offsets[track] {
+// return Some(off);
+// }
+// }
+// None
+// }
diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs
index aa567ab..2cc2dd4 100644
--- a/server/src/routes/ui/player.rs
+++ b/server/src/routes/ui/player.rs
@@ -15,9 +15,10 @@ use crate::{
uri,
};
use anyhow::anyhow;
-use jellybase::{permission::PermissionSetExt, CONF};
+use jellybase::CONF;
use jellycommon::{
- user::{PermissionSet, PlayerKind, UserPermission},
+ stream::{StreamContainer, StreamSpec},
+ user::{PermissionSet, PlayerKind},
Node, NodeID, SourceTrackKind, TrackID,
};
use markup::DynRender;
@@ -45,15 +46,14 @@ impl PlayerConfig {
fn jellynative_url(action: &str, seek: f64, secret: &str, node: &str, session: &str) -> String {
let protocol = if CONF.tls { "https" } else { "http" };
let host = &CONF.hostname;
- let stream_url = "";
- // TODO
- // uri!(r_stream(
- // node,
- // StreamSpec {
- // format: StreamFormat::HlsMaster,
- // ..Default::default()
- // }
- // ));
+ let stream_url = format!(
+ "/n/{node}/stream{}",
+ StreamSpec::HlsMultiVariant {
+ segment: 0,
+ container: StreamContainer::Matroska
+ }
+ .to_query()
+ );
format!("jellynative://{action}/{secret}/{session}/{seek}/{protocol}://{host}{stream_url}",)
}
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
index a34bb8d..52d32f4 100644
--- a/stream/src/fragment.rs
+++ b/stream/src/fragment.rs
@@ -3,37 +3,29 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
-use jellytranscoder::fragment::transcode;
+use jellybase::common::stream::StreamContainer;
use log::warn;
use std::sync::Arc;
-use tokio::{fs::File, io::DuplexStream};
+use tokio::io::DuplexStream;
use tokio_util::io::SyncIoBridge;
pub async fn fragment_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
mut b: DuplexStream,
- perms: &PermissionSet,
- webm: bool,
- track: u64,
- segment: u64,
+ info: Arc<SMediaInfo>,
+ track: usize,
+ segment: usize,
index: usize,
+ format: usize,
+ container: StreamContainer,
) -> Result<()> {
- let local_track = local_tracks
- .first()
- .ok_or(anyhow!("track missing"))?
- .to_owned();
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let path = iinfo.paths[file_index].clone();
// if let Some(profile) = None {
// perms.assert(&UserPermission::Transcode)?;
@@ -70,11 +62,10 @@ pub async fn fragment_stream(
tokio::task::spawn_blocking(move || {
if let Err(err) = jellyremuxer::write_fragment_into(
b,
- &CONF.media_path,
- &node,
- &local_track,
- track as usize,
- webm,
+ &path,
+ track_num,
+ container == StreamContainer::WebM,
+ &info.name.unwrap_or_default(),
index,
) {
warn!("segment stream error: {err}");
diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs
new file mode 100644
index 0000000..6fbddc6
--- /dev/null
+++ b/stream/src/fragment_index.rs
@@ -0,0 +1,32 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use crate::{stream_info, SMediaInfo};
+use anyhow::{anyhow, Result};
+use jellybase::common::stream::{SegmentNum, TrackNum};
+use std::sync::Arc;
+use tokio::io::{AsyncWriteExt, DuplexStream};
+
+pub async fn fragment_index_stream(
+ mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ _segment: SegmentNum,
+ track: TrackNum,
+) -> Result<()> {
+ let (iinfo, _info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+
+ let fragments = tokio::task::spawn_blocking(move || {
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
+ })
+ .await??;
+
+ let out = serde_json::to_string(&fragments)?;
+ tokio::spawn(async move { b.write_all(out.as_bytes()).await });
+ Ok(())
+}
diff --git a/stream/src/hls.rs b/stream/src/hls.rs
index 27630b2..f06ac72 100644
--- a/stream/src/hls.rs
+++ b/stream/src/hls.rs
@@ -4,13 +4,10 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
+use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- stream::{StreamContainer, StreamSpec},
- LocalTrack, Node, SourceTrackKind,
- },
- CONF,
+use jellybase::common::stream::{
+ FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum,
};
use std::{fmt::Write, ops::Range, sync::Arc};
use tokio::{
@@ -19,20 +16,24 @@ use tokio::{
};
pub async fn hls_master_stream(
- node: Arc<Node>,
- _local_tracks: Vec<LocalTrack>,
- segment: u64,
- container: StreamContainer,
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ container: StreamContainer,
) -> Result<()> {
- let media = node.media.as_ref().ok_or(anyhow!("no media"))?;
+ let (_iinfo, info) = stream_info(info).await?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-VERSION:4")?;
// writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?;
- for (i, t) in media.tracks.iter().enumerate() {
+ for (i, t) in seg.tracks.iter().enumerate() {
let uri = format!(
- "stream?{}",
+ "stream{}",
StreamSpec::HlsVariant {
segment,
track: i,
@@ -42,10 +43,11 @@ pub async fn hls_master_stream(
.to_query()
);
let r#type = match t.kind {
- SourceTrackKind::Video { .. } => "VIDEO",
- SourceTrackKind::Audio { .. } => "AUDIO",
- SourceTrackKind::Subtitles => "SUBTITLES",
+ TrackKind::Video => "VIDEO",
+ TrackKind::Audio => "AUDIO",
+ TrackKind::Subtitle => "SUBTITLES",
};
+ // TODO bw
writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?;
writeln!(out, "{uri}")?;
}
@@ -54,42 +56,44 @@ pub async fn hls_master_stream(
}
pub async fn hls_variant_stream(
- node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
- segment: u64,
- track: usize,
- format: usize,
- container: StreamContainer,
mut b: DuplexStream,
+ info: Arc<SMediaInfo>,
+ segment: SegmentNum,
+ track: TrackNum,
+ format: FormatNum,
+ container: StreamContainer,
) -> Result<()> {
- let local_track = local_tracks.first().ok_or(anyhow!("no track"))?.to_owned();
- let media_info = node.media.to_owned().ok_or(anyhow!("no media?"))?;
+ let (iinfo, info) = stream_info(info).await?;
+ let (file_index, track_num) = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("track not found"))?;
+ let seg = info
+ .segments
+ .get(segment)
+ .ok_or(anyhow!("segment not found"))?;
+
let frags = spawn_blocking(move || {
- jellyremuxer::fragment::fragment_index(
- &CONF.media_path,
- &node,
- &local_track,
- track as usize,
- )
+ jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num)
})
.await??;
let mut out = String::new();
writeln!(out, "#EXTM3U")?;
writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?;
- writeln!(out, "#EXT-X-TARGETDURATION:{}", media_info.duration)?;
+ writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?;
writeln!(out, "#EXT-X-VERSION:4")?;
writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?;
- for (i, Range { start, end }) in frags.iter().enumerate() {
+ for (index, Range { start, end }) in frags.iter().enumerate() {
writeln!(out, "#EXTINF:{:},", end - start)?;
writeln!(
out,
- "stream?{}",
+ "stream{}",
StreamSpec::Fragment {
segment,
track,
- index: i as u64,
+ index,
container,
format,
}
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
deleted file mode 100644
index 2a2faec..0000000
--- a/stream/src/jhls.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2025 metamuffin <metamuffin.org>
-*/
-use anyhow::{anyhow, Result};
-use jellybase::{
- common::{
- jhls::JhlsTrackIndex,
- stream::StreamSpec,
- user::{PermissionSet, UserPermission},
- LocalTrack, Node,
- },
- permission::PermissionSetExt,
- CONF,
-};
-use std::sync::Arc;
-use tokio::io::{AsyncWriteExt, DuplexStream};
-
-pub async fn jhls_index(
- node: Arc<Node>,
- local_tracks: &[LocalTrack],
- spec: StreamSpec,
- mut b: DuplexStream,
- perms: &PermissionSet,
-) -> Result<()> {
- // let local_track = local_tracks
- // .first()
- // .ok_or(anyhow!("track missing"))?
- // .to_owned();
-
- // let fragments = tokio::task::spawn_blocking(move || {
- // jellyremuxer::fragment::fragment_index(&CONF.media_path, &node, &local_track, spec.track[0])
- // })
- // .await??;
-
- // let out = serde_json::to_string(&JhlsTrackIndex {
- // extra_profiles: if perms.check(&UserPermission::Transcode) {
- // CONF.transcoding_profiles.clone()
- // } else {
- // vec![]
- // },
- // fragments,
- // })?;
- // tokio::spawn(async move { b.write_all(out.as_bytes()).await });
- Ok(())
-}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index d09759f..a6faf54 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -5,17 +5,20 @@
*/
#![feature(iterator_try_collect)]
pub mod fragment;
+pub mod fragment_index;
pub mod hls;
-pub mod jhls;
pub mod webvtt;
use anyhow::{anyhow, Context, Result};
+use fragment::fragment_stream;
+use fragment_index::fragment_index_stream;
+use hls::{hls_master_stream, hls_variant_stream};
use jellybase::common::{
stream::{
StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec,
StreamTrackInfo, TrackKind,
},
- LocalTrack, Node,
+ Node,
};
use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc};
@@ -75,22 +78,26 @@ pub async fn stream(
StreamSpec::Remux { tracks, container } => todo!(),
StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
StreamSpec::HlsSuperMultiVariant { container } => todo!(),
- StreamSpec::HlsMultiVariant { segment, container } => todo!(),
+ StreamSpec::HlsMultiVariant { segment, container } => {
+ hls_master_stream(b, info, segment, container).await?
+ }
StreamSpec::HlsVariant {
segment,
track,
container,
format,
- } => todo!(),
- StreamSpec::Info { segment } => write_stream_info(info, b).await?,
- StreamSpec::FragmentIndex { segment, track } => todo!(),
+ } => hls_variant_stream(b, info, segment, track, format, container).await?,
+ StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
+ StreamSpec::FragmentIndex { segment, track } => {
+ fragment_index_stream(b, info, segment, track).await?
+ }
StreamSpec::Fragment {
segment,
track,
index,
container,
format,
- } => todo!(),
+ } => fragment_stream(b, info, track, segment, index, format, container).await?,
}
Ok(a)
@@ -103,7 +110,7 @@ async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>>
pub(crate) struct InternalStreamInfo {
pub paths: Vec<PathBuf>,
pub metadata: Vec<Arc<MatroskaMetadata>>,
- pub track_to_file: Vec<usize>,
+ pub track_to_file: Vec<(usize, u64)>,
}
async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
@@ -142,14 +149,19 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, Strea
},
formats,
});
- track_to_file.push(i);
+ track_to_file.push((i, t.track_number));
}
}
}
let segment = StreamSegmentInfo {
name: None,
- duration: 0,
+ duration: metadata[0]
+ .info
+ .as_ref()
+ .unwrap()
+ .duration
+ .unwrap_or_default(),
tracks,
};
Ok((
@@ -173,7 +185,6 @@ async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result
async fn remux_stream(
node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
b: DuplexStream,
@@ -202,8 +213,7 @@ async fn original_stream(
b: DuplexStream,
) -> Result<()> {
let (iinfo, _info) = stream_info(info).await?;
-
- let file_index = *iinfo
+ let (file_index, _) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("unknown track"))?;
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
index fbd6382..960849c 100644
--- a/stream/src/webvtt.rs
+++ b/stream/src/webvtt.rs
@@ -6,7 +6,7 @@
use anyhow::{anyhow, Context, Result};
use jellybase::{
cache::async_cache_memory,
- common::{stream::StreamSpec, LocalTrack, Node},
+ common::{stream::StreamSpec, Node},
CONF,
};
use jellyremuxer::extract::extract_track;
@@ -17,7 +17,6 @@ use tokio::io::{AsyncWriteExt, DuplexStream};
pub async fn vtt_stream(
json: bool,
node: Arc<Node>,
- local_tracks: Vec<LocalTrack>,
spec: StreamSpec,
mut b: DuplexStream,
) -> Result<()> {