aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-30 22:19:19 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-30 22:19:19 +0200
commitd546caa3f5053ade763430490911fefd6257af9f (patch)
tree5834ea5aa352239ab9a3f57ee96dee20af51ca77
parentc8fe73a7b160d4ada3136de9c87ad2eb0091ff7b (diff)
downloadjellything-d546caa3f5053ade763430490911fefd6257af9f.tar
jellything-d546caa3f5053ade763430490911fefd6257af9f.tar.bz2
jellything-d546caa3f5053ade763430490911fefd6257af9f.tar.zst
make cache async and fix parallel write bug
-rw-r--r--Cargo.lock216
-rw-r--r--Cargo.toml1
-rw-r--r--base/Cargo.toml2
-rw-r--r--base/src/lib.rs48
-rw-r--r--client/Cargo.toml1
-rw-r--r--client/src/lib.rs5
-rw-r--r--remuxer/src/snippet.rs4
-rw-r--r--server/Cargo.toml3
-rw-r--r--server/src/import.rs39
-rw-r--r--server/src/routes/ui/assets.rs7
-rw-r--r--server/src/routes/ui/layout.rs2
-rw-r--r--transcoder/Cargo.toml1
-rw-r--r--transcoder/src/bin/reproduce_decode_error.rs4
-rw-r--r--transcoder/src/image.rs89
-rw-r--r--transcoder/src/lib.rs1
15 files changed, 140 insertions, 283 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a0381f5..11a476d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -195,75 +195,6 @@ dependencies = [
]
[[package]]
-name = "async-channel"
-version = "1.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "futures-core",
-]
-
-[[package]]
-name = "async-executor"
-version = "1.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb"
-dependencies = [
- "async-lock",
- "async-task",
- "concurrent-queue",
- "fastrand",
- "futures-lite",
- "slab",
-]
-
-[[package]]
-name = "async-global-executor"
-version = "2.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776"
-dependencies = [
- "async-channel",
- "async-executor",
- "async-io",
- "async-lock",
- "blocking",
- "futures-lite",
- "once_cell",
-]
-
-[[package]]
-name = "async-io"
-version = "1.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
-dependencies = [
- "async-lock",
- "autocfg",
- "cfg-if",
- "concurrent-queue",
- "futures-lite",
- "log",
- "parking",
- "polling",
- "rustix",
- "slab",
- "socket2 0.4.9",
- "waker-fn",
-]
-
-[[package]]
-name = "async-lock"
-version = "2.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
-dependencies = [
- "event-listener",
-]
-
-[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -275,32 +206,6 @@ dependencies = [
]
[[package]]
-name = "async-std"
-version = "1.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d"
-dependencies = [
- "async-channel",
- "async-global-executor",
- "async-io",
- "async-lock",
- "crossbeam-utils",
- "futures-channel",
- "futures-core",
- "futures-io",
- "futures-lite",
- "gloo-timers",
- "kv-log-macro",
- "log",
- "memchr",
- "once_cell",
- "pin-project-lite",
- "pin-utils",
- "slab",
- "wasm-bindgen-futures",
-]
-
-[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -323,12 +228,6 @@ dependencies = [
]
[[package]]
-name = "async-task"
-version = "4.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
-
-[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -346,12 +245,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
[[package]]
-name = "atomic-waker"
-version = "1.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
-
-[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -495,21 +388,6 @@ dependencies = [
]
[[package]]
-name = "blocking"
-version = "1.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand",
- "futures-lite",
- "log",
-]
-
-[[package]]
name = "built"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -658,15 +536,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
-name = "concurrent-queue"
-version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
-dependencies = [
- "crossbeam-utils",
-]
-
-[[package]]
name = "cookie"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -914,12 +783,6 @@ dependencies = [
]
[[package]]
-name = "event-listener"
-version = "2.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
-
-[[package]]
name = "exr"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1088,21 +951,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
-name = "futures-lite"
-version = "1.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
-dependencies = [
- "fastrand",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
-[[package]]
name = "futures-macro"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1221,18 +1069,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
-name = "gloo-timers"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
-dependencies = [
- "futures-channel",
- "futures-core",
- "js-sys",
- "wasm-bindgen",
-]
-
-[[package]]
name = "h2"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1551,11 +1387,13 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
name = "jellybase"
version = "0.1.0"
dependencies = [
+ "anyhow",
"base64",
"jellycommon",
"log",
"serde_json",
"sha2",
+ "tokio",
]
[[package]]
@@ -1567,6 +1405,7 @@ dependencies = [
"log",
"reqwest",
"serde_json",
+ "tokio",
]
[[package]]
@@ -1623,7 +1462,6 @@ dependencies = [
"anyhow",
"argon2",
"async-recursion",
- "async-std",
"base64",
"bincode 2.0.0-rc.3",
"chrono",
@@ -1680,6 +1518,7 @@ dependencies = [
"ravif",
"rayon",
"rgb",
+ "tokio",
]
[[package]]
@@ -1710,15 +1549,6 @@ dependencies = [
]
[[package]]
-name = "kv-log-macro"
-version = "1.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
-dependencies = [
- "log",
-]
-
-[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1767,9 +1597,6 @@ name = "log"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
-dependencies = [
- "value-bag",
-]
[[package]]
name = "loom"
@@ -2131,12 +1958,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
-name = "parking"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
-
-[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2282,22 +2103,6 @@ dependencies = [
]
[[package]]
-name = "polling"
-version = "2.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
-dependencies = [
- "autocfg",
- "bitflags 1.3.2",
- "cfg-if",
- "concurrent-queue",
- "libc",
- "log",
- "pin-project-lite",
- "windows-sys 0.48.0",
-]
-
-[[package]]
name = "polyval"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3124,6 +2929,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
+ "parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.4",
@@ -3406,12 +3212,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
-name = "value-bag"
-version = "1.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3"
-
-[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3457,12 +3257,6 @@ dependencies = [
]
[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
-[[package]]
name = "want"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index ac93eff..9ca423a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ resolver = "2"
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "2cf38a5aa37fe046e46b03740835787f0396307b" }
log = "0.4.20"
anyhow = "1.0.75"
+tokio = { version = "1.32.0", features = ["full"] }
[profile.dev.package.rav1e]
opt-level = 3
diff --git a/base/Cargo.toml b/base/Cargo.toml
index 86675b9..f49db80 100644
--- a/base/Cargo.toml
+++ b/base/Cargo.toml
@@ -9,3 +9,5 @@ serde_json = "1.0.107"
log = { workspace = true }
sha2 = "0.10.8"
base64 = "0.21.4"
+tokio = {workspace=true}
+anyhow = "1.0.75"
diff --git a/base/src/lib.rs b/base/src/lib.rs
index 46ab576..b97f924 100644
--- a/base/src/lib.rs
+++ b/base/src/lib.rs
@@ -6,7 +6,8 @@
#![feature(lazy_cell)]
use base64::Engine;
use jellycommon::{config::GlobalConfig, AssetLocation};
-use std::{fs::File, path::PathBuf, sync::LazyLock};
+use std::{fs::File, future::Future, path::PathBuf, sync::LazyLock};
+use tokio::sync::Mutex;
pub static CONF: LazyLock<GlobalConfig> = LazyLock::new(|| {
serde_json::from_reader(
@@ -20,7 +21,7 @@ pub static CONF: LazyLock<GlobalConfig> = LazyLock::new(|| {
.unwrap()
});
-pub fn cache_file(seed: &[&str]) -> AssetLocation {
+pub fn cache_location(seed: &[&str]) -> (usize, AssetLocation) {
use sha2::Digest;
let mut d = sha2::Sha512::new();
for s in seed {
@@ -28,9 +29,50 @@ pub fn cache_file(seed: &[&str]) -> AssetLocation {
d.update(b"\0");
}
let d = d.finalize();
+ let n = d[0] as usize | (d[1] as usize) << 8 | (d[2] as usize) << 16 | (d[3] as usize) << 24;
let fname = base64::engine::general_purpose::URL_SAFE.encode(d);
let fname = &fname[..22]; // about 128 bits
- AssetLocation::Cache(fname.into())
+ (n, AssetLocation::Cache(fname.into()))
+}
+
+const CACHE_GENERATION_BUCKET_COUNT: usize = 1024;
+pub static CACHE_GENERATION_LOCKS: LazyLock<[Mutex<()>; CACHE_GENERATION_BUCKET_COUNT]> =
+ LazyLock::new(|| [(); CACHE_GENERATION_BUCKET_COUNT].map(|_| Mutex::new(())));
+
+pub async fn async_cache_file<Fun, Fut>(
+ seed: &[&str],
+ generate: Fun,
+) -> Result<AssetLocation, anyhow::Error>
+where
+ Fun: FnOnce(tokio::fs::File) -> Fut,
+ Fut: Future<Output = Result<(), anyhow::Error>>,
+{
+ let (bucket, location) = cache_location(seed);
+ // we need a lock even if it exists since somebody might be still in the process of writing.
+ let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].lock();
+ let exists = tokio::fs::try_exists(location.path()).await?;
+ if !exists {
+ let f = tokio::fs::File::create(location.path()).await?;
+ generate(f).await?;
+ }
+ drop(_guard);
+ Ok(location)
+}
+
+pub fn cache_file<Fun>(seed: &[&str], mut generate: Fun) -> Result<AssetLocation, anyhow::Error>
+where
+ Fun: FnMut(std::fs::File) -> Result<(), anyhow::Error>,
+{
+ let (bucket, location) = cache_location(seed);
+ // we need a lock even if it exists since somebody might be still in the process of writing.
+ let _guard = CACHE_GENERATION_LOCKS[bucket % CACHE_GENERATION_BUCKET_COUNT].lock();
+ let exists = location.path().exists();
+ if !exists {
+ let f = std::fs::File::create(location.path())?;
+ generate(f)?;
+ }
+ drop(_guard);
+ Ok(location)
}
pub trait AssetLocationExt {
diff --git a/client/Cargo.toml b/client/Cargo.toml
index ea89057..05dd2d4 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -9,3 +9,4 @@ log = { workspace = true }
reqwest = { version = "0.11.20", features = ["json"] }
anyhow = "1.0.75"
serde_json = "1.0.107"
+tokio = { workspace = true }
diff --git a/client/src/lib.rs b/client/src/lib.rs
index 200c869..92545a9 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -12,6 +12,7 @@ use reqwest::{
use serde_json::json;
use std::time::Duration;
use stream::StreamSpec;
+use tokio::io::AsyncWriteExt;
pub use jellycommon::*;
@@ -93,7 +94,7 @@ impl Session {
&self,
id: &str,
role: &str,
- mut writer: impl std::io::Write,
+ mut writer: impl tokio::io::AsyncWrite + std::marker::Unpin,
) -> Result<()> {
debug!("downloading asset {role:?} for {id:?}");
let mut r = self
@@ -102,7 +103,7 @@ impl Session {
.send()
.await?;
while let Some(chunk) = r.chunk().await? {
- writer.write_all(&chunk)?;
+ writer.write_all(&chunk).await?;
}
Ok(())
}
diff --git a/remuxer/src/snippet.rs b/remuxer/src/snippet.rs
index f23e19e..cd965ba 100644
--- a/remuxer/src/snippet.rs
+++ b/remuxer/src/snippet.rs
@@ -68,7 +68,7 @@ pub fn write_snippet_into(
let private = &track_sources[track];
let source_path = path_base.join(&private.path);
let mapped = 1;
- info!("\t- {n} {source_path:?} ({} => {mapped})", private.track);
+ info!("\t- {track} {source_path:?} ({} => {mapped})", private.track);
info!("\t {}", info);
let file = File::open(&source_path).context("opening source file")?;
let mut index = File::open(source_path.with_extension(format!("si.{}", private.track)))
@@ -110,7 +110,7 @@ pub fn write_snippet_into(
let mut reader = SegmentExtractIter::new(&mut reader, private.track as u64);
- let mut blocks = vec![MatroskaTag::Timestamp(start_block.pts)];
+ let mut blocks = vec![MatroskaTag::Timestamp(0)];
for i in start_block_index..end_block_index {
let index_block = &index.blocks[i];
let mut block = reader.next()?;
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 291f56b..fbb9f63 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -25,10 +25,9 @@ vte = "0.12.0"
argon2 = "0.5.2"
aes-gcm-siv = "0.11.1"
-async-std = "1.12.0"
async-recursion = "1.0.5"
futures = "0.3.28"
-tokio = { version = "1.32.0", features = ["io-util"] }
+tokio = { workspace = true }
tokio-util = { version = "0.7.9", features = ["io", "io-util"] }
markup = "0.13.1"
diff --git a/server/src/import.rs b/server/src/import.rs
index 7e62b47..8d8198a 100644
--- a/server/src/import.rs
+++ b/server/src/import.rs
@@ -7,11 +7,17 @@ use crate::{database::Database, federation::Federation, CONF};
use anyhow::{anyhow, bail, Context, Ok};
use async_recursion::async_recursion;
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
-use jellybase::{cache_file, AssetLocationExt};
+use jellybase::async_cache_file;
use jellyclient::Session;
use jellycommon::{AssetLocation, MediaSource, Node, NodePrivate, RemoteImportOptions};
use log::{debug, error, info};
-use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock};
+use std::{
+ ffi::OsStr,
+ fs::File,
+ os::unix::prelude::OsStrExt,
+ path::PathBuf,
+ sync::{Arc, LazyLock},
+};
use tokio::sync::Semaphore;
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
@@ -133,7 +139,7 @@ static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(
async fn import_remote(
mut opts: RemoteImportOptions,
db: &Database,
- session: &Session,
+ session: &Arc<Session>,
identifier: String,
parent: Option<String>,
) -> anyhow::Result<Vec<String>> {
@@ -149,11 +155,11 @@ async fn import_remote(
.context("fetching remote node")?;
if node.federated.is_some() {
- return Ok(vec![]) // node is federated, lets not import it
+ return Ok(vec![]); // node is federated, lets not import it
}
- let poster = cache_federation_asset(session, &opts.id, "poster").await?;
- let backdrop = cache_federation_asset(session, &opts.id, "backdrop").await?;
+ let poster = cache_federation_asset(session.to_owned(), opts.id.clone(), "poster").await?;
+ let backdrop = cache_federation_asset(session.to_owned(), opts.id.clone(), "backdrop").await?;
drop(_permit);
@@ -215,15 +221,16 @@ async fn import_remote(
}
async fn cache_federation_asset(
- session: &Session,
- identifier: &String,
- role: &str,
+ session: Arc<Session>,
+ identifier: String,
+ role: &'static str,
) -> anyhow::Result<AssetLocation> {
- let poster = cache_file(&["federation-asset", role, identifier]);
- if !poster.path().exists() {
- session
- .node_asset(&identifier, role, File::create(&poster.path())?)
- .await?;
- }
- Ok(poster)
+ async_cache_file(
+ &["federation-asset", role, &identifier.clone()],
+ move |out| async move {
+ let session = session;
+ session.node_asset(identifier.as_str(), role, out).await
+ },
+ )
+ .await
}
diff --git a/server/src/routes/ui/assets.rs b/server/src/routes/ui/assets.rs
index 992e3da..8c0496e 100644
--- a/server/src/routes/ui/assets.rs
+++ b/server/src/routes/ui/assets.rs
@@ -8,7 +8,7 @@ use crate::{
routes::ui::{account::session::Session, error::MyError, CacheControlFile},
};
use anyhow::anyhow;
-use async_std::task::spawn_blocking;
+use jellybase::AssetLocationExt;
use jellycommon::AssetLocation;
use log::info;
use rocket::{get, http::ContentType, FromFormField, State, UriDisplayQuery};
@@ -53,11 +53,10 @@ pub async fn r_item_assets(
));
// fit the resolution into a finite set so the maximum cache is finite too.
let width = 2usize.pow(width.unwrap_or(2048).clamp(128, 8196).ilog2());
- let path =
- spawn_blocking(move || jellytranscoder::image::transcode(asset, 50., 5, width)).await?;
+ let path = jellytranscoder::image::transcode(asset, 50., 5, width).await?;
info!("loading asset from {path:?}");
Ok((
ContentType::AVIF,
- CacheControlFile::new(File::open(path).await?).await,
+ CacheControlFile::new(File::open(path.path()).await?).await,
))
}
diff --git a/server/src/routes/ui/layout.rs b/server/src/routes/ui/layout.rs
index fdda3e4..1c47247 100644
--- a/server/src/routes/ui/layout.rs
+++ b/server/src/routes/ui/layout.rs
@@ -16,7 +16,7 @@ use crate::{
},
uri,
};
-use async_std::task::block_on;
+use futures::executor::block_on;
use jellybase::CONF;
use markup::{DynRender, Render};
use rocket::{
diff --git a/transcoder/Cargo.toml b/transcoder/Cargo.toml
index e39b07c..ecb2bc8 100644
--- a/transcoder/Cargo.toml
+++ b/transcoder/Cargo.toml
@@ -18,3 +18,4 @@ rav1e = { version = "0.6.6", default-features = false, features = [
rayon = "1.8.0"
imgref = "1.9.4"
ravif = "0.11.3"
+tokio = { workspace = true }
diff --git a/transcoder/src/bin/reproduce_decode_error.rs b/transcoder/src/bin/reproduce_decode_error.rs
index 84fa6fd..3e0ca3a 100644
--- a/transcoder/src/bin/reproduce_decode_error.rs
+++ b/transcoder/src/bin/reproduce_decode_error.rs
@@ -1,11 +1,13 @@
use jellytranscoder::image::transcode;
-fn main() {
+#[tokio::main]
+async fn main() {
transcode(
jellycommon::AssetLocation::Cache(std::env::args().nth(2).unwrap().into()),
1.0,
1,
1,
)
+ .await
.unwrap();
}
diff --git a/transcoder/src/image.rs b/transcoder/src/image.rs
index 6da1be7..3865348 100644
--- a/transcoder/src/image.rs
+++ b/transcoder/src/image.rs
@@ -1,53 +1,60 @@
use anyhow::Context;
use image::{imageops::FilterType, ImageFormat};
-use jellybase::{cache_file, AssetLocationExt};
+use jellybase::{async_cache_file, AssetLocationExt};
use jellycommon::AssetLocation;
use log::{debug, info};
use rgb::FromSlice;
-use std::{
- fs::File,
- io::{BufReader, Write},
- path::PathBuf,
-};
+use std::{fs::File, io::BufReader};
+use tokio::io::AsyncWriteExt;
-pub fn transcode(
+pub async fn transcode(
asset: AssetLocation,
quality: f32,
speed: u8,
width: usize,
-) -> anyhow::Result<PathBuf> {
+) -> anyhow::Result<AssetLocation> {
let original_path = asset.path();
- let path = cache_file(&[
- original_path.as_os_str().to_str().unwrap(),
- &format!("{width} {quality} {speed}"),
- ])
- .path();
- if !path.exists() {
- info!("encoding {path:?} (speed={speed}, quality={quality}, width={width})");
- // TODO shouldn't be neccessary with guessed format.
- let file = BufReader::new(File::open(&original_path).context("opening source")?);
- let mut reader = image::io::Reader::new(file);
- reader.set_format(ImageFormat::Avif);
- let reader = reader.with_guessed_format().context("guessing format")?;
- debug!("guessed format (or fallback): {:?}", reader.format());
- let original = reader.decode().context("decoding image")?.to_rgba8();
- let image = image::imageops::resize(
- &original,
- width as u32,
- width as u32 * original.height() / original.width(),
- FilterType::Lanczos3,
- );
- let pixels = image.to_vec();
- let encoded = ravif::Encoder::new()
- .with_speed(speed.clamp(1, 10))
- .with_quality(quality.clamp(1., 100.))
- .encode_rgba(imgref::Img::new(
- pixels.as_rgba(),
- image.width() as usize,
- image.height() as usize,
- ))?;
- info!("transcode finished");
- File::create(&path)?.write_all(&encoded.avif_file)?;
- }
- Ok(path)
+ let asset = asset.clone();
+ Ok(async_cache_file(
+ &[
+ original_path.as_os_str().to_str().unwrap(),
+ &format!("{width} {quality} {speed}"),
+ ],
+ move |mut output| async move {
+ let encoded = tokio::task::spawn_blocking(move || {
+ let original_path = asset.path();
+ info!(
+ "encoding {original_path:?} (speed={speed}, quality={quality}, width={width})"
+ );
+ // TODO shouldn't be neccessary with guessed format.
+ let file = BufReader::new(File::open(&original_path).context("opening source")?);
+ let mut reader = image::io::Reader::new(file);
+ reader.set_format(ImageFormat::Avif);
+ let reader = reader.with_guessed_format().context("guessing format")?;
+ debug!("guessed format (or fallback): {:?}", reader.format());
+ let original = reader.decode().context("decoding image")?.to_rgba8();
+ let image = image::imageops::resize(
+ &original,
+ width as u32,
+ width as u32 * original.height() / original.width(),
+ FilterType::Lanczos3,
+ );
+ let pixels = image.to_vec();
+ let encoded = ravif::Encoder::new()
+ .with_speed(speed.clamp(1, 10))
+ .with_quality(quality.clamp(1., 100.))
+ .encode_rgba(imgref::Img::new(
+ pixels.as_rgba(),
+ image.width() as usize,
+ image.height() as usize,
+ ))?;
+ info!("transcode finished");
+ Ok::<_, anyhow::Error>(encoded)
+ })
+ .await??;
+ output.write_all(&encoded.avif_file).await?;
+ Ok(())
+ },
+ )
+ .await?)
}
diff --git a/transcoder/src/lib.rs b/transcoder/src/lib.rs
index e74a7f5..010a7dd 100644
--- a/transcoder/src/lib.rs
+++ b/transcoder/src/lib.rs
@@ -3,4 +3,5 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
+#![feature(async_closure)]
pub mod image;