diff options
-rw-r--r-- | base/src/cache.rs | 57 | ||||
-rw-r--r-- | common/src/lib.rs | 6 | ||||
-rw-r--r-- | import/src/acoustid.rs | 2 | ||||
-rw-r--r-- | import/src/tmdb.rs | 70 | ||||
-rw-r--r-- | import/src/trakt.rs | 55 | ||||
-rw-r--r-- | remuxer/src/metadata.rs | 24 | ||||
-rw-r--r-- | remuxer/src/seek_index.rs | 6 | ||||
-rw-r--r-- | server/src/routes/ui/admin/mod.rs | 2 | ||||
-rw-r--r-- | server/src/routes/ui/assets.rs | 6 | ||||
-rw-r--r-- | stream/src/fragment.rs | 2 | ||||
-rw-r--r-- | transcoder/src/fragment.rs | 120 | ||||
-rw-r--r-- | transcoder/src/image.rs | 14 | ||||
-rw-r--r-- | transcoder/src/thumbnail.rs | 41 |
13 files changed, 198 insertions, 207 deletions
diff --git a/base/src/cache.rs b/base/src/cache.rs index 0b28e1b..40c6e7b 100644 --- a/base/src/cache.rs +++ b/base/src/cache.rs @@ -10,11 +10,13 @@ use bincode::{Decode, Encode}; use log::{info, warn}; use rand::random; use serde::Serialize; +use sha2::Sha512; use std::{ any::Any, collections::{BTreeMap, HashMap}, fs::rename, future::Future, + hash::{Hash, Hasher}, io::Seek, path::PathBuf, sync::{ @@ -36,19 +38,28 @@ impl CachePath { } } -pub fn cache_location(seed: &[&str]) -> (usize, CachePath) { +pub fn cache_location(kind: &str, key: impl Hash) -> (usize, CachePath) { use sha2::Digest; - let mut d = sha2::Sha512::new(); - for s in seed { - d.update(s.as_bytes()); - d.update(b"\0"); + struct ShaHasher(Sha512); + impl Hasher for ShaHasher { + fn finish(&self) -> u64 { + unreachable!() + } + fn write(&mut self, bytes: &[u8]) { + self.0.update(bytes); + } } - let d = d.finalize(); + let mut d = ShaHasher(sha2::Sha512::new()); + d.0.update(kind); + d.0.update(b"\0"); + key.hash(&mut d); + + let d = d.0.finalize(); let n = d[0] as usize | ((d[1] as usize) << 8) | ((d[2] as usize) << 16) | ((d[3] as usize) << 24); let fname = base64::engine::general_purpose::URL_SAFE.encode(d); - let fname = &fname[..22]; - let fname = format!("{}-{}", seed[0], fname); // about 128 bits + let fname = &fname[..30]; // 180 bits + let fname = format!("{}-{}", kind, fname); (n, CachePath(fname.into())) } @@ -57,14 +68,15 @@ pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_ LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); pub async fn async_cache_file<Fun, Fut>( - seed: &[&str], + kind: &str, + key: impl Hash, generate: Fun, ) -> Result<CachePath, anyhow::Error> where Fun: FnOnce(tokio::fs::File) -> Fut, Fut: Future<Output = Result<(), anyhow::Error>>, { - let (bucket, location) = cache_location(seed); + let (bucket, location) = cache_location(kind, key); // we need a lock even if it exists since somebody might be still in the process of writing. let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT] .lock() @@ -95,11 +107,15 @@ where thread_local! { pub static WITHIN_CACHE_FILE: AtomicBool = const { AtomicBool::new(false) }; } -pub fn cache_file<Fun>(seed: &[&str], mut generate: Fun) -> Result<CachePath, anyhow::Error> +pub fn cache_file<Fun>( + kind: &str, + key: impl Hash, + mut generate: Fun, +) -> Result<CachePath, anyhow::Error> where Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, { - let (bucket, location) = cache_location(seed); + let (bucket, location) = cache_location(kind, key); // we need a lock even if it exists since somebody might be still in the process of writing. let already_within = WITHIN_CACHE_FILE.with(|a| a.swap(true, Ordering::Relaxed)); let _guard = if already_within { @@ -139,12 +155,16 @@ pub static CACHE_IN_MEMORY_OBJECTS: LazyLock<RwLock<HashMap<PathBuf, InMemoryCac LazyLock::new(|| RwLock::new(HashMap::new())); pub static CACHE_IN_MEMORY_SIZE: AtomicUsize = AtomicUsize::new(0); -pub fn cache_memory<Fun, T>(seed: &[&str], mut generate: Fun) -> Result<Arc<T>, anyhow::Error> +pub fn cache_memory<Fun, T>( + kind: &str, + key: impl Hash, + mut generate: Fun, +) -> Result<Arc<T>, anyhow::Error> where Fun: FnMut() -> Result<T, anyhow::Error>, T: Encode + Decode + Send + Sync + 'static, { - let (_, location) = cache_location(seed); + let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { @@ -158,7 +178,7 @@ where } } - let location = cache_file(seed, move |mut file| { + let location = cache_file(kind, &key, move |mut file| { let object = generate()?; bincode::encode_into_std_write(&object, &mut file, bincode::config::standard()) .context("encoding cache object")?; @@ -189,7 +209,8 @@ where } pub async fn async_cache_memory<Fun, Fut, T>( - seed: &[&str], + kind: &str, + key: impl Hash, generate: Fun, ) -> Result<Arc<T>, anyhow::Error> where @@ -197,7 +218,7 @@ where Fut: Future<Output = Result<T, anyhow::Error>>, T: Encode + Decode + Send + Sync + 'static, { - let (_, location) = cache_location(seed); + let (_, location) = cache_location(kind, &key); { let mut g = CACHE_IN_MEMORY_OBJECTS.write().unwrap(); if let Some(entry) = g.get_mut(&location.abs()) { @@ -211,7 +232,7 @@ where } } - let location = async_cache_file(seed, move |mut file| async move { + let location = async_cache_file(kind, &key, move |mut file| async move { let object = generate().await?; let data = bincode::encode_to_vec(&object, bincode::config::standard()) .context("encoding cache object")?; diff --git a/common/src/lib.rs b/common/src/lib.rs index 003a798..f413d97 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,7 +21,7 @@ use std::{ path::PathBuf, }; -#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodeID(pub [u8; 32]); pub enum NodeIDOrSlug { @@ -238,7 +238,7 @@ pub enum SourceTrackKind { Subtitles, } -#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Encode, Decode)] #[serde(rename_all = "snake_case")] pub enum TraktKind { Movie, @@ -249,7 +249,7 @@ pub enum TraktKind { User, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize)] pub enum TmdbKind { Tv, Movie, diff --git a/import/src/acoustid.rs b/import/src/acoustid.rs index fe47290..7406976 100644 --- a/import/src/acoustid.rs +++ b/import/src/acoustid.rs @@ -17,7 +17,7 @@ pub(crate) struct Fingerprint { } pub(crate) async fn acoustid_fingerprint(path: &Path) -> Result<Arc<Fingerprint>> { - async_cache_memory(&["fpcalc", &path.to_string_lossy()], || async move { + async_cache_memory("fpcalc", path, || async move { let child = Command::new("fpcalc") .arg("-json") .arg(path) diff --git a/import/src/tmdb.rs b/import/src/tmdb.rs index 1e7849f..3b3e7ed 100644 --- a/import/src/tmdb.rs +++ b/import/src/tmdb.rs @@ -44,48 +44,42 @@ impl Tmdb { } } pub async fn search(&self, kind: TmdbKind, query: &str) -> anyhow::Result<Arc<TmdbQuery>> { - async_cache_memory( - &["api-tmdb-search", query, &format!("{kind}")], - || async move { - info!("searching tmdb: {query:?}"); - Ok(self - .client - .get(format!( - "https://api.themoviedb.org/3/search/{kind}?query={}?api_key={}", - query.replace(" ", "+"), - self.key - )) - .send() - .await? - .error_for_status()? - .json::<TmdbQuery>() - .await?) - }, - ) + async_cache_memory("api-tmdb-search", (kind, query), || async move { + info!("searching tmdb: {query:?}"); + Ok(self + .client + .get(format!( + "https://api.themoviedb.org/3/search/{kind}?query={}?api_key={}", + query.replace(" ", "+"), + self.key + )) + .send() + .await? + .error_for_status()? + .json::<TmdbQuery>() + .await?) + }) .await } pub async fn details(&self, kind: TmdbKind, id: u64) -> anyhow::Result<Arc<TmdbDetails>> { - async_cache_memory( - &["api-tmdb-details", &format!("{kind} {id}")], - || async move { - info!("fetching details: {id:?}"); - Ok(self - .client - .get(format!( - "https://api.themoviedb.org/3/{kind}/{id}?api_key={}", - self.key, - )) - .send() - .await? - .error_for_status()? - .json() - .await?) - }, - ) + async_cache_memory("api-tmdb-details", (kind, id), || async move { + info!("fetching details: {id:?}"); + Ok(self + .client + .get(format!( + "https://api.themoviedb.org/3/{kind}/{id}?api_key={}", + self.key, + )) + .send() + .await? + .error_for_status()? + .json() + .await?) + }) .await } pub async fn person_image(&self, id: u64) -> anyhow::Result<Arc<TmdbPersonImage>> { - async_cache_memory(&["api-tmdb-search", &format!("{id}")], || async move { + async_cache_memory("api-tmdb-search", id, || async move { Ok(self .client .get(format!( @@ -101,7 +95,7 @@ impl Tmdb { .await } pub async fn image(&self, path: &str) -> anyhow::Result<CachePath> { - async_cache_file(&["api-tmdb-image", path], |mut file| async move { + async_cache_file("api-tmdb-image", path, |mut file| async move { info!("downloading image {path:?}"); let mut res = self .image_client @@ -123,7 +117,7 @@ impl Tmdb { season: usize, episode: usize, ) -> anyhow::Result<Arc<TmdbEpisode>> { - async_cache_memory(&["api-tmdb-episode-details", &format!("{series_id} {season} {episode}")], || async move { + async_cache_memory("api-tmdb-episode-details", (series_id,season,episode), || async move { info!("tmdb episode details {series_id} S={season} E={episode}"); Ok(self .image_client diff --git a/import/src/trakt.rs b/import/src/trakt.rs index 52a5cb0..2f8618d 100644 --- a/import/src/trakt.rs +++ b/import/src/trakt.rs @@ -48,7 +48,7 @@ impl Trakt { kinds: &[TraktKind], query: &str, ) -> anyhow::Result<Arc<Vec<TraktSearchResult>>> { - async_cache_memory(&["api-trakt-lookup", query], || async move { + async_cache_memory("api-trakt-lookup", (kinds, query), || async move { let url = format!( "https://api.trakt.tv/search/{}?query={}&extended=full", kinds @@ -66,38 +66,32 @@ impl Trakt { } pub async fn lookup(&self, kind: TraktKind, id: u64) -> anyhow::Result<Arc<TraktMediaObject>> { - async_cache_memory( - &["api-trakt-lookup", &format!("{kind} {id}")], - || async move { - info!("trakt lookup {kind:?}:{id:?}"); - let url = format!("https://api.trakt.tv/{}/{id}?extended=full", kind.plural()); - let res = self.client.get(url).send().await?.error_for_status()?; - Ok(res.json().await?) - }, - ) + async_cache_memory("api-trakt-lookup", (kind, id), || async move { + info!("trakt lookup {kind:?}:{id:?}"); + let url = format!("https://api.trakt.tv/{}/{id}?extended=full", kind.plural()); + let res = self.client.get(url).send().await?.error_for_status()?; + Ok(res.json().await?) + }) .await .context("trakt lookup") } pub async fn people(&self, kind: TraktKind, id: u64) -> anyhow::Result<Arc<TraktPeople>> { - async_cache_memory( - &["api-trakt-people", &format!("{kind} {id}")], - || async move { - info!("trakt people {kind:?}:{id:?}"); - let url = format!( - "https://api.trakt.tv/{}/{id}/people?extended=full", - kind.plural() - ); - let res = self.client.get(url).send().await?.error_for_status()?; - Ok(res.json().await?) - }, - ) + async_cache_memory("api-trakt-people", (kind, id), || async move { + info!("trakt people {kind:?}:{id:?}"); + let url = format!( + "https://api.trakt.tv/{}/{id}/people?extended=full", + kind.plural() + ); + let res = self.client.get(url).send().await?.error_for_status()?; + Ok(res.json().await?) + }) .await .context("trakt people") } pub async fn show_seasons(&self, id: u64) -> anyhow::Result<Arc<Vec<TraktSeason>>> { - async_cache_memory(&["api-trakt-seasons", &id.to_string()], || async move { + async_cache_memory("api-trakt-seasons", id, || async move { info!("trakt seasons {id:?}"); let url = format!("https://api.trakt.tv/shows/{id}/seasons?extended=full"); let res = self.client.get(url).send().await?.error_for_status()?; @@ -112,15 +106,12 @@ impl Trakt { id: u64, season: usize, ) -> anyhow::Result<Arc<Vec<TraktEpisode>>> { - async_cache_memory( - &["api-trakt-episodes", &id.to_string(), &season.to_string()], - || async move { - info!("trakt episodes {id:?} season={season}"); - let url = format!("https://api.trakt.tv/shows/{id}/seasons/{season}?extended=full"); - let res = self.client.get(url).send().await?.error_for_status()?; - Ok(res.json().await?) - }, - ) + async_cache_memory("api-trakt-episodes", (id, season), || async move { + info!("trakt episodes {id:?} season={season}"); + let url = format!("https://api.trakt.tv/shows/{id}/seasons/{season}?extended=full"); + let res = self.client.get(url).send().await?.error_for_status()?; + Ok(res.json().await?) + }) .await .context("trakt show season episodes") } diff --git a/remuxer/src/metadata.rs b/remuxer/src/metadata.rs index c8a5f8f..4a496fe 100644 --- a/remuxer/src/metadata.rs +++ b/remuxer/src/metadata.rs @@ -33,20 +33,17 @@ pub struct MatroskaMetadata { pub infojson: Option<Vec<u8>>, } pub fn checked_matroska_metadata(path: &Path) -> Result<Arc<Option<MatroskaMetadata>>> { - cache_memory( - &["mkmeta-check-v1", path.to_string_lossy().as_ref()], - || { - let mut magic = [0; 4]; - File::open(path)?.read_exact(&mut magic).ok(); - if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { - return Ok(None); - } - Ok(Some((*matroska_metadata(path)?).clone())) - }, - ) + cache_memory("mkmeta-check-v1", path, || { + let mut magic = [0; 4]; + File::open(path)?.read_exact(&mut magic).ok(); + if !matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) { + return Ok(None); + } + Ok(Some((*matroska_metadata(path)?).clone())) + }) } pub fn matroska_metadata(path: &Path) -> Result<Arc<MatroskaMetadata>> { - cache_memory(&["mkmeta-v3", path.to_string_lossy().as_ref()], || { + cache_memory("mkmeta-v3", path, || { info!("reading {path:?}"); let mut file = BufReader::new(File::open(path)?); let mut file = file.by_ref().take(u64::MAX); @@ -86,7 +83,8 @@ pub fn matroska_metadata(path: &Path) -> Result<Arc<MatroskaMetadata>> { | "cover.avif" => { cover = Some( AssetInner::Cache(cache_file( - &["att-cover", path.to_string_lossy().as_ref()], + "att-cover", + path, move |mut file| { file.write_all(&f.data)?; Ok(()) diff --git a/remuxer/src/seek_index.rs b/remuxer/src/seek_index.rs index 82f62fb..1e1ce02 100644 --- a/remuxer/src/seek_index.rs +++ b/remuxer/src/seek_index.rs @@ -15,11 +15,8 @@ use jellymatroska::{ use log::{debug, info, trace, warn}; use std::{collections::BTreeMap, fs::File, io::BufReader, path::Path, sync::Arc}; -pub const SEEK_INDEX_VERSION: u32 = 0x5eef1de4; - #[derive(Debug, Clone, Decode, Encode)] pub struct SeekIndex { - pub version: u32, pub blocks: Vec<BlockIndex>, pub keyframes: Vec<usize>, } @@ -35,7 +32,6 @@ pub struct BlockIndex { impl Default for SeekIndex { fn default() -> Self { Self { - version: SEEK_INDEX_VERSION, blocks: Vec::new(), keyframes: Vec::new(), } @@ -43,7 +39,7 @@ impl Default for SeekIndex { } pub fn get_seek_index(path: &Path) -> anyhow::Result<Arc<BTreeMap<u64, Arc<SeekIndex>>>> { - cache_memory(&["seekindex", path.to_str().unwrap()], move || { + cache_memory("seekindex-v1", path, move || { info!("generating seek index for {path:?}"); let input = File::open(path).context("opening source file")?; let mut input = EbmlReader::new(BufReader::new(input)); diff --git a/server/src/routes/ui/admin/mod.rs b/server/src/routes/ui/admin/mod.rs index 15cc5c0..f44b36c 100644 --- a/server/src/routes/ui/admin/mod.rs +++ b/server/src/routes/ui/admin/mod.rs @@ -225,7 +225,7 @@ pub async fn r_admin_transcode_posters( continue; } let source = resolve_asset(asset).await.context("resolving asset")?; - jellytranscoder::image::transcode(source, AVIF_QUALITY, AVIF_SPEED, 1024) + jellytranscoder::image::transcode(&source, AVIF_QUALITY, AVIF_SPEED, 1024) .await .context("transcoding asset")?; } diff --git a/server/src/routes/ui/assets.rs b/server/src/routes/ui/assets.rs index cfaa68e..c661771 100644 --- a/server/src/routes/ui/assets.rs +++ b/server/src/routes/ui/assets.rs @@ -31,7 +31,7 @@ pub async fn r_asset( let session = fed.get_session(&host).await?; let asset = base64::engine::general_purpose::URL_SAFE.encode(asset); - async_cache_file(&["fed-asset", &asset], |out| async { + async_cache_file("fed-asset", &asset, |out| async { session.asset(out, &asset, width).await }) .await? @@ -40,7 +40,7 @@ pub async fn r_asset( // fit the resolution into a finite set so the maximum cache is finite too. let width = 2usize.pow(width.clamp(128, 2048).ilog2()); - jellytranscoder::image::transcode(source, AVIF_QUALITY, AVIF_SPEED, width) + jellytranscoder::image::transcode(&source, AVIF_QUALITY, AVIF_SPEED, width) .await .context("transcoding asset")? }; @@ -186,7 +186,7 @@ pub async fn r_node_thumbnail( ) .await?; - async_cache_file(&["fed-thumb", &format!("{id} {t}")], |out| { + async_cache_file("fed-thumb", (id, t as i64), |out| { session.node_thumbnail(out, id.into(), 2048, t) }) .await? diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index dfe101e..38987b9 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -53,9 +53,9 @@ pub async fn fragment_stream( }); } else { let location = transcode( - &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source track.kind, format, + &format!("{path:?} {track_num} {index}"), move |b| { tokio::task::spawn_blocking(move || { if let Err(err) = jellyremuxer::write_fragment_into( diff --git a/transcoder/src/fragment.rs b/transcoder/src/fragment.rs index 88a311e..3e07ad7 100644 --- a/transcoder/src/fragment.rs +++ b/transcoder/src/fragment.rs @@ -3,7 +3,6 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ - use crate::LOCAL_VIDEO_TRANSCODING_TASKS; use jellybase::{ cache::{async_cache_file, CachePath}, @@ -21,78 +20,75 @@ use tokio::{ // TODO with an implementation that cant handle it (SVT-AV1 is such an impl). pub async fn transcode( - key: &str, kind: TrackKind, format: &StreamFormatInfo, + input_key: &str, input: impl FnOnce(ChildStdin), ) -> anyhow::Result<CachePath> { - async_cache_file( - &["frag-tc", key, &format!("{format:?}")], - move |mut output| async move { - let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; - debug!("transcoding fragment with {format:?}"); + let template = match format.codec.as_str() { + "V_MPEG4/ISO/AVC" => CONF.encoders.avc.as_ref(), + "V_MPEGH/ISO/HEVC" => CONF.encoders.hevc.as_ref(), + "V_VP8" => CONF.encoders.vp8.as_ref(), + "V_VP9" => CONF.encoders.vp9.as_ref(), + "V_AV1" => CONF.encoders.av1.as_ref(), + _ => None, + } + .or(CONF.encoders.generic.as_ref()) + .cloned() + .unwrap_or("ffmpeg %i %f %e %o".to_owned()); - let template = match format.codec.as_str() { - "V_MPEG4/ISO/AVC" => CONF.encoders.avc.as_ref(), - "V_MPEGH/ISO/HEVC" => CONF.encoders.hevc.as_ref(), - "V_VP8" => CONF.encoders.vp8.as_ref(), - "V_VP9" => CONF.encoders.vp9.as_ref(), - "V_AV1" => CONF.encoders.av1.as_ref(), - _ => None, - } - .or(CONF.encoders.generic.as_ref()) - .cloned() - .unwrap_or("ffmpeg %i %f %e %o".to_owned()); + let filter = match kind { + TrackKind::Video => format!("-vf scale={}:-1", format.width.unwrap()), + TrackKind::Audio => format!(""), + TrackKind::Subtitle => String::new(), + }; + let typechar = match kind { + TrackKind::Video => "v", + TrackKind::Audio => "a", + TrackKind::Subtitle => "s", + }; + let fallback_encoder = match format.codec.as_str() { + "A_OPUS" => "libopus", + "V_VP8" => "libvpx", + "V_VP9" => "libvpx-vp9", + "V_AV1" => "libaom", // svtav1 is x86 only :( + "V_MPEG4/ISO/AVC" => "libx264", + "V_MPEGH/ISO/HEVC" => "libx265", + _ => "", + }; - let filter = match kind { - TrackKind::Video => format!("-vf scale={}:-1", format.width.unwrap()), - TrackKind::Audio => format!(""), - TrackKind::Subtitle => String::new(), - }; - let typechar = match kind { - TrackKind::Video => "v", - TrackKind::Audio => "a", - TrackKind::Subtitle => "s", - }; - let fallback_encoder = match format.codec.as_str() { - "A_OPUS" => "libopus", - "V_VP8" => "libvpx", - "V_VP9" => "libvpx-vp9", - "V_AV1" => "libaom", // svtav1 is x86 only :( - "V_MPEG4/ISO/AVC" => "libx264", - "V_MPEGH/ISO/HEVC" => "libx265", - _ => "", - }; + let args = template + .replace("%i", "-f matroska -i pipe:0 -copyts") + .replace("%o", "-f matroska pipe:1") + .replace("%f", &filter) + .replace("%e", "-c:%t %c -b:%t %r") + .replace("%t", typechar) + .replace("%c", fallback_encoder) + .replace("%r", &(format.bitrate as i64).to_string()) + .replace(" ", " "); - let args = template - .replace("%i", "-f matroska -i pipe:0 -copyts") - .replace("%o", "-f matroska pipe:1") - .replace("%f", &filter) - .replace("%e", "-c:%t %c -b:%t %r") - .replace("%t", typechar) - .replace("%c", fallback_encoder) - .replace("%r", &(format.bitrate as i64).to_string()) - .replace(" ", " "); + async_cache_file("frag-tc", (input_key, &args), async |mut output| { + let _permit = LOCAL_VIDEO_TRANSCODING_TASKS.acquire().await?; + debug!("transcoding fragment with {format:?}"); - info!("encoding with {:?}", args); + info!("encoding with {:?}", args); - let mut args = args.split(" "); - let mut proc = Command::new(args.next().unwrap()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .args(args) - .spawn()?; + let mut args = args.split(" "); + let mut proc = Command::new(args.next().unwrap()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .args(args) + .spawn()?; - let stdin = proc.stdin.take().unwrap(); - let mut stdout = proc.stdout.take().unwrap(); + let stdin = proc.stdin.take().unwrap(); + let mut stdout = proc.stdout.take().unwrap(); - input(stdin); - copy(&mut stdout, &mut output).await?; + input(stdin); + copy(&mut stdout, &mut output).await?; - proc.wait().await.unwrap().exit_ok()?; - info!("done"); - Ok(()) - }, - ) + proc.wait().await.unwrap().exit_ok()?; + info!("done"); + Ok(()) + }) .await } diff --git a/transcoder/src/image.rs b/transcoder/src/image.rs index 28b253a..c6e1367 100644 --- a/transcoder/src/image.rs +++ b/transcoder/src/image.rs @@ -12,25 +12,23 @@ use rgb::FromSlice; use std::{ fs::File, io::{BufReader, Read, Seek, SeekFrom}, - path::PathBuf, + path::Path, }; use tokio::io::AsyncWriteExt; pub async fn transcode( - path: PathBuf, + path: &Path, quality: f32, speed: u8, width: usize, ) -> anyhow::Result<CachePath> { async_cache_file( - &[ - "image-tc", - path.clone().as_os_str().to_str().unwrap(), - &format!("{width} {quality} {speed}"), - ], - move |mut output| async move { + "image-tc", + (path, width, quality as i32, speed), + |mut output| async move { let _permit = LOCAL_IMAGE_TRANSCODING_TASKS.acquire().await?; info!("encoding {path:?} (speed={speed}, quality={quality}, width={width})"); + let path = path.to_owned(); let encoded = tokio::task::spawn_blocking(move || { let mut file = BufReader::new(File::open(&path).context("opening source")?); diff --git a/transcoder/src/thumbnail.rs b/transcoder/src/thumbnail.rs index c8bfb1c..caef397 100644 --- a/transcoder/src/thumbnail.rs +++ b/transcoder/src/thumbnail.rs @@ -5,30 +5,27 @@ use std::{path::Path, process::Stdio}; use tokio::{io::copy, process::Command}; pub async fn create_thumbnail(path: &Path, time: f64) -> anyhow::Result<CachePath> { - async_cache_file( - &["thumb", path.to_str().unwrap(), &format!("{time}")], - move |mut output| async move { - let _permit = LOCAL_IMAGE_TRANSCODING_TASKS.acquire().await?; - info!("creating thumbnail of {path:?} at {time}s",); + async_cache_file("thumb", (path, time as i64), move |mut output| async move { + let _permit = LOCAL_IMAGE_TRANSCODING_TASKS.acquire().await?; + info!("creating thumbnail of {path:?} at {time}s",); - let mut proc = Command::new("ffmpeg") - .stdout(Stdio::piped()) - .args(["-ss", &format!("{time}")]) - .args(["-f", "matroska", "-i", path.to_str().unwrap()]) - .args(["-frames:v", "1"]) - .args(["-c:v", "qoi"]) - .args(["-f", "image2"]) - .args(["-update", "1"]) - .arg("pipe:1") - .spawn()?; + let mut proc = Command::new("ffmpeg") + .stdout(Stdio::piped()) + .args(["-ss", &format!("{time}")]) + .args(["-f", "matroska", "-i", path.to_str().unwrap()]) + .args(["-frames:v", "1"]) + .args(["-c:v", "qoi"]) + .args(["-f", "image2"]) + .args(["-update", "1"]) + .arg("pipe:1") + .spawn()?; - let mut stdout = proc.stdout.take().unwrap(); - copy(&mut stdout, &mut output).await?; + let mut stdout = proc.stdout.take().unwrap(); + copy(&mut stdout, &mut output).await?; - proc.wait().await.unwrap().exit_ok()?; - info!("done"); - Ok(()) - }, - ) + proc.wait().await.unwrap().exit_ok()?; + info!("done"); + Ok(()) + }) .await } |