aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cache.rs32
-rw-r--r--src/main.rs20
2 files changed, 36 insertions, 16 deletions
diff --git a/src/cache.rs b/src/cache.rs
index a4d6dfb..ebf5e82 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -1,7 +1,6 @@
use anyhow::Result;
use futures_util::{StreamExt, stream::FuturesUnordered};
use log::{debug, info};
-use redb::{Database, Durability, TableDefinition};
use std::{
fs::create_dir_all,
path::{Path, PathBuf},
@@ -14,9 +13,15 @@ use tokio::{
pub enum Cache {
Directory(PathBuf),
- Redb { db: Database, counter: AtomicUsize },
+ Redb {
+ db: redb::Database,
+ counter: AtomicUsize,
+ },
+ Rocksdb {
+ db: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
+ },
}
-const T_DOWNLOAD: TableDefinition<&str, &[u8]> = TableDefinition::new("dl");
+const T_DOWNLOAD: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new("dl");
impl Cache {
pub fn new_directory() -> Result<Self> {
let cachedir = xdg::BaseDirectories::with_prefix("weareearth")
@@ -27,9 +32,9 @@ impl Cache {
create_dir_all(cachedir.join("NodeData"))?;
Ok(Self::Directory(cachedir))
}
- pub fn new_db(path: &Path) -> Result<Self> {
- info!("opening db...");
- let db = Database::create(path)?;
+ pub fn new_redb(path: &Path) -> Result<Self> {
+ info!("opening redb...");
+ let db = redb::Database::create(path)?;
info!("done");
{
let txn = db.begin_write()?;
@@ -41,6 +46,12 @@ impl Cache {
counter: AtomicUsize::new(0),
})
}
+ pub fn new_rocksdb(path: &Path) -> Result<Self> {
+ info!("opening rocksdb...");
+ let db = rocksdb::DBWithThreadMode::open_default(path)?;
+ info!("done");
+ Ok(Self::Rocksdb { db })
+ }
pub async fn transfer_entries(&self, other: &Cache) -> Result<()> {
match self {
@@ -104,6 +115,7 @@ impl Cache {
Ok(None)
}
}
+ Cache::Rocksdb { db } => Ok(db.get(path)?),
}
}
pub async fn insert(&self, path: &str, data: &[u8]) -> Result<()> {
@@ -118,10 +130,10 @@ impl Cache {
let mut txn = db.begin_write()?;
if n % 100_000 == 0 {
info!("flushing db");
- txn.set_durability(Durability::Immediate);
+ txn.set_durability(redb::Durability::Immediate);
txn.set_quick_repair(true);
} else {
- txn.set_durability(Durability::None)
+ txn.set_durability(redb::Durability::None)
};
let mut table = txn.open_table(T_DOWNLOAD)?;
table.insert(path, data)?;
@@ -129,6 +141,10 @@ impl Cache {
txn.commit()?;
Ok(())
}
+ Cache::Rocksdb { db } => {
+ db.put(path, data)?;
+ Ok(())
+ }
}
}
}
diff --git a/src/main.rs b/src/main.rs
index 7ea1afa..66a011f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,7 +26,9 @@ struct Args {
#[arg(short, long, default_value = "16")]
par_limit: usize,
#[arg(short, long)]
- db_cache: Option<PathBuf>,
+ redb_cache: Option<PathBuf>,
+ #[arg(short, long)]
+ rocksdb_cache: Option<PathBuf>,
#[clap(subcommand)]
action: Action,
}
@@ -35,7 +37,7 @@ struct Args {
enum Action {
Cache { level: usize },
Export { level: usize },
- CacheFsToDb { db: PathBuf },
+ CacheFsToDb,
}
#[tokio::main]
@@ -44,15 +46,17 @@ async fn main() -> Result<()> {
let args = Args::parse();
- let cache = if let Some(path) = args.db_cache {
- Cache::new_db(&path)?
+ let cache = if let Some(path) = args.rocksdb_cache {
+ Cache::new_rocksdb(&path)?
+ } else if let Some(path) = args.redb_cache {
+ Cache::new_redb(&path)?
} else {
Cache::new_directory()?
};
- let c = GeClient::new(16, cache).await?;
match args.action {
Action::Cache { level } => {
+ let c = GeClient::new(16, cache).await?;
let entry = c.planetoid_metdata().await?;
let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch();
cache_all(
@@ -65,6 +69,7 @@ async fn main() -> Result<()> {
.await?;
}
Action::Export { level } => {
+ let c = GeClient::new(16, cache).await?;
let entry = c.planetoid_metdata().await?;
let store = Arc::new(ResourceStore::new_memory());
@@ -85,10 +90,9 @@ async fn main() -> Result<()> {
let file = std::fs::File::create("/tmp/a.respack")?;
save_full_respack(file, &store, Some(entry))?;
}
- Action::CacheFsToDb { db: dbpath } => {
+ Action::CacheFsToDb => {
let source = Cache::new_directory()?;
- let dest = Cache::new_db(&dbpath)?;
- source.transfer_entries(&dest).await?;
+ source.transfer_entries(&cache).await?;
}
}