author     metamuffin <metamuffin@disroot.org>   2025-04-08 15:12:43 +0200
committer  metamuffin <metamuffin@disroot.org>   2025-04-08 15:12:43 +0200
commit     7caef8ad89c09c172af84d592a6547882b7d62d0 (patch)
tree       6baf7387a0fc22a308a7b9cbe0422d79c7ae1609
parent     d8358601ff08262cc290f50cd1ddd56c115d0469 (diff)
make cache_metadata neither explode nor deadlock.
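The new cache_metadata drops the recursive FuturesUnordered + Semaphore scheme: paths now sit on a shared Mutex<Vec<String>> worklist, each popped path is handled in a spawned task, the number of in-flight tasks is tracked in a Mutex<usize> and throttled at 128 via a Notify, and the node count moves into an AtomicU64. Below is a loose, self-contained sketch of that worklist pattern, not code from this repository: the drain/LIMIT names and the fake fan-out are invented for illustration, the queue lock is held only for the pop itself, and the termination path parks on the Notify instead of re-polling the queue as the committed version does.

    use std::sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    };
    use tokio::{
        spawn,
        sync::{Mutex, Notify},
    };

    const LIMIT: usize = 128; // maximum number of tasks in flight

    async fn drain(start: u64) -> u64 {
        // shared worklist; running tasks push follow-up items onto it
        let queue = Arc::new(Mutex::new(vec![start]));
        // (in-flight task count, wakeup signal, processed-item count)
        let ck = Arc::new((Mutex::new(0usize), Notify::new(), AtomicU64::new(0)));

        loop {
            // pop under a short-lived lock so running tasks can still push
            let next = queue.lock().await.pop();
            if let Some(item) = next {
                let (queue, ck2) = (queue.clone(), ck.clone());
                *ck2.0.lock().await += 1; // register the task before it starts
                spawn(async move {
                    ck2.2.fetch_add(1, Ordering::Relaxed);
                    if item < 1_000 {
                        // pretend processing discovered two follow-up items
                        queue.lock().await.extend([item * 2 + 1, item * 2 + 2]);
                    }
                    *ck2.0.lock().await -= 1;
                    ck2.1.notify_waiters(); // wake the throttle / termination checks
                });
                // back-pressure: stop dispatching while too much is in flight
                while *ck.0.lock().await > LIMIT {
                    ck.1.notified().await;
                }
            } else {
                // arm the wakeup before re-checking so a finishing task isn't missed
                let woken = ck.1.notified();
                if queue.lock().await.is_empty() {
                    if *ck.0.lock().await == 0 {
                        break; // nothing queued and nothing running: done
                    }
                    woken.await; // a running task may still push new items
                }
                // otherwise new work appeared in the meantime; loop and pop it
            }
        }

        ck.2.load(Ordering::Relaxed)
    }

The in-flight counter is incremented before spawning and decremented as the task's last step, so the == 0 check can only succeed once every task, including any whose follow-up work is still queued, has been accounted for.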
-rw-r--r--  src/main.rs  |  84
1 file changed, 42 insertions, 42 deletions
diff --git a/src/main.rs b/src/main.rs
index 8265941..6e3be3c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,8 +11,16 @@ use futures_util::{StreamExt, stream::FuturesUnordered};
use glam::{DMat4, DVec3};
use log::info;
use mesh::{convert_mesh, decode_normal_table};
-use std::{f32::consts::PI, path::PathBuf, pin::Pin, sync::Arc};
-use tokio::sync::Semaphore;
+use std::{
+ f32::consts::PI,
+ path::PathBuf,
+ pin::Pin,
+ sync::{Arc, atomic::AtomicU64},
+};
+use tokio::{
+ spawn,
+ sync::{Mutex, Notify, Semaphore},
+};
use weareshared::{
Affine3A,
helper::AABB,
@@ -63,7 +71,7 @@ async fn main() -> Result<()> {
let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch();
cache_all(
Arc::new(c),
- Arc::new(Semaphore::new(args.par_limit * 3 / 2)),
+ Arc::new(Semaphore::new(args.par_limit)),
"".to_string(),
epoch,
level,
@@ -74,14 +82,7 @@ async fn main() -> Result<()> {
let c = GeClient::new(16, cache).await?;
let entry = c.planetoid_metdata().await?;
let epoch = entry.root_node_metadata.unwrap().bulk_metadata_epoch();
- let count = cache_metadata(
- Arc::new(c),
- Arc::new(Semaphore::new(args.par_limit * 3 / 2)),
- "".to_string(),
- epoch,
- level,
- )
- .await?;
+ let count = cache_metadata(Arc::new(c), "".to_string(), epoch, level).await?;
info!("There are {count} nodes at level {level}");
}
Action::Export { level } => {
@@ -213,42 +214,41 @@ fn cache_all(
})
}
-fn cache_metadata(
- c: Arc<GeClient>,
- par: Arc<Semaphore>,
- path: String,
- epoch: u32,
- level: usize,
-) -> Pin<Box<dyn Future<Output = Result<u64>>>> {
- Box::pin(async move {
- let _permit = par.acquire().await?;
- let bulk = c.bulk_metadata(&path, epoch).await?;
-
- let mut fu = FuturesUnordered::new();
+async fn cache_metadata(c: Arc<GeClient>, path: String, epoch: u32, level: usize) -> Result<u64> {
+ let queue = Arc::new(Mutex::new(Vec::<String>::new()));
+ queue.lock().await.push(path);
+ let ck = Arc::new((Mutex::new(0usize), Notify::new(), AtomicU64::new(0)));
- let mut node_count = 0;
+ loop {
+ while let Some(path) = queue.lock().await.pop() {
+ let queue = queue.clone();
+ let c = c.clone();
+ let ck2 = ck.clone();
+ *ck2.0.lock().await += 1;
+ spawn(async move {
+ let bulk = c.bulk_metadata(&path, epoch).await.unwrap();
- for node_meta in &bulk.node_metadata {
- let (cpath, flags) = unpack_path_and_id(node_meta.path_and_flags());
- node_count += 1;
- if cpath.len() == 4 && flags.has_metadata && cpath.len() + path.len() < level {
- let abspath = format!("{path}{cpath}");
- fu.push(cache_metadata(
- c.clone(),
- par.clone(),
- abspath,
- epoch,
- level,
- ));
+ for node_meta in &bulk.node_metadata {
+ let (cpath, flags) = unpack_path_and_id(node_meta.path_and_flags());
+ ck2.2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ if cpath.len() == 4 && flags.has_metadata && cpath.len() + path.len() < level {
+ let abspath = format!("{path}{cpath}");
+ queue.lock().await.push(abspath)
+ }
+ }
+ *ck2.0.lock().await -= 1;
+ ck2.1.notify_waiters();
+ });
+ while *ck.0.lock().await > 128 {
+ ck.1.notified().await;
}
}
- drop(_permit);
-
- while let Some(res) = fu.next().await {
- node_count += res?;
+ if *ck.0.lock().await == 0 && queue.lock().await.is_empty() {
+ break;
}
- Ok(node_count)
- })
+ }
+
+ Ok(ck.2.load(std::sync::atomic::Ordering::Relaxed))
}
#[derive(Debug, Clone, Copy)]