diff options
author | metamuffin <metamuffin@disroot.org> | 2023-09-30 22:19:19 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-09-30 22:19:19 +0200 |
commit | d546caa3f5053ade763430490911fefd6257af9f (patch) | |
tree | 5834ea5aa352239ab9a3f57ee96dee20af51ca77 | |
parent | c8fe73a7b160d4ada3136de9c87ad2eb0091ff7b (diff) | |
download | jellything-d546caa3f5053ade763430490911fefd6257af9f.tar jellything-d546caa3f5053ade763430490911fefd6257af9f.tar.bz2 jellything-d546caa3f5053ade763430490911fefd6257af9f.tar.zst |
make cache async and fix parallel write bug
-rw-r--r-- | Cargo.lock | 216 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | base/Cargo.toml | 2 | ||||
-rw-r--r-- | base/src/lib.rs | 48 | ||||
-rw-r--r-- | client/Cargo.toml | 1 | ||||
-rw-r--r-- | client/src/lib.rs | 5 | ||||
-rw-r--r-- | remuxer/src/snippet.rs | 4 | ||||
-rw-r--r-- | server/Cargo.toml | 3 | ||||
-rw-r--r-- | server/src/import.rs | 39 | ||||
-rw-r--r-- | server/src/routes/ui/assets.rs | 7 | ||||
-rw-r--r-- | server/src/routes/ui/layout.rs | 2 | ||||
-rw-r--r-- | transcoder/Cargo.toml | 1 | ||||
-rw-r--r-- | transcoder/src/bin/reproduce_decode_error.rs | 4 | ||||
-rw-r--r-- | transcoder/src/image.rs | 89 | ||||
-rw-r--r-- | transcoder/src/lib.rs | 1 |
15 files changed, 140 insertions, 283 deletions
@@ -195,75 +195,6 @@ dependencies = [ ] [[package]] -name = "async-channel" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" -dependencies = [ - "async-lock", - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix", - "slab", - "socket2 0.4.9", - "waker-fn", -] - -[[package]] -name = "async-lock" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" -dependencies = [ - "event-listener", -] - -[[package]] name = "async-recursion" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -275,32 +206,6 @@ dependencies = [ ] [[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] name = "async-stream" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -323,12 +228,6 @@ dependencies = [ ] [[package]] -name = "async-task" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" - -[[package]] name = "async-trait" version = "0.1.68" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -346,12 +245,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" [[package]] -name = "atomic-waker" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" - -[[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -495,21 +388,6 @@ dependencies = [ ] [[package]] -name = "blocking" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" -dependencies = [ - "async-channel", - "async-lock", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "log", -] - -[[package]] name = "built" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -658,15 +536,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] -name = "concurrent-queue" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" -dependencies = [ - "crossbeam-utils", -] - -[[package]] name = "cookie" version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -914,12 +783,6 @@ dependencies = [ ] [[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - -[[package]] name = "exr" version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1088,21 +951,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - -[[package]] name = "futures-macro" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1221,18 +1069,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] -name = "gloo-timers" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - -[[package]] name = "h2" version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1551,11 +1387,13 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" name = "jellybase" version = "0.1.0" dependencies = [ + "anyhow", "base64", "jellycommon", "log", "serde_json", "sha2", + "tokio", ] [[package]] @@ -1567,6 +1405,7 @@ dependencies = [ "log", "reqwest", "serde_json", + "tokio", ] [[package]] @@ -1623,7 +1462,6 @@ dependencies = [ "anyhow", "argon2", "async-recursion", - "async-std", "base64", "bincode 2.0.0-rc.3", "chrono", @@ -1680,6 +1518,7 @@ dependencies = [ "ravif", "rayon", "rgb", + "tokio", ] [[package]] @@ -1710,15 +1549,6 @@ dependencies = [ ] [[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - -[[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1767,9 +1597,6 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -dependencies = [ - "value-bag", -] [[package]] name = "loom" @@ -2131,12 +1958,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] -name = "parking" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" - -[[package]] name = "parking_lot" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2282,22 +2103,6 @@ dependencies = [ ] [[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - -[[package]] name = "polyval" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3124,6 +2929,7 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2 0.5.4", @@ -3406,12 +3212,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] -name = "value-bag" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" - -[[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3457,12 +3257,6 @@ dependencies = [ ] [[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - -[[package]] name = "want" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -16,6 +16,7 @@ resolver = "2" rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "2cf38a5aa37fe046e46b03740835787f0396307b" } log = "0.4.20" anyhow = "1.0.75" +tokio = { version = "1.32.0", features = ["full"] } [profile.dev.package.rav1e] opt-level = 3 diff --git a/base/Cargo.toml b/base/Cargo.toml index 86675b9..f49db80 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -9,3 +9,5 @@ serde_json = "1.0.107" log = { workspace = true } sha2 = "0.10.8" base64 = "0.21.4" +tokio = {workspace=true} +anyhow = "1.0.75" diff --git a/base/src/lib.rs b/base/src/lib.rs index 46ab576..b97f924 100644 --- a/base/src/lib.rs +++ b/base/src/lib.rs @@ -6,7 +6,8 @@ #![feature(lazy_cell)] use base64::Engine; use jellycommon::{config::GlobalConfig, AssetLocation}; -use std::{fs::File, path::PathBuf, sync::LazyLock}; +use std::{fs::File, future::Future, path::PathBuf, sync::LazyLock}; +use tokio::sync::Mutex; pub static CONF: LazyLock<GlobalConfig> = LazyLock::new(|| { serde_json::from_reader( @@ -20,7 +21,7 @@ pub static CONF: LazyLock<GlobalConfig> = LazyLock::new(|| { .unwrap() }); -pub fn cache_file(seed: &[&str]) -> AssetLocation { +pub fn cache_location(seed: &[&str]) -> (usize, AssetLocation) { use sha2::Digest; let mut d = sha2::Sha512::new(); for s in seed { @@ -28,9 +29,50 @@ pub fn cache_file(seed: &[&str]) -> AssetLocation { d.update(b"\0"); } let d = d.finalize(); + let n = d[0] as usize | (d[1] as usize) << 8 | (d[2] as usize) << 16 | (d[3] as usize) << 24; let fname = base64::engine::general_purpose::URL_SAFE.encode(d); let fname = &fname[..22]; // about 128 bits - AssetLocation::Cache(fname.into()) + (n, AssetLocation::Cache(fname.into())) +} + +const CACHE_GENERATION_BUCKET_COUNT: usize = 1024; +pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> = + LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(()))); + +pub async fn async_cache_file<Fun, Fut>( + seed: &[&str], + generate: Fun, +) -> Result<AssetLocation, anyhow::Error> +where + Fun: FnOnce(tokio::fs::File) -> Fut, + Fut: Future<Output = Result<(), anyhow::Error>>, +{ + let (bucket, location) = cache_location(seed); + // we need a lock even if it exists since somebody might be still in the process of writing. + let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].lock(); + let exists = tokio::fs::try_exists(location.path()).await?; + if !exists { + let f = tokio::fs::File::create(location.path()).await?; + generate(f).await?; + } + drop(_guard); + Ok(location) +} + +pub fn cache_file<Fun>(seed: &[&str], mut generate: Fun) -> Result<AssetLocation, anyhow::Error> +where + Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>, +{ + let (bucket, location) = cache_location(seed); + // we need a lock even if it exists since somebody might be still in the process of writing. + let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].lock(); + let exists = location.path().exists(); + if !exists { + let f = std::fs::File::create(location.path())?; + generate(f)?; + } + drop(_guard); + Ok(location) } pub trait AssetLocationExt { diff --git a/client/Cargo.toml b/client/Cargo.toml index ea89057..05dd2d4 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -9,3 +9,4 @@ log = { workspace = true } reqwest = { version = "0.11.20", features = ["json"] } anyhow = "1.0.75" serde_json = "1.0.107" +tokio = { workspace = true } diff --git a/client/src/lib.rs b/client/src/lib.rs index 200c869..92545a9 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -12,6 +12,7 @@ use reqwest::{ use serde_json::json; use std::time::Duration; use stream::StreamSpec; +use tokio::io::AsyncWriteExt; pub use jellycommon::*; @@ -93,7 +94,7 @@ impl Session { &self, id: &str, role: &str, - mut writer: impl std::io::Write, + mut writer: impl tokio::io::AsyncWrite + std::marker::Unpin, ) -> Result<()> { debug!("downloading asset {role:?} for {id:?}"); let mut r = self @@ -102,7 +103,7 @@ impl Session { .send() .await?; while let Some(chunk) = r.chunk().await? { - writer.write_all(&chunk)?; + writer.write_all(&chunk).await?; } Ok(()) } diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs index f23e19e..cd965ba 100644 --- a/remuxer/src/snippet.rs +++ b/remuxer/src/snippet.rs @@ -68,7 +68,7 @@ pub fn write_snippet_into( let private = &track_sources[track]; let source_path = path_base.join(&private.path); let mapped = 1; - info!("\t- {n} {source_path:?} ({} => {mapped})", private.track); + info!("\t- {track} {source_path:?} ({} => {mapped})", private.track); info!("\t {}", info); let file = File::open(&source_path).context("opening source file")?; let mut index = File::open(source_path.with_extension(format!("si.{}", private.track))) @@ -110,7 +110,7 @@ pub fn write_snippet_into( let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64); - let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)]; + let mut blocks = vec![MatroskaTag::Timestamp(0)]; for i in start_block_index..end_block_index { let index_block = &index.blocks[i]; let mut block = reader.next()?; diff --git a/server/Cargo.toml b/server/Cargo.toml index 291f56b..fbb9f63 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -25,10 +25,9 @@ vte = "0.12.0" argon2 = "0.5.2" aes-gcm-siv = "0.11.1" -async-std = "1.12.0" async-recursion = "1.0.5" futures = "0.3.28" -tokio = { version = "1.32.0", features = ["io-util"] } +tokio = { workspace = true } tokio-util = { version = "0.7.9", features = ["io", "io-util"] } markup = "0.13.1" diff --git a/server/src/import.rs b/server/src/import.rs index 7e62b47..8d8198a 100644 --- a/server/src/import.rs +++ b/server/src/import.rs @@ -7,11 +7,17 @@ use crate::{database::Database, federation::Federation, CONF}; use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; -use jellybase::{cache_file, AssetLocationExt}; +use jellybase::async_cache_file; use jellyclient::Session; use jellycommon::{AssetLocation, MediaSource, Node, NodePrivate, RemoteImportOptions}; use log::{debug, error, info}; -use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock}; +use std::{ + ffi::OsStr, + fs::File, + os::unix::prelude::OsStrExt, + path::PathBuf, + sync::{Arc, LazyLock}, +}; use tokio::sync::Semaphore; static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); @@ -133,7 +139,7 @@ static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new( async fn import_remote( mut opts: RemoteImportOptions, db: &Database, - session: &Session, + session: &Arc<Session>, identifier: String, parent: Option<String>, ) -> anyhow::Result<Vec<String>> { @@ -149,11 +155,11 @@ async fn import_remote( .context("fetching remote node")?; if node.federated.is_some() { - return Ok(vec![]) // node is federated, lets not import it + return Ok(vec![]); // node is federated, lets not import it } - let poster = cache_federation_asset(session, &opts.id, "poster").await?; - let backdrop = cache_federation_asset(session, &opts.id, "backdrop").await?; + let poster = cache_federation_asset(session.to_owned(), opts.id.clone(), "poster").await?; + let backdrop = cache_federation_asset(session.to_owned(), opts.id.clone(), "backdrop").await?; drop(_permit); @@ -215,15 +221,16 @@ async fn import_remote( } async fn cache_federation_asset( - session: &Session, - identifier: &String, - role: &str, + session: Arc<Session>, + identifier: String, + role: &'static str, ) -> anyhow::Result<AssetLocation> { - let poster = cache_file(&["federation-asset", role, identifier]); - if !poster.path().exists() { - session - .node_asset(&identifier, role, File::create(&poster.path())?) - .await?; - } - Ok(poster) + async_cache_file( + &["federation-asset", role, &identifier.clone()], + move |out| async move { + let session = session; + session.node_asset(identifier.as_str(), role, out).await + }, + ) + .await } diff --git a/server/src/routes/ui/assets.rs b/server/src/routes/ui/assets.rs index 992e3da..8c0496e 100644 --- a/server/src/routes/ui/assets.rs +++ b/server/src/routes/ui/assets.rs @@ -8,7 +8,7 @@ use crate::{ routes::ui::{account::session::Session, error::MyError, CacheControlFile}, }; use anyhow::anyhow; -use async_std::task::spawn_blocking; +use jellybase::AssetLocationExt; use jellycommon::AssetLocation; use log::info; use rocket::{get, http::ContentType, FromFormField, State, UriDisplayQuery}; @@ -53,11 +53,10 @@ pub async fn r_item_assets( )); // fit the resolution into a finite set so the maximum cache is finite too. let width = 2usize.pow(width.unwrap_or(2048).clamp(128, 8196).ilog2()); - let path = - spawn_blocking(move || jellytranscoder::image::transcode(asset, 50., 5, width)).await?; + let path = jellytranscoder::image::transcode(asset, 50., 5, width).await?; info!("loading asset from {path:?}"); Ok(( ContentType::AVIF, - CacheControlFile::new(File::open(path).await?).await, + CacheControlFile::new(File::open(path.path()).await?).await, )) } diff --git a/server/src/routes/ui/layout.rs b/server/src/routes/ui/layout.rs index fdda3e4..1c47247 100644 --- a/server/src/routes/ui/layout.rs +++ b/server/src/routes/ui/layout.rs @@ -16,7 +16,7 @@ use crate::{ }, uri, }; -use async_std::task::block_on; +use futures::executor::block_on; use jellybase::CONF; use markup::{DynRender, Render}; use rocket::{ diff --git a/transcoder/Cargo.toml b/transcoder/Cargo.toml index e39b07c..ecb2bc8 100644 --- a/transcoder/Cargo.toml +++ b/transcoder/Cargo.toml @@ -18,3 +18,4 @@ rav1e = { version = "0.6.6", default-features = false, features = [ rayon = "1.8.0" imgref = "1.9.4" ravif = "0.11.3" +tokio = { workspace = true } diff --git a/transcoder/src/bin/reproduce_decode_error.rs b/transcoder/src/bin/reproduce_decode_error.rs index 84fa6fd..3e0ca3a 100644 --- a/transcoder/src/bin/reproduce_decode_error.rs +++ b/transcoder/src/bin/reproduce_decode_error.rs @@ -1,11 +1,13 @@ use jellytranscoder::image::transcode; -fn main() { +#[tokio::main] +async fn main() { transcode( jellycommon::AssetLocation::Cache(std::env::args().nth(2).unwrap().into()), 1.0, 1, 1, ) + .await .unwrap(); } diff --git a/transcoder/src/image.rs b/transcoder/src/image.rs index 6da1be7..3865348 100644 --- a/transcoder/src/image.rs +++ b/transcoder/src/image.rs @@ -1,53 +1,60 @@ use anyhow::Context; use image::{imageops::FilterType, ImageFormat}; -use jellybase::{cache_file, AssetLocationExt}; +use jellybase::{async_cache_file, AssetLocationExt}; use jellycommon::AssetLocation; use log::{debug, info}; use rgb::FromSlice; -use std::{ - fs::File, - io::{BufReader, Write}, - path::PathBuf, -}; +use std::{fs::File, io::BufReader}; +use tokio::io::AsyncWriteExt; -pub fn transcode( +pub async fn transcode( asset: AssetLocation, quality: f32, speed: u8, width: usize, -) -> anyhow::Result<PathBuf> { +) -> anyhow::Result<AssetLocation> { let original_path = asset.path(); - let path = cache_file(&[ - original_path.as_os_str().to_str().unwrap(), - &format!("{width} {quality} {speed}"), - ]) - .path(); - if !path.exists() { - info!("encoding {path:?} (speed={speed}, quality={quality}, width={width})"); - // TODO shouldn't be neccessary with guessed format. - let file = BufReader::new(File::open(&original_path).context("opening source")?); - let mut reader = image::io::Reader::new(file); - reader.set_format(ImageFormat::Avif); - let reader = reader.with_guessed_format().context("guessing format")?; - debug!("guessed format (or fallback): {:?}", reader.format()); - let original = reader.decode().context("decoding image")?.to_rgba8(); - let image = image::imageops::resize( - &original, - width as u32, - width as u32 * original.height() / original.width(), - FilterType::Lanczos3, - ); - let pixels = image.to_vec(); - let encoded = ravif::Encoder::new() - .with_speed(speed.clamp(1, 10)) - .with_quality(quality.clamp(1., 100.)) - .encode_rgba(imgref::Img::new( - pixels.as_rgba(), - image.width() as usize, - image.height() as usize, - ))?; - info!("transcode finished"); - File::create(&path)?.write_all(&encoded.avif_file)?; - } - Ok(path) + let asset = asset.clone(); + Ok(async_cache_file( + &[ + original_path.as_os_str().to_str().unwrap(), + &format!("{width} {quality} {speed}"), + ], + move |mut output| async move { + let encoded = tokio::task::spawn_blocking(move || { + let original_path = asset.path(); + info!( + "encoding {original_path:?} (speed={speed}, quality={quality}, width={width})" + ); + // TODO shouldn't be neccessary with guessed format. + let file = BufReader::new(File::open(&original_path).context("opening source")?); + let mut reader = image::io::Reader::new(file); + reader.set_format(ImageFormat::Avif); + let reader = reader.with_guessed_format().context("guessing format")?; + debug!("guessed format (or fallback): {:?}", reader.format()); + let original = reader.decode().context("decoding image")?.to_rgba8(); + let image = image::imageops::resize( + &original, + width as u32, + width as u32 * original.height() / original.width(), + FilterType::Lanczos3, + ); + let pixels = image.to_vec(); + let encoded = ravif::Encoder::new() + .with_speed(speed.clamp(1, 10)) + .with_quality(quality.clamp(1., 100.)) + .encode_rgba(imgref::Img::new( + pixels.as_rgba(), + image.width() as usize, + image.height() as usize, + ))?; + info!("transcode finished"); + Ok::<_, anyhow::Error>(encoded) + }) + .await??; + output.write_all(&encoded.avif_file).await?; + Ok(()) + }, + ) + .await?) } diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs index e74a7f5..010a7dd 100644 --- a/transcoder/src/lib.rs +++ b/transcoder/src/lib.rs @@ -3,4 +3,5 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ +#![feature(async_closure)] pub mod image; |