diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 84 |
1 files changed, 42 insertions, 42 deletions
diff --git a/src/main.rs b/src/main.rs index 8265941..6e3be3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,8 +11,16 @@ use futures_util::{StreamExt, stream::FuturesUnordered}; use glam::{DMat4, DVec3}; use log::info; use mesh::{convert_mesh, decode_normal_table}; -use std::{f32::consts::PI, path::PathBuf, pin::Pin, sync::Arc}; -use tokio::sync::Semaphore; +use std::{ + f32::consts::PI, + path::PathBuf, + pin::Pin, + sync::{Arc, atomic::AtomicU64}, +}; +use tokio::{ + spawn, + sync::{Mutex, Notify, Semaphore}, +}; use weareshared::{ Affine3A, helper::AABB, @@ -63,7 +71,7 @@ async fn main() -> Result<()> { let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch(); cache_all( Arc::new(c), - Arc::new(Semaphore::new(args.par_limit * 3 / 2)), + Arc::new(Semaphore::new(args.par_limit)), "".to_string(), epoch, level, @@ -74,14 +82,7 @@ async fn main() -> Result<()> { let c = GeClient::new(16, cache).await?; let entry = c.planetoid_metdata().await?; let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch(); - let count = cache_metadata( - Arc::new(c), - Arc::new(Semaphore::new(args.par_limit * 3 / 2)), - "".to_string(), - epoch, - level, - ) - .await?; + let count = cache_metadata(Arc::new(c), "".to_string(), epoch, level).await?; info!("There are {count} nodes at level {level}"); } Action::Export { level } => { @@ -213,42 +214,41 @@ fn cache_all( }) } -fn cache_metadata( - c: Arc<GeClient>, - par: Arc<Semaphore>, - path: String, - epoch: u32, - level: usize, -) -> Pin<Box<dyn Future<Output = Result<u64>>>> { - Box::pin(async move { - let _permit = par.acquire().await?; - let bulk = c.bulk_metadata(&path, epoch).await?; - - let mut fu = FuturesUnordered::new(); +async fn cache_metadata(c: Arc<GeClient>, path: String, epoch: u32, level: usize) -> Result<u64> { + let queue = Arc::new(Mutex::new(Vec::<String>::new())); + queue.lock().await.push(path); + let ck = Arc::new((Mutex::new(0usize), Notify::new(), AtomicU64::new(0))); - let mut node_count = 0; + loop { + while let Some(path) = queue.lock().await.pop() { + let queue = queue.clone(); + let c = c.clone(); + let ck2 = ck.clone(); + *ck2.0.lock().await += 1; + spawn(async move { + let bulk = c.bulk_metadata(&path, epoch).await.unwrap(); - for node_meta in &bulk.node_metadata { - let (cpath, flags) = unpack_path_and_id(node_meta.path_and_flags()); - node_count += 1; - if cpath.len() == 4 && flags.has_metadata && cpath.len() + path.len() < level { - let abspath = format!("{path}{cpath}"); - fu.push(cache_metadata( - c.clone(), - par.clone(), - abspath, - epoch, - level, - )); + for node_meta in &bulk.node_metadata { + let (cpath, flags) = unpack_path_and_id(node_meta.path_and_flags()); + ck2.2.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if cpath.len() == 4 && flags.has_metadata && cpath.len() + path.len() < level { + let abspath = format!("{path}{cpath}"); + queue.lock().await.push(abspath) + } + } + *ck2.0.lock().await -= 1; + ck2.1.notify_waiters(); + }); + while *ck.0.lock().await > 128 { + ck.1.notified().await; } } - drop(_permit); - - while let Some(res) = fu.next().await { - node_count += res?; + if *ck.0.lock().await == 0 && queue.lock().await.is_empty() { + break; } - Ok(node_count) - }) + } + + Ok(ck.2.load(std::sync::atomic::Ordering::Relaxed)) } #[derive(Debug, Clone, Copy)] |