aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--remuxer/src/snippet.rs94
-rw-r--r--stream/src/lib.rs7
-rw-r--r--stream/src/segment.rs32
3 files changed, 125 insertions, 8 deletions
diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs
index 65d63ff..186be19 100644
--- a/remuxer/src/snippet.rs
+++ b/remuxer/src/snippet.rs
@@ -2,4 +2,96 @@
This file is part of jellything (https://codeberg.org/metamuffin/jellything)
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
-*/ \ No newline at end of file
+*/
+
+use crate::ebml_track_entry;
+use crate::segment_extractor::SegmentExtractIter;
+use crate::{ebml_header, ebml_segment_info};
+use anyhow::anyhow;
+use anyhow::Context;
+use jellycommon::{seek_index::SeekIndex, LocalTrack, NodePublic};
+use jellymatroska::{read::EbmlReader, write::EbmlWriter, Master, MatroskaTag};
+use log::{debug, info};
+use std::{fs::File, io::Write, path::PathBuf};
+
+const SNIPPET_LENGTH: f64 = 2.;
+
+pub fn write_snippet_into(
+ writer: impl Write,
+ path_base: PathBuf,
+ item: NodePublic,
+ track_sources: Vec<LocalTrack>,
+ track: usize,
+ webm: bool,
+ n: usize,
+) -> anyhow::Result<()> {
+ info!("writing snippet {n} of {:?} (track #{track})", item.title);
+ let mut output = EbmlWriter::new(writer, 0);
+
+ let media_info = item.media.as_ref().unwrap();
+ let info = media_info
+ .tracks
+ .get(track)
+ .ok_or(anyhow!("track not available"))?
+ .to_owned();
+ let private = &track_sources[track];
+ let source_path = path_base.join(&private.path);
+ let mapped = 1;
+ info!("\t- {n} {source_path:?} ({} => {mapped})", private.track);
+ info!("\t {}", info);
+ let file = File::open(&source_path).context("opening source file")?;
+ let mut index = File::open(source_path.with_extension(format!("si.{}", private.track)))
+ .context("opening seek index file")?;
+ let index =
+ bincode::decode_from_std_read::<SeekIndex, _, _>(&mut index, bincode::config::standard())?;
+ debug!("\t seek index: {} blocks loaded", index.blocks.len());
+ let mut reader = EbmlReader::new(file);
+
+ // TODO maybe refactor this to approx. contant time per snippet
+ let average_kf_interval = media_info.duration / index.keyframes.len() as f64;
+ let kf_per_snip = (SNIPPET_LENGTH / average_kf_interval).ceil() as usize;
+ info!("average keyframe interval: {average_kf_interval}");
+ info!(" => keyframes per snippet {kf_per_snip}");
+
+ let start_block_index = *index
+ .keyframes
+ .get(n * kf_per_snip)
+ .ok_or(anyhow!("snippet index out of range"))?;
+ let end_block_index = *index
+ .keyframes
+ .get((n + 1) * kf_per_snip)
+ .unwrap_or(&index.blocks.len());
+ let start_block = &index.blocks[start_block_index];
+ let last_block = &index.blocks[end_block_index - 1];
+
+ reader.seek(start_block.source_off, MatroskaTag::Cluster(Master::Start))?;
+
+ output.write_tag(&ebml_header(webm))?;
+ output.write_tag(&MatroskaTag::Segment(Master::Start))?;
+ output.write_tag(&ebml_segment_info(
+ format!("{} (snippet {n})", item.title),
+ (last_block.pts - start_block.pts) as f64 / 1000.,
+ ))?;
+ output.write_tag(&MatroskaTag::Tags(Master::Collected(vec![])))?;
+ output.write_tag(&MatroskaTag::Tracks(Master::Collected(vec![
+ ebml_track_entry(mapped, &info, private.codec_private.clone()),
+ ])))?;
+
+ let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64);
+
+ let mut blocks = Vec::new();
+ for i in start_block_index..end_block_index {
+ let index_block = &index.blocks[i];
+ let mut block = reader.next()?;
+
+ assert_eq!(index_block.size, block.data.len(), "seek index is wrong");
+
+ block.track = 1;
+ block.timestamp_off = (index_block.pts - start_block.pts).try_into().unwrap();
+ blocks.push(MatroskaTag::SimpleBlock(block.dump()))
+ }
+ output.write_tag(&MatroskaTag::Cluster(Master::Collected(blocks)))?;
+
+ output.write_tag(&MatroskaTag::Segment(Master::End))?;
+ Ok(())
+}
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 5f9edb9..726f1e8 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -12,6 +12,7 @@ use jellycommon::{
stream::{StreamFormat, StreamSpec},
LocalTrack, MediaSource, Node,
};
+use segment::stream_segment;
use std::{io::SeekFrom, ops::Range};
use tokio::{
fs::File,
@@ -52,9 +53,9 @@ pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result
match spec.format {
StreamFormat::Original => original_stream(track_sources, spec, range, b).await?,
StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?,
- StreamFormat::Hls => todo!(),
- StreamFormat::Jhls => todo!(),
- StreamFormat::Segment => todo!(),
+ StreamFormat::Hls => bail!("unsupported"),
+ StreamFormat::Jhls => bail!("unsupported"),
+ StreamFormat::Segment => stream_segment(node, track_sources, spec, b).await?,
}
Ok(a)
diff --git a/stream/src/segment.rs b/stream/src/segment.rs
index ed4f5ef..d09d357 100644
--- a/stream/src/segment.rs
+++ b/stream/src/segment.rs
@@ -3,16 +3,40 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
+use anyhow::{anyhow, bail, Result};
+use jellybase::CONF;
use jellycommon::{stream::StreamSpec, LocalTrack, Node};
-use std::ops::Range;
+use log::warn;
use tokio::io::DuplexStream;
+use tokio_util::io::SyncIoBridge;
pub async fn stream_segment(
node: Node,
track_sources: Vec<LocalTrack>,
spec: StreamSpec,
- range: Range<usize>,
b: DuplexStream,
-) {
-
+) -> Result<()> {
+ let b = SyncIoBridge::new(b);
+
+ if spec.tracks.len() != 1 {
+ bail!("unsupported number of tracks for segment, must be exactly one");
+ }
+ let track = spec.tracks[0];
+ let n = spec.index.ok_or(anyhow!("segment index missing"))?;
+
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_snippet_into(
+ b,
+ CONF.library_path.clone(),
+ node.public.clone(),
+ track_sources,
+ track,
+ spec.webm.unwrap_or(false),
+ n,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+
+ Ok(())
}