aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cache.rs21
-rw-r--r--src/client.rs15
-rw-r--r--src/main.rs2
3 files changed, 21 insertions, 17 deletions
diff --git a/src/cache.rs b/src/cache.rs
index c0e160f..67c7a01 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -1,10 +1,13 @@
use anyhow::Result;
-use redb::{Database, TableDefinition};
+use redb::{Database, Durability, TableDefinition};
use std::{
- fs::{File, create_dir_all},
- io::{Read, Write},
+ fs::create_dir_all,
path::{Path, PathBuf},
};
+use tokio::{
+ fs::File,
+ io::{AsyncReadExt, AsyncWriteExt},
+};
pub enum Cache {
Directory(PathBuf),
@@ -31,13 +34,13 @@ impl Cache {
Ok(Self::Redb(db))
}
- pub fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+ pub async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
match self {
Cache::Directory(cachedir) => {
let cachepath = cachedir.join(path);
if cachepath.exists() {
let mut buf = Vec::new();
- File::open(cachepath)?.read_to_end(&mut buf)?;
+ File::open(cachepath).await?.read_to_end(&mut buf).await?;
Ok(Some(buf.into()))
} else {
Ok(None)
@@ -55,15 +58,17 @@ impl Cache {
}
}
}
- pub fn insert(&self, path: &str, data: &[u8]) -> Result<()> {
+ pub async fn insert(&self, path: &str, data: &[u8]) -> Result<()> {
match self {
Cache::Directory(cachedir) => {
let cachepath = cachedir.join(path);
- File::create(cachepath)?.write_all(data)?;
+ File::create(cachepath).await?.write_all(data).await?;
Ok(())
}
Cache::Redb(database) => {
- let txn = database.begin_write()?;
+ let mut txn = database.begin_write()?;
+ txn.set_durability(Durability::Eventual);
+ txn.set_quick_repair(true);
let mut table = txn.open_table(T_DOWNLOAD)?;
table.insert(path, data)?;
drop(table);
diff --git a/src/client.rs b/src/client.rs
index 2699c26..9281bce 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,3 +1,8 @@
+use crate::{
+ Flags,
+ cache::Cache,
+ proto::{BulkMetadata, NodeData, NodeMetadata, PlanetoidMetadata},
+};
use anyhow::{Result, bail};
use log::{debug, error, info};
use prost::{Message, bytes::Bytes};
@@ -8,12 +13,6 @@ use reqwest::{
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;
-use crate::{
- Flags,
- cache::Cache,
- proto::{BulkMetadata, NodeData, NodeMetadata, PlanetoidMetadata},
-};
-
pub struct GeClient {
counter: AtomicUsize,
client: Client,
@@ -34,7 +33,7 @@ impl GeClient {
}
pub async fn download(&self, path: &str) -> Result<Bytes> {
let _permit = self.par_limit.acquire().await?;
- if let Some(d) = self.cache.get(path)? {
+ if let Some(d) = self.cache.get(path).await? {
debug!("cached {path:?}");
Ok(d.into())
} else {
@@ -50,7 +49,7 @@ impl GeClient {
bail!("error response")
}
let buf = res.bytes().await?;
- self.cache.insert(path, &buf)?;
+ self.cache.insert(path, &buf).await?;
Ok(buf)
}
}
diff --git a/src/main.rs b/src/main.rs
index bb80311..e9ae4c2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -56,7 +56,7 @@ async fn main() -> Result<()> {
let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch();
cache_all(
Arc::new(c),
- Arc::new(Semaphore::new(64)),
+ Arc::new(Semaphore::new(args.par_limit * 3 / 2)),
"".to_string(),
epoch,
level,