diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-07 17:08:32 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-07 17:08:32 +0200 |
commit | 0b96cae3425c8781f5b755d52a81bbc7b8b3ef64 (patch) | |
tree | e806549576603865592e50d3556d5c8bb5bfc42e | |
parent | 32c282c96980c615cba077d441c574e85ff5add5 (diff) | |
download | weareearth-0b96cae3425c8781f5b755d52a81bbc7b8b3ef64.tar weareearth-0b96cae3425c8781f5b755d52a81bbc7b8b3ef64.tar.bz2 weareearth-0b96cae3425c8781f5b755d52a81bbc7b8b3ef64.tar.zst |
add db cache
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/cache.rs | 75 | ||||
-rw-r--r-- | src/main.rs | 111 |
4 files changed, 137 insertions, 52 deletions
@@ -1837,6 +1837,7 @@ dependencies = [ "prost", "prost-build", "prost-types", + "redb", "reqwest", "tokio", "weareshared", @@ -21,7 +21,7 @@ xdg = "2.5.2" glam = "0.30.1" futures-util = "0.3.31" clap = { version = "4.5.35", features = ["derive"] } - +redb = "2.4.0" weareshared = { path = "../wearechat/shared" } [build-dependencies] diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..114ab09 --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use redb::{Database, TableDefinition}; +use std::{ + fs::{File, create_dir_all}, + io::{Read, Write}, + path::{Path, PathBuf}, +}; + +pub enum Cache { + Directory(PathBuf), + Redb(Database), +} +const T_DOWNLOAD: TableDefinition<&str, &[u8]> = TableDefinition::new("dl"); +impl Cache { + pub fn new_directory() -> Result<Self> { + let cachedir = xdg::BaseDirectories::with_prefix("weareearth") + .unwrap() + .create_cache_directory("download") + .unwrap(); + create_dir_all(cachedir.join("BulkMetadata"))?; + create_dir_all(cachedir.join("NodeData"))?; + Ok(Self::Directory(cachedir)) + } + pub fn new_db(path: &Path) -> Result<Self> { + let db = Database::open(path)?; + { + let txn = db.begin_write()?; + txn.open_table(T_DOWNLOAD)?; + txn.commit()?; + } + Ok(Self::Redb(db)) + } + + pub fn get(&self, path: &str) -> Result<Option<Vec<u8>>> { + match self { + Cache::Directory(cachedir) => { + let cachepath = cachedir.join(path); + if cachepath.exists() { + let mut buf = Vec::new(); + File::open(cachepath)?.read_to_end(&mut buf)?; + Ok(Some(buf.into())) + } else { + Ok(None) + } + } + Cache::Redb(database) => { + let txn = database.begin_read()?; + let table = txn.open_table(T_DOWNLOAD)?; + let res = table.get(path)?; + if let Some(res) = res { + Ok(Some(res.value().to_vec())) + } else { + Ok(None) + } + } + } + } + pub fn insert(&self, path: &str, data: &[u8]) -> Result<()> { + match self { + Cache::Directory(cachedir) => { + let cachepath = cachedir.join(path); + File::create(cachepath)?.write_all(data)?; + Ok(()) + } + Cache::Redb(database) => { + let txn = database.begin_write()?; + let mut table = txn.open_table(T_DOWNLOAD)?; + table.insert(path, data)?; + drop(table); + txn.commit()?; + Ok(()) + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 23d840f..8d561b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ #![feature(array_chunks)] +pub mod cache; pub mod mesh; use anyhow::{Result, bail}; +use cache::Cache; use clap::Parser; -use futures_util::{StreamExt, lock::Mutex, stream::FuturesUnordered}; -use glam::DMat4; +use futures_util::{StreamExt, stream::FuturesUnordered}; +use glam::{DMat4, Vec3}; use log::{debug, error, info}; use mesh::{convert_mesh, decode_normal_table}; use prost::{Message, bytes::Bytes}; @@ -14,15 +16,12 @@ use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, }; use std::{f32::consts::PI, path::PathBuf, pin::Pin, sync::Arc}; -use tokio::{ - fs::{File, create_dir_all}, - io::{AsyncReadExt, AsyncWriteExt}, - sync::Semaphore, -}; +use tokio::sync::Semaphore; use weareshared::{ Affine3A, + helper::AABB, packets::Resource, - resources::{MeshPart, Prefab, RespackEntry}, + resources::{Prefab, RespackEntry, SpatialIndex}, respack::save_full_respack, store::ResourceStore, }; @@ -31,6 +30,8 @@ use weareshared::{ struct Args { #[arg(short, long, default_value = "16")] par_limit: usize, + #[arg(short, long)] + db_cache: Option<PathBuf>, #[clap(subcommand)] action: Action, } @@ -46,36 +47,42 @@ async fn main() -> Result<()> { let args = Args::parse(); + let cache = if let Some(path) = args.db_cache { + Cache::new_db(&path)? + } else { + Cache::new_directory()? + }; + let c = GeClient::new(16, cache).await?; + match args.action { Action::Cache { level } => { - let c = GeClient::new(16).await?; let entry = c.planetoid_metdata().await?; let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch(); - cache_all(Arc::new(c), "".to_string(), epoch, level).await?; + cache_all( + Arc::new(c), + Arc::new(Semaphore::new(64)), + "".to_string(), + epoch, + level, + ) + .await?; } Action::Export { level } => { - let c = GeClient::new(16).await?; let entry = c.planetoid_metdata().await?; let store = Arc::new(ResourceStore::new_memory()); - let meshes = Arc::new(Mutex::new(Vec::new())); - do_node( + let (_, root) = do_node( Arc::new(c), "".to_string(), - meshes.clone(), store.clone(), entry.root_node_metadata.unwrap().bulk_metadata_epoch(), level, ) .await?; - let prefab = store.set(&Prefab { - mesh: meshes.lock().await.clone(), - ..Default::default() - })?; let entry = store.set(&RespackEntry { - c_prefab: vec![prefab], + c_spatial_index: vec![root], ..Default::default() })?; let file = std::fs::File::create("/tmp/a.respack")?; @@ -89,27 +96,28 @@ async fn main() -> Result<()> { fn do_node( c: Arc<GeClient>, path: String, - meshes: Arc<Mutex<Vec<(Affine3A, Resource<MeshPart>)>>>, store: Arc<ResourceStore>, epoch: u32, level: usize, -) -> Pin<Box<dyn Future<Output = Result<()>>>> { +) -> Pin<Box<dyn Future<Output = Result<(AABB, Resource<SpatialIndex>)>>>> { Box::pin(async move { let bulk = c.bulk_metadata(&path, epoch).await?; let mut fu = FuturesUnordered::new(); + let mut meshes = Vec::new(); + let mut children = Vec::new(); for node_meta in &bulk.node_metadata { let (cpath, flags) = unpack_path_and_id(node_meta.path_and_flags()); // eprintln!("{path}+{cpath} {flags:?}"); let abspath = format!("{path}{cpath}"); - if flags.has_node && abspath.len() == level { + if flags.has_node { let node = c.node_data(&abspath, flags, &bulk, node_meta).await?; let transform = DMat4::from_cols_slice(&node.matrix_globe_from_mesh); let for_normals = decode_normal_table(node.for_normals()); for m in node.meshes { let mesh = convert_mesh(m, &store, &for_normals)?; - meshes.lock().await.push(( + meshes.push(( Affine3A::from_rotation_x(-PI / 2.) * Affine3A::from_mat4((transform / 500_000.).as_mat4()), mesh, @@ -117,32 +125,42 @@ fn do_node( } } if cpath.len() == 4 && flags.has_metadata && abspath.len() < level { - fu.push(do_node( - c.clone(), - abspath, - meshes.clone(), - store.clone(), - epoch, - level, - )); + fu.push(do_node(c.clone(), abspath, store.clone(), epoch, level)); } } while let Some(res) = fu.next().await { - res?; + children.push(res?); } - Ok(()) + let bounds = AABB { + min: Vec3::ZERO, + max: Vec3::MAX, + }; + let prefab = store.set(&Prefab { + mesh: meshes, + ..Default::default() + })?; + Ok(( + bounds, + store.set(&SpatialIndex { + prefab: Some(prefab), + child: children, + ..Default::default() + })?, + )) }) } fn cache_all( c: Arc<GeClient>, + par: Arc<Semaphore>, path: String, epoch: u32, level: usize, ) -> Pin<Box<dyn Future<Output = Result<()>>>> { Box::pin(async move { + let _permit = par.acquire().await?; let bulk = c.bulk_metadata(&path, epoch).await?; let mut fu = FuturesUnordered::new(); @@ -154,34 +172,28 @@ fn cache_all( c.node_data(&abspath, flags, &bulk, node_meta).await?; } if cpath.len() == 4 && flags.has_metadata && abspath.len() < level { - fu.push(cache_all(c.clone(), abspath, epoch, level)); + fu.push(cache_all(c.clone(), par.clone(), abspath, epoch, level)); } } while let Some(res) = fu.next().await { res?; } - + drop(_permit); Ok(()) }) } struct GeClient { client: Client, - cachedir: PathBuf, + cache: Cache, par_limit: Semaphore, } + impl GeClient { - pub async fn new(par_limit: usize) -> Result<Self> { - let cachedir = xdg::BaseDirectories::with_prefix("weareearth") - .unwrap() - .create_cache_directory("download") - .unwrap(); - create_dir_all(cachedir.join("BulkMetadata")).await?; - create_dir_all(cachedir.join("NodeData")).await?; + pub async fn new(par_limit: usize, cache: Cache) -> Result<Self> { Ok(Self { - cachedir, - par_limit: Semaphore::new(par_limit), + par_limit: Semaphore::new(par_limit),cache, client: Client::builder().default_headers(HeaderMap::from_iter([ (HeaderName::from_static("user-agent"), HeaderValue::from_static("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36")), (HeaderName::from_static("referer"), HeaderValue::from_static("https://earth.google.com/")) @@ -190,12 +202,9 @@ impl GeClient { } pub async fn download(&self, path: &str) -> Result<Bytes> { let _permit = self.par_limit.acquire().await?; - let cachepath = self.cachedir.join(path); - if cachepath.exists() { + if let Some(d) = self.cache.get(path)? { debug!("cached {path:?}"); - let mut buf = Vec::new(); - File::open(cachepath).await?.read_to_end(&mut buf).await?; - Ok(buf.into()) + Ok(d.into()) } else { info!("download {path:?}"); let res = self @@ -208,7 +217,7 @@ impl GeClient { bail!("error response") } let buf = res.bytes().await?; - File::create(cachepath).await?.write_all(&buf).await?; + self.cache.insert(path, &buf)?; Ok(buf) } } |