From a1d2c5841568f2312ae18276c0c8b9517deea78b Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 7 Apr 2025 17:24:26 +0200 Subject: async caching --- src/cache.rs | 21 +++++++++++++-------- src/client.rs | 15 +++++++-------- src/main.rs | 2 +- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index c0e160f..67c7a01 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,10 +1,13 @@ use anyhow::Result; -use redb::{Database, TableDefinition}; +use redb::{Database, Durability, TableDefinition}; use std::{ - fs::{File, create_dir_all}, - io::{Read, Write}, + fs::create_dir_all, path::{Path, PathBuf}, }; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWriteExt}, +}; pub enum Cache { Directory(PathBuf), @@ -31,13 +34,13 @@ impl Cache { Ok(Self::Redb(db)) } - pub fn get(&self, path: &str) -> Result>> { + pub async fn get(&self, path: &str) -> Result>> { match self { Cache::Directory(cachedir) => { let cachepath = cachedir.join(path); if cachepath.exists() { let mut buf = Vec::new(); - File::open(cachepath)?.read_to_end(&mut buf)?; + File::open(cachepath).await?.read_to_end(&mut buf).await?; Ok(Some(buf.into())) } else { Ok(None) @@ -55,15 +58,17 @@ impl Cache { } } } - pub fn insert(&self, path: &str, data: &[u8]) -> Result<()> { + pub async fn insert(&self, path: &str, data: &[u8]) -> Result<()> { match self { Cache::Directory(cachedir) => { let cachepath = cachedir.join(path); - File::create(cachepath)?.write_all(data)?; + File::create(cachepath).await?.write_all(data).await?; Ok(()) } Cache::Redb(database) => { - let txn = database.begin_write()?; + let mut txn = database.begin_write()?; + txn.set_durability(Durability::Eventual); + txn.set_quick_repair(true); let mut table = txn.open_table(T_DOWNLOAD)?; table.insert(path, data)?; drop(table); diff --git a/src/client.rs b/src/client.rs index 2699c26..9281bce 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,8 @@ +use crate::{ + Flags, + cache::Cache, + proto::{BulkMetadata, NodeData, NodeMetadata, PlanetoidMetadata}, +}; use anyhow::{Result, bail}; use log::{debug, error, info}; use prost::{Message, bytes::Bytes}; @@ -8,12 +13,6 @@ use reqwest::{ use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::Semaphore; -use crate::{ - Flags, - cache::Cache, - proto::{BulkMetadata, NodeData, NodeMetadata, PlanetoidMetadata}, -}; - pub struct GeClient { counter: AtomicUsize, client: Client, @@ -34,7 +33,7 @@ impl GeClient { } pub async fn download(&self, path: &str) -> Result { let _permit = self.par_limit.acquire().await?; - if let Some(d) = self.cache.get(path)? { + if let Some(d) = self.cache.get(path).await? { debug!("cached {path:?}"); Ok(d.into()) } else { @@ -50,7 +49,7 @@ impl GeClient { bail!("error response") } let buf = res.bytes().await?; - self.cache.insert(path, &buf)?; + self.cache.insert(path, &buf).await?; Ok(buf) } } diff --git a/src/main.rs b/src/main.rs index bb80311..e9ae4c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -56,7 +56,7 @@ async fn main() -> Result<()> { let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch(); cache_all( Arc::new(c), - Arc::new(Semaphore::new(64)), + Arc::new(Semaphore::new(args.par_limit * 3 / 2)), "".to_string(), epoch, level, -- cgit v1.2.3-70-g09d2