diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cache.rs | 70 | ||||
-rw-r--r-- | src/main.rs | 6 |
2 files changed, 68 insertions, 8 deletions
diff --git a/src/cache.rs b/src/cache.rs index 67c7a01..a4d6dfb 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,8 +1,11 @@ use anyhow::Result; +use futures_util::{StreamExt, stream::FuturesUnordered}; +use log::{debug, info}; use redb::{Database, Durability, TableDefinition}; use std::{ fs::create_dir_all, path::{Path, PathBuf}, + sync::atomic::{AtomicUsize, Ordering}, }; use tokio::{ fs::File, @@ -11,7 +14,7 @@ use tokio::{ pub enum Cache { Directory(PathBuf), - Redb(Database), + Redb { db: Database, counter: AtomicUsize }, } const T_DOWNLOAD: TableDefinition<&str, &[u8]> = TableDefinition::new("dl"); impl Cache { @@ -25,13 +28,58 @@ impl Cache { Ok(Self::Directory(cachedir)) } pub fn new_db(path: &Path) -> Result<Self> { + info!("opening db..."); let db = Database::create(path)?; + info!("done"); { let txn = db.begin_write()?; txn.open_table(T_DOWNLOAD)?; txn.commit()?; } - Ok(Self::Redb(db)) + Ok(Self::Redb { + db, + counter: AtomicUsize::new(0), + }) + } + + pub async fn transfer_entries(&self, other: &Cache) -> Result<()> { + match self { + Cache::Directory(path) => { + let mut fu = FuturesUnordered::new(); + let mut counter = 0; + for dir in ["NodeData", "BulkMetadata"] { + info!("entering {dir:?}"); + for f in path.join(dir).read_dir()? { + let f = f?; + + if fu.len() > 128 { + if let Some(x) = fu.next().await { + x? + }; + } + counter += 1; + if counter % 1_000 == 0 { + debug!("{counter} items copied"); + } + + fu.push(async move { + let mut buf = Vec::new(); + File::open(f.path()).await?.read_to_end(&mut buf).await?; + other + .insert(&format!("{dir}/{}", f.file_name().to_string_lossy()), &buf) + .await?; + Ok::<_, anyhow::Error>(()) + }); + } + } + while let Some(x) = fu.next().await { + x? + } + info!("transfer done"); + } + _ => unimplemented!(), + } + Ok(()) } pub async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> { @@ -46,8 +94,8 @@ impl Cache { Ok(None) } } - Cache::Redb(database) => { - let txn = database.begin_read()?; + Cache::Redb { db, .. } => { + let txn = db.begin_read()?; let table = txn.open_table(T_DOWNLOAD)?; let res = table.get(path)?; if let Some(res) = res { @@ -65,10 +113,16 @@ impl Cache { File::create(cachepath).await?.write_all(data).await?; Ok(()) } - Cache::Redb(database) => { - let mut txn = database.begin_write()?; - txn.set_durability(Durability::Eventual); - txn.set_quick_repair(true); + Cache::Redb { db, counter } => { + let n = counter.fetch_add(1, Ordering::Relaxed); + let mut txn = db.begin_write()?; + if n % 100_000 == 0 { + info!("flushing db"); + txn.set_durability(Durability::Immediate); + txn.set_quick_repair(true); + } else { + txn.set_durability(Durability::None) + }; let mut table = txn.open_table(T_DOWNLOAD)?; table.insert(path, data)?; drop(table); diff --git a/src/main.rs b/src/main.rs index 4a395ad..7ea1afa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,6 +35,7 @@ struct Args { enum Action { Cache { level: usize }, Export { level: usize }, + CacheFsToDb { db: PathBuf }, } #[tokio::main] @@ -84,6 +85,11 @@ async fn main() -> Result<()> { let file = std::fs::File::create("/tmp/a.respack")?; save_full_respack(file, &store, Some(entry))?; } + Action::CacheFsToDb { db: dbpath } => { + let source = Cache::new_directory()?; + let dest = Cache::new_db(&dbpath)?; + source.transfer_entries(&dest).await?; + } } Ok(()) |