From 9958d376bb65dbe86d169d57c9d909a3f7c0b6f5 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 17 Oct 2025 19:40:05 +0200 Subject: Fix connection test caching --- server/registry/src/conn_test.rs | 56 +++++++++++++++++++++++----------------- server/registry/src/main.rs | 3 ++- 2 files changed, 34 insertions(+), 25 deletions(-) (limited to 'server') diff --git a/server/registry/src/conn_test.rs b/server/registry/src/conn_test.rs index 809abc78..e4f4fa63 100644 --- a/server/registry/src/conn_test.rs +++ b/server/registry/src/conn_test.rs @@ -15,8 +15,7 @@ along with this program. If not, see . */ -use crate::MAX_ADDR_CACHE; -use anyhow::{Result, bail}; +use anyhow::{Context, Result, bail}; use hurrycurry_client_lib::network::tokio::Network; use hurrycurry_protocol::PacketC; use log::info; @@ -25,7 +24,11 @@ use std::{ net::SocketAddr, time::{Duration, Instant}, }; -use tokio::{net::TcpStream, sync::RwLock, time::timeout}; +use tokio::{ + net::TcpStream, + sync::{RwLock, Semaphore}, + time::{sleep, timeout}, +}; #[derive(Debug, Clone, Copy)] struct ConnectTest { @@ -34,6 +37,17 @@ struct ConnectTest { } static CONNECT_OK: RwLock> = RwLock::const_new(BTreeMap::new()); +static CONNECT_SEMAPHORE: Semaphore = Semaphore::const_new(10); + +pub(crate) async fn prune_connect_cache_task() { + loop { + sleep(Duration::from_secs(600)).await; + CONNECT_OK + .write() + .await + .retain(|_, v| v.verified_at.elapsed().as_secs() < 3600); + } +} pub(crate) async fn test_connect(addr: SocketAddr, uri: &str) -> Result<(u32, u32), &'static str> { let r = CONNECT_OK.read().await.get(&addr).copied(); @@ -41,37 +55,31 @@ pub(crate) async fn test_connect(addr: SocketAddr, uri: &str) -> Result<(u32, u3 Ok(r.version) } else { // TODO locks to prevent parallel tests for same addr and dos attempts + let _permit = CONNECT_SEMAPHORE.acquire().await; let res = timeout(Duration::from_secs(10), test_connect_inner(addr, uri)) .await .map_err(|_| "connect timeout")?; info!("connect result: {res:?}"); + drop(_permit); + let version = res.map_err(|_| "server unreachable")?; - { - let mut g = CONNECT_OK.write().await; - // TODO dont cache forever - g.insert( - addr, - ConnectTest { - verified_at: Instant::now(), - version, - }, - ); - while g.len() > MAX_ADDR_CACHE { - // TODO perf maybe later - if let Some(key) = g.iter().min_by_key(|(_, v)| v.verified_at).map(|(k, _)| *k) { - g.remove(&key); - } - } - } - info!("cache updated"); + let mut g = CONNECT_OK.write().await; + g.insert( + addr, + ConnectTest { + verified_at: Instant::now(), + version, + }, + ); Ok(version) } } + async fn test_connect_inner(addr: SocketAddr, uri: &str) -> Result<(u32, u32)> { info!("test connect {addr} {uri:?}"); - let stream = TcpStream::connect(addr).await?; - let net = Network::connect_raw(stream, uri).await?; - let packet = net.receive().await?; + let stream = TcpStream::connect(addr).await.context("connect")?; + let net = Network::connect_raw(stream, uri).await.context("upgrade")?; + let packet = net.receive().await.context("receive")?; match packet { Some(PacketC::Version { minor, major, .. }) => Ok((major, minor)), _ => bail!("bad initial packet"), diff --git a/server/registry/src/main.rs b/server/registry/src/main.rs index 24ce6621..3cd252d9 100644 --- a/server/registry/src/main.rs +++ b/server/registry/src/main.rs @@ -21,6 +21,7 @@ pub mod list; pub mod lobby; pub mod register; +use crate::conn_test::prune_connect_cache_task; use hurrycurry_protocol::registry::Entry; use list::{generate_html_list, generate_json_list, r_list}; use lobby::lobby_wrapper; @@ -38,7 +39,6 @@ use std::{ }; use tokio::{sync::RwLock, time::interval}; -const MAX_ADDR_CACHE: usize = 4096; const MAX_SERVERS: usize = 128; fn main() { @@ -64,6 +64,7 @@ fn main() { .unwrap() .block_on(async move { tokio::task::spawn(Registry::update_loop(registry.clone())); + tokio::task::spawn(prune_connect_cache_task()); tokio::task::spawn(lobby_wrapper(registry.clone(), lobby_addr)); rocket::build() .configure(Config { -- cgit v1.3