aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/src/stream.rs2
-rw-r--r--common/src/user.rs2
-rw-r--r--remuxer/src/extract.rs119
-rw-r--r--remuxer/src/lib.rs1
-rw-r--r--stream/src/lib.rs4
-rw-r--r--stream/src/webvtt.rs37
-rw-r--r--transcoder/src/lib.rs1
-rw-r--r--transcoder/src/subtitles.rs48
8 files changed, 213 insertions, 1 deletions
diff --git a/common/src/stream.rs b/common/src/stream.rs
index 6c93294..aa1b931 100644
--- a/common/src/stream.rs
+++ b/common/src/stream.rs
@@ -28,6 +28,7 @@ pub enum StreamFormat {
#[cfg_attr(feature = "rocket", field(value = "hlsvariant"))] HlsVariant,
#[cfg_attr(feature = "rocket", field(value = "jhls"))] Jhls,
#[cfg_attr(feature = "rocket", field(value = "hlsseg"))] Segment,
+ #[cfg_attr(feature = "rocket", field(value = "webvtt"))] Webvtt,
}
impl Default for StreamSpec {
@@ -82,6 +83,7 @@ impl StreamFormat {
StreamFormat::HlsVariant => "hlsvariant",
StreamFormat::Jhls => "jhls",
StreamFormat::Segment => "hlsseg",
+ StreamFormat::Webvtt => "webvtt",
}
}
}
diff --git a/common/src/user.rs b/common/src/user.rs
index 2b67de5..ae9a757 100644
--- a/common/src/user.rs
+++ b/common/src/user.rs
@@ -58,7 +58,7 @@ impl UserPermission {
self,
Transcode
| FederatedContent
- | StreamFormat(Jhls | HlsMaster | HlsVariant | Matroska | Segment)
+ | StreamFormat(Jhls | HlsMaster | HlsVariant | Matroska | Segment | Webvtt)
)
}
}
diff --git a/remuxer/src/extract.rs b/remuxer/src/extract.rs
new file mode 100644
index 0000000..66c0526
--- /dev/null
+++ b/remuxer/src/extract.rs
@@ -0,0 +1,119 @@
+use crate::seek_index::get_seek_index;
+use anyhow::{anyhow, bail};
+use jellycommon::LocalTrack;
+use jellymatroska::{block::Block, read::EbmlReader, unflatten::IterWithPos, Master, MatroskaTag};
+use log::{debug, info};
+use std::{fs::File, io::BufReader, path::PathBuf};
+
+pub fn extract_track(
+ path_base: PathBuf,
+ track_info: LocalTrack,
+) -> anyhow::Result<Vec<(u64, u64, Vec<u8>)>> {
+ let source_path = path_base.join(track_info.path);
+ let file = File::open(&source_path)?;
+ let mut reader = EbmlReader::new(BufReader::new(file));
+ let index = get_seek_index(&source_path)?;
+ let index = index
+ .get(&(track_info.track as u64))
+ .ok_or(anyhow!("track missing"))?;
+
+ let mut out = Vec::new();
+ for b in &index.blocks {
+ reader.seek(b.source_off, MatroskaTag::Cluster(Master::Start))?;
+ let (duration, block) = read_group(&mut reader)?;
+ out.push((b.pts, duration, block.data))
+ }
+ Ok(out)
+}
+
+pub fn read_group(segment: &mut EbmlReader) -> anyhow::Result<(u64, Block)> {
+ let (mut dur, mut block) = (None, None);
+ loop {
+ let item = segment.next().ok_or(anyhow!("eof"))??;
+ match item {
+ MatroskaTag::Void(_) => (),
+ MatroskaTag::Crc32(_) => (),
+ MatroskaTag::Cluster(_) => (),
+ MatroskaTag::Timestamp(_) => (),
+ MatroskaTag::SimpleBlock(_buf) => {
+ // bail!("unexpected simpleblock, where a group was expected")
+ }
+ MatroskaTag::BlockGroup(Master::Start) => (),
+ MatroskaTag::BlockGroup(Master::End) => return Ok((dur.unwrap(), block.unwrap())),
+ MatroskaTag::BlockDuration(duration) => dur = Some(duration),
+ MatroskaTag::Block(buf) => block = Some(Block::parse(&buf)?),
+ MatroskaTag::Cues(_) => bail!("reached cues, this is the end"),
+ MatroskaTag::Segment(Master::End) => info!("extractor reached segment end"),
+ _ => debug!("(rs) tag ignored: {item:?}"),
+ }
+ }
+}
+
+// fn import_seek_index_segment(
+// segment: &mut Unflatten,
+// seek_index: &mut BTreeMap<u64, SeekIndex>,
+// ) -> Result<()> {
+// while let Some(Ok(Unflat { children, item, .. })) = segment.n() {
+// match item {
+// MatroskaTag::SeekHead(_) => {}
+// MatroskaTag::Info(_) => {}
+// MatroskaTag::Tags(_) => {}
+// MatroskaTag::Cues(_) => {}
+// MatroskaTag::Chapters(_) => {}
+// MatroskaTag::Tracks(_) => {}
+// MatroskaTag::Void(_) => {}
+// MatroskaTag::Cluster(_) => {
+// let mut children = children.unwrap();
+// let mut pts = 0;
+// let mut position = children.position();
+
+// loop {
+// if let Some(Ok(Unflat { children, item, .. })) = children.n() {
+// match item {
+// MatroskaTag::Timestamp(ts) => pts = ts,
+// MatroskaTag::BlockGroup(_) => {
+// trace!("group");
+// let mut children = children.unwrap();
+// // let position = children.position(); //? TODO where should this point to? cluster or block? // probably block
+// while let Some(Ok(Unflat {
+// children: _,
+// item,
+// position,
+// })) = children.n()
+// {
+// match item {
+// MatroskaTag::Block(ref buf) => {
+// let block = Block::parse(buf)?;
+// debug!(
+// "block: track={} tso={}",
+// block.track, block.timestamp_off
+// );
+// seek_index_add(seek_index, &block, position, pts);
+// }
+// _ => trace!("{item:?}"),
+// }
+// }
+// }
+// MatroskaTag::SimpleBlock(buf) => {
+// let block = Block::parse(&buf)?;
+// trace!(
+// "simple block: track={} tso={}",
+// block.track,
+// block.timestamp_off
+// );
+// trace!("{pts} {}", block.timestamp_off);
+// seek_index_add(seek_index, &block, position, pts);
+// }
+// _ => trace!("(rsc) tag ignored: {item:?}"),
+// }
+// } else {
+// break;
+// }
+// position = children.position();
+// }
+// }
+// _ => debug!("(rs) tag ignored: {item:?}"),
+// };
+// }
+// Ok(())
+// }
diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs
index e1f8c80..88f1916 100644
--- a/remuxer/src/lib.rs
+++ b/remuxer/src/lib.rs
@@ -9,6 +9,7 @@ pub mod seek_index;
pub mod segment_extractor;
pub mod snippet;
pub mod trim_writer;
+pub mod extract;
pub use remux::remux_stream_into;
pub use snippet::write_snippet_into;
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index e9b9a4b..604e3eb 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -7,6 +7,7 @@
pub mod hls;
pub mod jhls;
pub mod segment;
+pub mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use hls::{hls_master_stream, hls_variant_stream};
@@ -24,6 +25,7 @@ use tokio::{
io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
};
use tokio_util::io::SyncIoBridge;
+use webvtt::webvtt_stream;
pub struct StreamHead {
pub content_type: &'static str,
@@ -38,6 +40,7 @@ pub fn stream_head(spec: &StreamSpec) -> StreamHead {
StreamFormat::Matroska => StreamHead { content_type: webm_or_mkv, range_supported: true },
StreamFormat::HlsMaster | StreamFormat::HlsVariant => StreamHead { content_type: "application/vnd.apple.mpegurl", range_supported: false },
StreamFormat::Jhls => StreamHead { content_type: "application/jellything-jhls+json", range_supported: false },
+ StreamFormat::Webvtt => StreamHead { content_type: "text/vtt", range_supported: false },
StreamFormat::Segment => StreamHead { content_type: webm_or_mkv, range_supported: false },
}
}
@@ -69,6 +72,7 @@ pub async fn stream(
StreamFormat::HlsVariant => hls_variant_stream(node, track_sources, spec, b).await?,
StreamFormat::Jhls => jhls_stream(node, track_sources, spec, b, perms).await?,
StreamFormat::Segment => segment_stream(node, track_sources, spec, b, perms).await?,
+ StreamFormat::Webvtt => webvtt_stream(node, track_sources, spec, b).await?,
}
Ok(a)
diff --git a/stream/src/webvtt.rs b/stream/src/webvtt.rs
new file mode 100644
index 0000000..74a12a5
--- /dev/null
+++ b/stream/src/webvtt.rs
@@ -0,0 +1,37 @@
+use anyhow::{anyhow, Context, Result};
+use jellybase::CONF;
+use jellycommon::{stream::StreamSpec, LocalTrack, Node};
+use jellyremuxer::extract::extract_track;
+use jellytranscoder::subtitles::webvtt_from_ass_blocks;
+use tokio::io::{AsyncWriteExt, DuplexStream};
+
+pub async fn webvtt_stream(
+ node: Node,
+ track_sources: Vec<LocalTrack>,
+ spec: StreamSpec,
+ mut b: DuplexStream,
+) -> Result<()> {
+ // TODO cache
+
+ let local_track = track_sources
+ .get(*spec.tracks.get(0).ok_or(anyhow!("no track selected"))?)
+ .ok_or(anyhow!("track does not exist"))?
+ .clone();
+
+ let codec_private = local_track
+ .codec_private
+ .clone()
+ .ok_or(anyhow!("ASS is missing required codec private data"))?;
+
+ let ass_blocks =
+ tokio::task::spawn_blocking(move || extract_track(CONF.library_path.clone(), local_track))
+ .await??;
+
+ let webvtt = webvtt_from_ass_blocks(node.public.title, codec_private, ass_blocks)
+ .context("transcoding subtitles")?;
+
+ tokio::task::spawn(async move {
+ let _ = b.write_all(webvtt.as_bytes()).await;
+ });
+ Ok(())
+}
diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs
index 673946b..ef663fb 100644
--- a/transcoder/src/lib.rs
+++ b/transcoder/src/lib.rs
@@ -9,5 +9,6 @@
use tokio::sync::Semaphore;
pub mod image;
pub mod snippet;
+pub mod subtitles;
static LOCAL_TRANSCODING_TASKS: Semaphore = Semaphore::const_new(2);
diff --git a/transcoder/src/subtitles.rs b/transcoder/src/subtitles.rs
new file mode 100644
index 0000000..987405b
--- /dev/null
+++ b/transcoder/src/subtitles.rs
@@ -0,0 +1,48 @@
+use anyhow::anyhow;
+use std::fmt::Write;
+
+pub fn webvtt_from_ass_blocks(
+ title: String,
+ codec_private: Vec<u8>,
+ blocks: Vec<(u64, u64, Vec<u8>)>,
+) -> anyhow::Result<String> {
+ let mut out = String::new();
+
+ writeln!(out, "WEBVTT - {title}")?; // TODO ensure title does not contain "-->"
+ writeln!(out)?;
+
+ for (pts, dur, block) in blocks {
+ let block = String::from_utf8(block)?;
+ let text = convert_block(&block).ok_or(anyhow!("bad ass xD"))?;
+ writeln!(out, "{} --> {}", format_time(pts), format_time(pts + dur))?;
+ writeln!(out, "- {text}")?;
+ writeln!(out)?;
+ }
+
+ Ok(out)
+}
+
+fn format_time(t: u64) -> String {
+ let t = t as f64 / 1_000_000.;
+
+ // let mmm = (t.fract() * 1000.).floor();
+ // let mmm = ((t * 60.).fract() * 60.).floor();
+ // let mmm = ((t / 60).fract() * 60.).floor();
+
+ // format!("{hh:4}:{mm:02}:{ss:02}.{mmm:03}")
+ format!("{t}")
+}
+
+fn convert_block(s: &str) -> Option<&str> {
+ // ReadOrder, Layer, Style, Name, MarginL, MarginR, MarginV, Effect, Text
+ let (_read_order, s) = s.split_once(',')?;
+ let (_layer, s) = s.split_once(',')?;
+ let (_style, s) = s.split_once(',')?;
+ let (_name, s) = s.split_once(',')?;
+ let (_marginl, s) = s.split_once(',')?;
+ let (_marginr, s) = s.split_once(',')?;
+ let (_marginv, s) = s.split_once(',')?;
+ let (_effect, text) = s.split_once(',')?;
+
+ Some(text)
+}