diff options
Diffstat (limited to 'stream')
-rw-r--r-- | stream/Cargo.toml | 2 | ||||
-rw-r--r-- | stream/src/cues.rs | 57 | ||||
-rw-r--r-- | stream/src/fragment.rs | 262 | ||||
-rw-r--r-- | stream/src/fragment_index.rs | 53 | ||||
-rw-r--r-- | stream/src/hls.rs | 86 | ||||
-rw-r--r-- | stream/src/lib.rs | 110 | ||||
-rw-r--r-- | stream/src/metadata.rs | 26 | ||||
-rw-r--r-- | stream/src/stream_info.rs | 79 | ||||
-rw-r--r-- | stream/types/src/lib.rs | 78 |
9 files changed, 376 insertions, 377 deletions
diff --git a/stream/Cargo.toml b/stream/Cargo.toml index fb8cfe2..8e71e1c 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -12,7 +12,5 @@ jellystream-types = { path = "types" } log = { workspace = true } anyhow = { workspace = true } -tokio = { version = "1.43.0", features = ["io-util"] } -tokio-util = { version = "0.7.13", features = ["io", "io-util"] } serde_json = "1.0.138" serde = { version = "1.0.217", features = ["derive"] } diff --git a/stream/src/cues.rs b/stream/src/cues.rs new file mode 100644 index 0000000..b486a6f --- /dev/null +++ b/stream/src/cues.rs @@ -0,0 +1,57 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ + +use anyhow::{anyhow, Result}; +use jellycache::cache_memory; +use jellyremuxer::demuxers::create_demuxer_autodetect; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, fs::File, path::Path, sync::Arc}; + +#[derive(Serialize, Deserialize, Default)] +pub struct TrackStat { + pub num_blocks: usize, + pub total_size: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct GeneratedCue { + pub position: u64, + pub time: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct StatsAndCues { + pub stats: BTreeMap<u64, TrackStat>, + pub cues: Vec<GeneratedCue>, +} + +pub fn generate_cues(path: &Path) -> Result<Arc<StatsAndCues>> { + cache_memory("generated-cues", path, move || { + let media = File::open(path)?; + let mut media = + create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?; + + let info = media.info()?; + media.seek_cluster(None)?; + + let mut stats = BTreeMap::<u64, TrackStat>::new(); + let mut cues = Vec::new(); + + while let Some((position, cluster)) = media.read_cluster()? { + cues.push(GeneratedCue { + position, + time: cluster.timestamp * info.timestamp_scale, + }); + for block in cluster.simple_blocks { + let e = stats.entry(block.track).or_default(); + e.num_blocks += 1; + e.total_size += block.data.len() as u64; + } + } + + Ok(StatsAndCues { stats, cues }) + }) +} diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 89ce94f..3b4bb0f 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,44 +3,39 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, bail, Result}; -use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; -use jellystream_types::StreamContainer; -use jellytranscoder::fragment::transcode; -use log::warn; +use crate::{cues::generate_cues, stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellyremuxer::{ + demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat, +}; +use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum}; use std::{ - io::{Cursor, Seek, SeekFrom}, + fs::File, + io::{Cursor, Read}, sync::Arc, }; -use tokio::{fs::File, io::DuplexStream}; -use tokio_util::io::SyncIoBridge; -pub async fn fragment_stream( - mut b: DuplexStream, +pub fn fragment_stream( info: Arc<SMediaInfo>, - track: usize, - segment: usize, - index: usize, - format_num: usize, + track: TrackNum, + index: IndexNum, + format_num: FormatNum, container: StreamContainer, -) -> Result<()> { - let (iinfo, info) = stream_info(info).await?; +) -> Result<Box<dyn Read + Send + Sync>> { + let (iinfo, info) = stream_info(info)?; + let (file_index, track_num) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; - let path = iinfo.paths[file_index].clone(); - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; - let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let media_path = iinfo.paths[file_index].clone(); + let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?; let format = track .formats .get(format_num) .ok_or(anyhow!("format not found"))?; - let orig_track = iinfo.metadata[file_index] + + let mk_track = iinfo.metadata[file_index] .tracks .as_ref() .unwrap() @@ -49,97 +44,138 @@ pub async fn fragment_stream( .find(|t| t.track_number == track_num) .unwrap(); + let mk_info = matroska::Info { + duration: Some(info.duration), + timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale, + ..Default::default() + }; + let mk_tracks = matroska::Tracks { + entries: vec![mk_track.to_owned()], + }; + + let cue_stat = generate_cues(&media_path)?; + let cluster_offset = cue_stat + .cues + .get(index) + .ok_or(anyhow!("fragment index out of range"))? + .position; + + let cluster = { + let media_file = File::open(media_path)?; + let mut media = create_demuxer_autodetect(Box::new(media_file))? + .ok_or(anyhow!("media container unknown"))?; + media.seek_cluster(Some(cluster_offset))?; + media + .read_cluster()? + .ok_or(anyhow!("cluster unexpectedly missing"))? + .1 + }; + + let jr_container = match container { + StreamContainer::WebM => ContainerFormat::Webm, + StreamContainer::Matroska => ContainerFormat::Matroska, + StreamContainer::WebVTT => todo!(), + StreamContainer::MPEG4 => todo!(), + StreamContainer::JVTT => todo!(), + }; + if format.remux { - match container { - StreamContainer::WebM | StreamContainer::Matroska => { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - container == StreamContainer::WebM, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - let mut buf = Cursor::new(Vec::new()); - if let Err(err) = jellyremuxer::write_fragment_into( - &mut buf, - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - buf.seek(SeekFrom::Start(0)).unwrap(); - if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - StreamContainer::JVTT => {} - _ => bail!("not yet supported"), - } + let mut out = Vec::new(); + write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?; + Ok(Box::new(Cursor::new(out))) } else { - let location = transcode( - track.kind, - orig_track, - format, - &format!("{path:?} {track_num} {index}"), - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - - let mut frag = File::open(location.abs()).await?; - match container { - StreamContainer::WebM => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("webm transmux failed: {err}"); - } - }); - } - StreamContainer::Matroska => { - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - _ => bail!("unsupported"), - } + todo!() } - Ok(()) + // if format.remux { + // match container { + // StreamContainer::WebM | StreamContainer::Matroska => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // container == StreamContainer::WebM, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // let mut buf = Cursor::new(Vec::new()); + // if let Err(err) = jellyremuxer::write_fragment_into( + // &mut buf, + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // buf.seek(SeekFrom::Start(0)).unwrap(); + // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::JVTT => {} + // _ => bail!("not yet supported"), + // } + // } else { + // let location = transcode( + // track.kind, + // orig_track, + // format, + // &format!("{path:?} {track_num} {index}"), + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + + // let mut frag = File::open(location.abs()).await?; + // match container { + // StreamContainer::WebM => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("webm transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::Matroska => { + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // _ => bail!("unsupported"), + // } + // } } diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs index cb54948..9d82cd7 100644 --- a/stream/src/fragment_index.rs +++ b/stream/src/fragment_index.rs @@ -3,30 +3,45 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{stream_info, SMediaInfo}; +use crate::{cues::generate_cues, stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellystream_types::{SegmentNum, TrackNum}; -use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; +use jellystream_types::TrackNum; +use std::{ + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; -pub async fn fragment_index_stream( - mut b: DuplexStream, - info: Arc<SMediaInfo>, - _segment: SegmentNum, - track: TrackNum, -) -> Result<()> { - let (iinfo, _info) = stream_info(info).await?; - let (file_index, track_num) = *iinfo +pub fn fragment_index(info: Arc<SMediaInfo>, track: TrackNum) -> Result<Vec<Range<f64>>> { + let (iinfo, info) = stream_info(info)?; + let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) - }) - .await??; + let cue_stat = generate_cues(&iinfo.paths[file_index])?; + + Ok(cue_stat + .cues + .iter() + .map(|c| c.time as f64 / 1_000_000_000.) + .zip( + cue_stat + .cues + .iter() + .skip(1) + .map(|c| c.time as f64 / 1_000_000_000.) + .chain([info.duration]), + ) + .map(|(start, end)| start..end) + .collect()) +} - let out = serde_json::to_string(&fragments)?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) +pub fn fragment_index_stream( + info: Arc<SMediaInfo>, + track: TrackNum, +) -> Result<Box<dyn Read + Send + Sync>> { + Ok(Box::new(Cursor::new(serde_json::to_string( + &fragment_index(info, track)?, + )?))) } diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 949ddb4..2c91365 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -4,60 +4,30 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, Result}; -use jellystream_types::{FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum}; -use std::{fmt::Write, ops::Range, sync::Arc}; -use tokio::{ - io::{AsyncWriteExt, DuplexStream}, - task::spawn_blocking, +use crate::{fragment_index::fragment_index, stream_info, SMediaInfo}; +use anyhow::Result; +use jellystream_types::{FormatNum, StreamContainer, StreamSpec, TrackKind, TrackNum}; +use std::{ + fmt::Write, + io::{Cursor, Read}, + ops::Range, + sync::Arc, }; -pub async fn hls_supermultivariant_stream( - mut b: DuplexStream, +pub fn hls_multivariant_stream( info: Arc<SMediaInfo>, container: StreamContainer, -) -> Result<()> { - let (_iinfo, info) = stream_info(info).await?; - let mut out = String::new(); - writeln!(out, "#EXTM3U")?; - writeln!(out, "#EXT-X-VERSION:4")?; - for (i, _seg) in info.segments.iter().enumerate() { - let uri = format!( - "stream{}", - StreamSpec::HlsMultiVariant { - segment: i, - container, - } - .to_query() - ); - writeln!(out, "{uri}")?; - } - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) -} - -pub async fn hls_multivariant_stream( - mut b: DuplexStream, - info: Arc<SMediaInfo>, - segment: SegmentNum, - container: StreamContainer, -) -> Result<()> { - let (_iinfo, info) = stream_info(info).await?; - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; +) -> Result<Box<dyn Read + Send + Sync>> { + let (_iinfo, info) = stream_info(info)?; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-VERSION:4")?; // writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; - for (i, t) in seg.tracks.iter().enumerate() { + for (i, t) in info.tracks.iter().enumerate() { let uri = format!( "stream{}", StreamSpec::HlsVariant { - segment, track: i, container, format: 0 @@ -73,37 +43,23 @@ pub async fn hls_multivariant_stream( writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?; writeln!(out, "{uri}")?; } - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) + + Ok(Box::new(Cursor::new(out))) } -pub async fn hls_variant_stream( - mut b: DuplexStream, +pub fn hls_variant_stream( info: Arc<SMediaInfo>, - segment: SegmentNum, track: TrackNum, format: FormatNum, container: StreamContainer, -) -> Result<()> { - let (iinfo, info) = stream_info(info).await?; - let (file_index, track_num) = *iinfo - .track_to_file - .get(track) - .ok_or(anyhow!("track not found"))?; - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; - - let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) - }) - .await??; +) -> Result<Box<dyn Read + Send + Sync>> { + let frags = fragment_index(info.clone(), track)?; + let (_, info) = stream_info(info)?; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?; - writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?; + writeln!(out, "#EXT-X-TARGETDURATION:{}", info.duration)?; writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; @@ -113,7 +69,6 @@ pub async fn hls_variant_stream( out, "stream{}", StreamSpec::Fragment { - segment, track, index, container, @@ -125,6 +80,5 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-ENDLIST")?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) + Ok(Box::new(Cursor::new(out))) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 5b4e8ed..60c283c 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -4,30 +4,29 @@ Copyright (C) 2025 metamuffin <metamuffin.org> */ #![feature(iterator_try_collect)] +pub mod cues; mod fragment; mod fragment_index; mod hls; +pub mod metadata; mod stream_info; mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; -use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; +use hls::{hls_multivariant_stream, hls_variant_stream}; use jellystream_types::{StreamContainer, StreamSpec}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeSet, - io::SeekFrom, + fs::File, + io::{Read, Seek, SeekFrom}, ops::Range, path::PathBuf, sync::{Arc, LazyLock, Mutex}, }; use stream_info::{stream_info, write_stream_info}; -use tokio::{ - fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, -}; #[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] @@ -60,98 +59,69 @@ pub struct StreamHead { } pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let cons = |ct: &'static str, rs: bool| StreamHead { - content_type: ct, - range_supported: rs, - }; + use StreamContainer::*; + use StreamSpec::*; let container_ct = |x: StreamContainer| match x { - StreamContainer::WebM => "video/webm", - StreamContainer::Matroska => "video/x-matroska", - StreamContainer::WebVTT => "text/vtt", - StreamContainer::JVTT => "application/jellything-vtt+json", - StreamContainer::MPEG4 => "video/mp4", + WebM => "video/webm", + Matroska => "video/x-matroska", + WebVTT => "text/vtt", + JVTT => "application/jellything-vtt+json", + MPEG4 => "video/mp4", }; - match spec { - StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), - StreamSpec::Original { .. } => cons("video/x-matroska", true), - StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), - StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), - StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), + let range_supported = matches!(spec, Remux { .. } | Original { .. }); + let content_type = match spec { + Original { .. } => "video/x-matroska", + HlsMultiVariant { .. } => "application/vnd.apple.mpegurl", + HlsVariant { .. } => "application/vnd.apple.mpegurl", + Info { .. } => "application/jellything-stream-info+json", + FragmentIndex { .. } => "application/jellything-frag-index+json", + Fragment { container, .. } => container_ct(*container), + Remux { container, .. } => container_ct(*container), + }; + StreamHead { + content_type, + range_supported, } } -pub async fn stream( +pub fn stream( info: Arc<SMediaInfo>, spec: StreamSpec, - range: Range<usize>, -) -> Result<DuplexStream> { - let (a, b) = duplex(4096); - + range: Range<u64>, +) -> Result<Box<dyn Read + Send + Sync>> { match spec { - StreamSpec::Original { track } => original_stream(info, track, range, b).await?, - StreamSpec::HlsSuperMultiVariant { container } => { - hls_supermultivariant_stream(b, info, container).await?; - } - StreamSpec::HlsMultiVariant { segment, container } => { - hls_multivariant_stream(b, info, segment, container).await? - } + StreamSpec::Original { track } => original_stream(info, track, range), + StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(info, container), StreamSpec::HlsVariant { - segment, track, container, format, - } => hls_variant_stream(b, info, segment, track, format, container).await?, - StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, - StreamSpec::FragmentIndex { segment, track } => { - fragment_index_stream(b, info, segment, track).await? - } + } => hls_variant_stream(info, track, format, container), + StreamSpec::Info => write_stream_info(info), + StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track), StreamSpec::Fragment { - segment, track, index, container, format, - } => fragment_stream(b, info, track, segment, index, format, container).await?, + } => fragment_stream(info, track, index, format, container), _ => bail!("todo"), } - - Ok(a) } -async fn original_stream( +fn original_stream( info: Arc<SMediaInfo>, track: usize, - range: Range<usize>, - b: DuplexStream, -) -> Result<()> { - let (iinfo, _info) = stream_info(info).await?; + range: Range<u64>, +) -> Result<Box<dyn Read+ Send + Sync>> { + let (iinfo, _info) = stream_info(info)?; let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; - let mut file = File::open(&iinfo.paths[file_index]) - .await - .context("opening source")?; + let mut file = File::open(&iinfo.paths[file_index]).context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) - .await .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); - - Ok(()) -} - -async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { - let mut buf = [0u8; 4096]; - loop { - let size = inp.read(&mut buf[..amount.min(4096)]).await?; - if size == 0 { - break Ok(()); - } - out.write_all(&buf[..size]).await?; - amount -= size; - } + Ok(Box::new(file.take(range.end - range.start))) } diff --git a/stream/src/metadata.rs b/stream/src/metadata.rs new file mode 100644 index 0000000..9bfa3aa --- /dev/null +++ b/stream/src/metadata.rs @@ -0,0 +1,26 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ + +use anyhow::{anyhow, Result}; +use jellycache::cache_memory; +use jellyremuxer::{demuxers::create_demuxer_autodetect, matroska::Segment}; +use std::{fs::File, path::Path, sync::Arc}; + +pub fn read_metadata(path: &Path) -> Result<Arc<Segment>> { + cache_memory("mkmeta-v4", path, move || { + let media = File::open(path)?; + let mut media = + create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?; + + let info = media.info()?; + let tracks = media.tracks()?; + Ok(Segment { + info, + tracks, + ..Default::default() + }) + }) +} diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 560ec9b..7ebc399 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -3,59 +3,51 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -use crate::{SMediaInfo, CONF}; +use crate::{cues::generate_cues, metadata::read_metadata, SMediaInfo, CONF}; use anyhow::Result; -use jellyremuxer::{ - metadata::{matroska_metadata, MatroskaMetadata, MatroskaTrackEntry}, - seek_index::get_track_sizes, -}; +use jellyremuxer::matroska::{self, Segment, TrackEntry, TrackType}; use jellystream_types::{ - StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, TrackKind, + StreamContainer, StreamFormatInfo, StreamInfo, StreamTrackInfo, TrackKind, }; -use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; -use tokio::{ - io::{AsyncWriteExt, DuplexStream}, - spawn, - task::spawn_blocking, +use std::{ + io::{Cursor, Read}, + path::PathBuf, + sync::Arc, }; -async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<MatroskaMetadata>> { - spawn_blocking(move || matroska_metadata(&path)).await? -} - -async fn async_get_track_sizes(path: PathBuf) -> Result<BTreeMap<u64, usize>> { - spawn_blocking(move || get_track_sizes(&path)).await? -} - pub(crate) struct InternalStreamInfo { pub paths: Vec<PathBuf>, - pub metadata: Vec<Arc<MatroskaMetadata>>, + pub metadata: Vec<Arc<Segment>>, pub track_to_file: Vec<(usize, u64)>, } // TODO cache mem -pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> { +pub(crate) fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> { let mut tracks = Vec::new(); let mut track_to_file = Vec::new(); let mut metadata_arr = Vec::new(); let mut paths = Vec::new(); for (i, path) in info.files.iter().enumerate() { - let metadata = async_matroska_metadata(path.clone()).await?; - let sizes = async_get_track_sizes(path.clone()).await?; + let metadata = read_metadata(&path)?; + let cue_stat = generate_cues(&path)?; if let Some(t) = &metadata.tracks { - let duration = media_duration(&metadata); + let duration = media_duration(&metadata.info); for t in &t.entries { - let bitrate = - sizes.get(&t.track_number).copied().unwrap_or_default() as f64 / duration * 8.; + let byterate = cue_stat + .stats + .get(&t.track_number) + .map(|e| e.total_size) + .unwrap_or_default() as f64 + / duration; tracks.push(StreamTrackInfo { name: None, kind: match t.track_type { - 1 => TrackKind::Video, - 2 => TrackKind::Audio, - 17 => TrackKind::Subtitle, + matroska::TrackType::Video => TrackKind::Video, + matroska::TrackType::Audio => TrackKind::Audio, + matroska::TrackType::Subtitle => TrackKind::Subtitle, _ => todo!(), }, - formats: stream_formats(t, bitrate), + formats: stream_formats(t, byterate * 8.), }); track_to_file.push((i, t.track_number)); } @@ -64,11 +56,7 @@ pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStream paths.push(path.to_owned()); } - let segment = StreamSegmentInfo { - name: None, - duration: media_duration(&metadata_arr[0]), - tracks, - }; + let duration = media_duration(&metadata_arr[0].info); // TODO different durations?! Ok(( InternalStreamInfo { metadata: metadata_arr, @@ -77,12 +65,13 @@ pub(crate) async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStream }, StreamInfo { name: info.title.clone(), - segments: vec![segment], + duration, + tracks, }, )) } -fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> { +fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> { let mut formats = Vec::new(); formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), @@ -97,7 +86,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma }); match t.track_type { - 1 => { + TrackType::Video => { let sw = t.video.as_ref().unwrap().pixel_width; let sh = t.video.as_ref().unwrap().pixel_height; for (w, br) in [ @@ -136,7 +125,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma } } } - 2 => { + TrackType::Audio => { for br in [256e3, 128e3, 64e3] { formats.push(StreamFormatInfo { codec: "A_OPUS".to_string(), @@ -151,7 +140,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec<StreamForma }); } } - 17 => {} + TrackType::Subtitle => {} _ => {} } @@ -168,13 +157,11 @@ fn containers_by_codec(codec: &str) -> Vec<StreamContainer> { } } -pub(crate) async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> { - let (_, info) = stream_info(info).await?; - spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await }); - Ok(()) +pub(crate) fn write_stream_info(info: Arc<SMediaInfo>) -> Result<Box<dyn Read + Send + Sync>> { + let (_, info) = stream_info(info)?; + Ok(Box::new(Cursor::new(serde_json::to_vec(&info)?))) } -fn media_duration(m: &MatroskaMetadata) -> f64 { - let info = m.info.as_ref().unwrap(); +fn media_duration(info: &matroska::Info) -> f64 { (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000. } diff --git a/stream/types/src/lib.rs b/stream/types/src/lib.rs index a90db03..a031f3a 100644 --- a/stream/types/src/lib.rs +++ b/stream/types/src/lib.rs @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, fmt::Display, str::FromStr}; -pub type SegmentNum = usize; pub type TrackNum = usize; pub type FormatNum = usize; pub type IndexNum = usize; @@ -27,28 +26,19 @@ pub enum StreamSpec { Original { track: TrackNum, }, - HlsSuperMultiVariant { - container: StreamContainer, - }, HlsMultiVariant { - segment: SegmentNum, container: StreamContainer, }, HlsVariant { - segment: SegmentNum, track: TrackNum, container: StreamContainer, format: FormatNum, }, - Info { - segment: Option<u64>, - }, + Info, FragmentIndex { - segment: SegmentNum, track: TrackNum, }, Fragment { - segment: SegmentNum, track: TrackNum, index: IndexNum, container: StreamContainer, @@ -65,12 +55,6 @@ pub enum StreamSpec { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct StreamInfo { pub name: Option<String>, - pub segments: Vec<StreamSegmentInfo>, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct StreamSegmentInfo { - pub name: Option<String>, pub duration: f64, pub tracks: Vec<StreamTrackInfo>, } @@ -128,35 +112,25 @@ impl StreamSpec { ) } StreamSpec::Original { track } => format!("?original&track={track}"), - StreamSpec::HlsSuperMultiVariant { container } => { - format!("?hlssupermultivariant&container={container}") - } - StreamSpec::HlsMultiVariant { segment, container } => { - format!("?hlsmultivariant&segment={segment}&container={container}") + StreamSpec::HlsMultiVariant { container } => { + format!("?hlsmultivariant&container={container}") } StreamSpec::HlsVariant { - segment, track, container, format, - } => format!( - "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}" - ), - StreamSpec::Info { - segment: Some(segment), - } => format!("?info&segment={segment}"), - StreamSpec::Info { segment: None } => "?info".to_string(), - StreamSpec::FragmentIndex { segment, track } => { - format!("?fragmentindex&segment={segment}&track={track}") + } => format!("?hlsvariant&track={track}&container={container}&format={format}"), + StreamSpec::Info => "?info".to_string(), + StreamSpec::FragmentIndex { track } => { + format!("?fragmentindex&track={track}") } StreamSpec::Fragment { - segment, track, index, container, format, } => format!( - "?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}" + "?fragment&track={track}&index={index}&container={container}&format={format}" ), } } @@ -173,32 +147,24 @@ impl StreamSpec { ) } StreamSpec::Original { track } => format!("?original&t={track}"), - StreamSpec::HlsSuperMultiVariant { container } => { - format!("?hlssupermultivariant&c={container}") - } - StreamSpec::HlsMultiVariant { segment, container } => { - format!("?hlsmultivariant&s={segment}&c={container}") + StreamSpec::HlsMultiVariant { container } => { + format!("?hlsmultivariant&c={container}") } StreamSpec::HlsVariant { - segment, track, container, format, - } => format!("?hlsvariant&s={segment}&t={track}&c={container}&f={format}"), - StreamSpec::Info { - segment: Some(segment), - } => format!("?info&s={segment}"), - StreamSpec::Info { segment: None } => "?info".to_string(), - StreamSpec::FragmentIndex { segment, track } => { - format!("?fragmentindex&s={segment}&t={track}") + } => format!("?hlsvariant&t={track}&c={container}&f={format}"), + StreamSpec::Info => "?info".to_string(), + StreamSpec::FragmentIndex { track } => { + format!("?fragmentindex&t={track}") } StreamSpec::Fragment { - segment, track, index, container, format, - } => format!("?fragment&s={segment}&t={track}&i={index}&c={container}&f={format}"), + } => format!("?fragment&t={track}&i={index}&c={container}&f={format}"), } } pub fn from_query_kv(query: &BTreeMap<String, String>) -> Result<Self, &'static str> { @@ -207,7 +173,7 @@ impl StreamSpec { .get(k) .or(query.get(ks)) .ok_or(k) - .and_then(|a| a.parse().map_err(|_| "invalid number")) + .and_then(|a| a.parse::<usize>().map_err(|_| "invalid number")) }; let get_container = || { query @@ -217,28 +183,19 @@ impl StreamSpec { .and_then(|s| s.parse().map_err(|()| "unknown container")) }; if query.contains_key("info") { - Ok(Self::Info { - segment: get_num("segment", "s").ok(), - }) - } else if query.contains_key("hlssupermultivariant") { - Ok(Self::HlsSuperMultiVariant { - container: get_container().ok().unwrap_or(StreamContainer::Matroska), - }) + Ok(Self::Info) } else if query.contains_key("hlsmultivariant") { Ok(Self::HlsMultiVariant { - segment: get_num("segment", "s")? as SegmentNum, container: get_container()?, }) } else if query.contains_key("hlsvariant") { Ok(Self::HlsVariant { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, format: get_num("format", "f")? as FormatNum, container: get_container()?, }) } else if query.contains_key("fragment") { Ok(Self::Fragment { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, format: get_num("format", "f")? as FormatNum, index: get_num("index", "i")? as IndexNum, @@ -246,7 +203,6 @@ impl StreamSpec { }) } else if query.contains_key("fragmentindex") { Ok(Self::FragmentIndex { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, }) } else { |