aboutsummaryrefslogtreecommitdiff
path: root/src/cache.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/cache.rs')
-rw-r--r--src/cache.rs70
1 files changed, 62 insertions, 8 deletions
diff --git a/src/cache.rs b/src/cache.rs
index 67c7a01..a4d6dfb 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -1,8 +1,11 @@
use anyhow::Result;
+use futures_util::{StreamExt, stream::FuturesUnordered};
+use log::{debug, info};
use redb::{Database, Durability, TableDefinition};
use std::{
fs::create_dir_all,
path::{Path, PathBuf},
+ sync::atomic::{AtomicUsize, Ordering},
};
use tokio::{
fs::File,
@@ -11,7 +14,7 @@ use tokio::{
pub enum Cache {
Directory(PathBuf),
- Redb(Database),
+ Redb { db: Database, counter: AtomicUsize },
}
const T_DOWNLOAD: TableDefinition<&str, &[u8]> = TableDefinition::new("dl");
impl Cache {
@@ -25,13 +28,58 @@ impl Cache {
Ok(Self::Directory(cachedir))
}
pub fn new_db(path: &Path) -> Result<Self> {
+ info!("opening db...");
let db = Database::create(path)?;
+ info!("done");
{
let txn = db.begin_write()?;
txn.open_table(T_DOWNLOAD)?;
txn.commit()?;
}
- Ok(Self::Redb(db))
+ Ok(Self::Redb {
+ db,
+ counter: AtomicUsize::new(0),
+ })
+ }
+
+ pub async fn transfer_entries(&self, other: &Cache) -> Result<()> {
+ match self {
+ Cache::Directory(path) => {
+ let mut fu = FuturesUnordered::new();
+ let mut counter = 0;
+ for dir in ["NodeData", "BulkMetadata"] {
+ info!("entering {dir:?}");
+ for f in path.join(dir).read_dir()? {
+ let f = f?;
+
+ if fu.len() > 128 {
+ if let Some(x) = fu.next().await {
+ x?
+ };
+ }
+ counter += 1;
+ if counter % 1_000 == 0 {
+ debug!("{counter} items copied");
+ }
+
+ fu.push(async move {
+ let mut buf = Vec::new();
+ File::open(f.path()).await?.read_to_end(&mut buf).await?;
+ other
+ .insert(&format!("{dir}/{}", f.file_name().to_string_lossy()), &buf)
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ });
+ }
+ }
+ while let Some(x) = fu.next().await {
+ x?
+ }
+ info!("transfer done");
+ }
+ _ => unimplemented!(),
+ }
+ Ok(())
}
pub async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
@@ -46,8 +94,8 @@ impl Cache {
Ok(None)
}
}
- Cache::Redb(database) => {
- let txn = database.begin_read()?;
+ Cache::Redb { db, .. } => {
+ let txn = db.begin_read()?;
let table = txn.open_table(T_DOWNLOAD)?;
let res = table.get(path)?;
if let Some(res) = res {
@@ -65,10 +113,16 @@ impl Cache {
File::create(cachepath).await?.write_all(data).await?;
Ok(())
}
- Cache::Redb(database) => {
- let mut txn = database.begin_write()?;
- txn.set_durability(Durability::Eventual);
- txn.set_quick_repair(true);
+ Cache::Redb { db, counter } => {
+ let n = counter.fetch_add(1, Ordering::Relaxed);
+ let mut txn = db.begin_write()?;
+ if n % 100_000 == 0 {
+ info!("flushing db");
+ txn.set_durability(Durability::Immediate);
+ txn.set_quick_repair(true);
+ } else {
+ txn.set_durability(Durability::None)
+ };
let mut table = txn.open_table(T_DOWNLOAD)?;
table.insert(path, data)?;
drop(table);