use anyhow::Result; use futures_util::{StreamExt, stream::FuturesUnordered}; use log::{debug, info}; use std::{ fs::create_dir_all, path::{Path, PathBuf}, sync::atomic::{AtomicUsize, Ordering}, }; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, }; pub enum Cache { Directory(PathBuf), Redb { db: redb::Database, counter: AtomicUsize, }, Rocksdb { db: rocksdb::DBWithThreadMode, }, } const T_DOWNLOAD: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new("dl"); impl Cache { pub fn new_directory() -> Result { let cachedir = xdg::BaseDirectories::with_prefix("weareearth") .unwrap() .create_cache_directory("download") .unwrap(); create_dir_all(cachedir.join("BulkMetadata"))?; create_dir_all(cachedir.join("NodeData"))?; Ok(Self::Directory(cachedir)) } pub fn new_redb(path: &Path) -> Result { info!("opening redb..."); let db = redb::Database::create(path)?; info!("done"); { let txn = db.begin_write()?; txn.open_table(T_DOWNLOAD)?; txn.commit()?; } Ok(Self::Redb { db, counter: AtomicUsize::new(0), }) } pub fn new_rocksdb(path: &Path) -> Result { info!("opening rocksdb..."); let mut opts = rocksdb::Options::default(); opts.increase_parallelism(8); opts.create_if_missing(true); let db = rocksdb::DBWithThreadMode::open(&opts, path)?; info!("done"); Ok(Self::Rocksdb { db }) } pub async fn transfer_entries(&self, other: &Cache) -> Result<()> { match self { Cache::Directory(path) => { let mut fu = FuturesUnordered::new(); let mut counter = 0; for dir in ["NodeData", "BulkMetadata"] { info!("entering {dir:?}"); for f in path.join(dir).read_dir()? { let f = f?; if fu.len() > 128 { if let Some(x) = fu.next().await { x? }; } counter += 1; if counter % 1_000 == 0 { debug!("{counter} items copied"); } fu.push(async move { let mut buf = Vec::new(); File::open(f.path()).await?.read_to_end(&mut buf).await?; other .insert(&format!("{dir}/{}", f.file_name().to_string_lossy()), &buf) .await?; Ok::<_, anyhow::Error>(()) }); } } while let Some(x) = fu.next().await { x? } info!("transfer done"); } _ => unimplemented!(), } Ok(()) } pub async fn get(&self, path: &str) -> Result>> { match self { Cache::Directory(cachedir) => { let cachepath = cachedir.join(path); if cachepath.exists() { let mut buf = Vec::new(); File::open(cachepath).await?.read_to_end(&mut buf).await?; Ok(Some(buf.into())) } else { Ok(None) } } Cache::Redb { db, .. } => { let txn = db.begin_read()?; let table = txn.open_table(T_DOWNLOAD)?; let res = table.get(path)?; if let Some(res) = res { Ok(Some(res.value().to_vec())) } else { Ok(None) } } Cache::Rocksdb { db } => Ok(db.get(path)?), } } pub async fn insert(&self, path: &str, data: &[u8]) -> Result<()> { match self { Cache::Directory(cachedir) => { let cachepath = cachedir.join(path); File::create(cachepath).await?.write_all(data).await?; Ok(()) } Cache::Redb { db, counter } => { let n = counter.fetch_add(1, Ordering::Relaxed); let mut txn = db.begin_write()?; if n % 100_000 == 0 { info!("flushing db"); txn.set_durability(redb::Durability::Immediate); txn.set_quick_repair(true); } else { txn.set_durability(redb::Durability::None) }; let mut table = txn.open_table(T_DOWNLOAD)?; table.insert(path, data)?; drop(table); txn.commit()?; Ok(()) } Cache::Rocksdb { db } => { db.put(path, data)?; Ok(()) } } } }