diff options
-rw-r--r-- | base/src/cache.rs | 20 | ||||
-rw-r--r-- | common/src/stream.rs | 13 | ||||
-rw-r--r-- | stream/src/segment.rs | 66 | ||||
-rw-r--r-- | transcoder/src/lib.rs | 1 | ||||
-rw-r--r-- | transcoder/src/snippet.rs | 36 |
5 files changed, 92 insertions, 44 deletions
diff --git a/base/src/cache.rs b/base/src/cache.rs index 7705a14..ee13b73 100644 --- a/base/src/cache.rs +++ b/base/src/cache.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Context}; use base64::Engine; use bincode::{Decode, Encode}; use jellycommon::AssetLocation; -use log::info; +use log::{info, warn}; use std::{ any::Any, collections::{BTreeMap, HashMap}, @@ -54,7 +54,14 @@ where let f = tokio::fs::File::create(location.path()) .await .context("creating new cache file")?; - generate(f).await?; + match generate(f).await { + Ok(()) => (), + Err(e) => { + warn!("cache generation failed, unlinking entry"); + tokio::fs::remove_file(location.path()).await?; + return Err(e); + } + } } drop(_guard); Ok(location) @@ -70,7 +77,14 @@ where let exists = location.path().exists(); if !exists { let f = std::fs::File::create(location.path()).context("creating new cache file")?; - generate(f)?; + match generate(f) { + Ok(()) => (), + Err(e) => { + warn!("cache generation failed, unlinking entry"); + std::fs::remove_file(location.path())?; + return Err(e); + } + } } drop(_guard); Ok(location) diff --git a/common/src/stream.rs b/common/src/stream.rs index e6fe767..1fb3761 100644 --- a/common/src/stream.rs +++ b/common/src/stream.rs @@ -13,8 +13,7 @@ pub struct StreamSpec { pub tracks: Vec<usize>, pub format: StreamFormat, pub webm: Option<bool>, - pub abr: Option<usize>, - pub vbr: Option<usize>, + pub bitrate: Option<usize>, pub width: Option<usize>, pub index: Option<usize>, } @@ -39,8 +38,7 @@ impl Default for StreamSpec { format: StreamFormat::Matroska, webm: Some(true), width: None, - abr: None, - vbr: None, + bitrate: None, index: None, } } @@ -64,11 +62,8 @@ impl StreamSpec { ) .unwrap(); } - if let Some(abr) = self.abr { - write!(u, "&abr={abr}").unwrap(); - } - if let Some(vbr) = self.vbr { - write!(u, "&vbr={vbr}").unwrap(); + if let Some(bitrate) = self.bitrate { + write!(u, "&bitrate={bitrate}").unwrap(); } if let Some(index) = self.index { write!(u, "&index={index}").unwrap(); diff --git a/stream/src/segment.rs b/stream/src/segment.rs index 78afc4f..02f3100 100644 --- a/stream/src/segment.rs +++ b/stream/src/segment.rs @@ -4,39 +4,69 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ use anyhow::{anyhow, bail, Result}; -use jellybase::CONF; +use jellybase::{AssetLocationExt, CONF}; use jellycommon::{stream::StreamSpec, LocalTrack, Node}; +use jellytranscoder::snippet::{transcode, Encoding}; use log::warn; -use tokio::io::DuplexStream; +use tokio::{fs::File, io::DuplexStream}; use tokio_util::io::SyncIoBridge; pub async fn segment_stream( node: Node, track_sources: Vec<LocalTrack>, spec: StreamSpec, - b: DuplexStream, + mut b: DuplexStream, ) -> Result<()> { - let b = SyncIoBridge::new(b); - if spec.tracks.len() != 1 { bail!("unsupported number of tracks for segment, must be exactly one"); } let track = spec.tracks[0]; let n = spec.index.ok_or(anyhow!("segment index missing"))?; - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_snippet_into( - b, - &CONF.library_path, - &node.public, - track_sources, - track, - spec.webm.unwrap_or(false), - n, - ) { - warn!("segment stream error: {err}"); - } - }); + if let Some(width) = spec.width { + let location = transcode( + "", + Encoding::Video { + codec: "libsvtav1", + preset: 8, + bitrate: spec.bitrate.unwrap_or(2_000_000), + width, + }, + move |b| { + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_snippet_into( + SyncIoBridge::new(b), + &CONF.library_path, + &node.public, + track_sources, + track, + false, + n, + ) { + warn!("segment stream error: {err}"); + } + }); + }, + ) + .await?; + let mut output = File::open(location.path()).await?; + tokio::io::copy(&mut output, &mut b).await?; + } else { + let b = SyncIoBridge::new(b); + tokio::task::spawn_blocking(move || { + if let Err(err) = jellyremuxer::write_snippet_into( + b, + &CONF.library_path, + &node.public, + track_sources, + track, + spec.webm.unwrap_or(false), + n, + ) { + warn!("segment stream error: {err}"); + } + }); + } Ok(()) } diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs index 167303d..240d37a 100644 --- a/transcoder/src/lib.rs +++ b/transcoder/src/lib.rs @@ -4,5 +4,6 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ #![feature(async_closure)] +#![feature(exit_status_error)] pub mod image; pub mod snippet; diff --git a/transcoder/src/snippet.rs b/transcoder/src/snippet.rs index 56bd523..5afeb15 100644 --- a/transcoder/src/snippet.rs +++ b/transcoder/src/snippet.rs @@ -4,10 +4,14 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use jellybase::{cache::async_cache_file, AssetLocationExt}; +use jellybase::cache::async_cache_file; use jellycommon::AssetLocation; +use log::info; use std::process::Stdio; -use tokio::{fs::File, process::Command}; +use tokio::{ + io::copy, + process::{ChildStdin, Command}, +}; #[derive(Debug)] pub enum Encoding { @@ -19,16 +23,15 @@ pub enum Encoding { }, } -pub async fn transcode(asset: AssetLocation, enc: Encoding) -> anyhow::Result<AssetLocation> { - let original_path = asset.path(); - let asset = asset.clone(); +pub async fn transcode( + key: &str, + enc: Encoding, + input: impl FnOnce(ChildStdin), +) -> anyhow::Result<AssetLocation> { Ok(async_cache_file( - &[ - "snip-tc", - original_path.as_os_str().to_str().unwrap(), - &format!("{enc:?}"), - ], - move |output| async move { + &["snip-tc", key, &format!("{enc:?}")], + move |mut output| async move { + info!("transcoding snippet {key}"); let args = match enc { Encoding::Video { codec, @@ -50,14 +53,19 @@ pub async fn transcode(asset: AssetLocation, enc: Encoding) -> anyhow::Result<As let mut proc = Command::new("ffmpeg") .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .args(&["-f", "mkv", "-i", "pipe:0"]) + .args(&["-f", "matroska", "-i", "pipe:0"]) .args(args) - .args(&["-f", "mkv", "pipe:1"]) + .args(&["-f", "webm", "pipe:1"]) .spawn()?; - let mut stdin = proc.stdin.take().unwrap(); + let stdin = proc.stdin.take().unwrap(); let mut stdout = proc.stdout.take().unwrap(); + input(stdin); + copy(&mut stdout, &mut output).await?; + + proc.wait().await.unwrap().exit_ok()?; + info!("done"); Ok(()) }, ) |