diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-10-30 19:35:00 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-10-30 19:35:02 +0100 |
| commit | 2558e05b3661d3812ca6d417e5604da615124644 (patch) | |
| tree | 30e09a9b63f6a66d1081c84f513541fc5446d539 /server | |
| parent | d861319090fa6378aabf4fd102566083b915c1a5 (diff) | |
| download | hurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar hurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar.bz2 hurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar.zst | |
Add connection keepalives; Disconnect reasions
Diffstat (limited to 'server')
| -rw-r--r-- | server/protocol/src/lib.rs | 3 | ||||
| -rw-r--r-- | server/src/main.rs | 32 | ||||
| -rw-r--r-- | server/src/server.rs | 8 | ||||
| -rw-r--r-- | server/src/state.rs | 48 |
4 files changed, 45 insertions, 46 deletions
diff --git a/server/protocol/src/lib.rs b/server/protocol/src/lib.rs index 15911dd3..6b4e97f1 100644 --- a/server/protocol/src/lib.rs +++ b/server/protocol/src/lib.rs @@ -43,6 +43,8 @@ fn test_version_parse() { let _ = *VERSION; } +pub const KEEPALIVE_INTERVAL: f32 = 1.; + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] #[serde(transparent)] pub struct PlayerID(#[serde(deserialize_with = "deser_i64")] pub i64); @@ -142,6 +144,7 @@ pub enum PacketS { Idle { paused: bool, }, + Keepalive, Ready, /// For use in replay sessions only diff --git a/server/src/main.rs b/server/src/main.rs index 3fbefaf8..3bf39f5a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,7 +33,7 @@ use tokio::{ net::{TcpListener, TcpStream}, spawn, sync::{RwLock, broadcast}, - time::interval, + time::{interval, sleep}, }; use tokio_tungstenite::{WebSocketStream, tungstenite::Message}; @@ -51,9 +51,9 @@ pub(crate) struct Args { /// Map name to use as lobby #[arg(long, default_value = "lobby")] lobby: String, - /// Inactivity kick timeout in seconds + /// Inactivity timeout in seconds #[arg(long, default_value_t = 60.)] - inactivity_kick_timeout: f32, + inactivity_timeout: f32, /// Registers this server to the public server registry #[arg(long)] #[cfg(feature = "register")] @@ -125,7 +125,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { let (tx, _) = broadcast::channel::<PacketC>(128 * 1024); let config = ServerConfig { - inactivity_kick_timeout: args.inactivity_kick_timeout, + inactivity_timeout: args.inactivity_timeout, lobby: args.lobby, }; @@ -214,14 +214,9 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { Ok(packet) => { if send_packet(id, &mut sock, packet).await { break; }; }, - Err(e) => { - broadcast_rx = broadcast_rx.resubscribe(); - warn!("{id} Client was lagging; resubscribed: {e}"); - let packet = PacketC::ServerMessage { - message: trm!("s.state.overflow_resubscribe"), - error: true, - }; - if send_packet(id, &mut sock, packet).await { break; }; + Err(_) => { + warn!("{id} Broadcast packet channel overflowed"); + state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.channel_overflow"))); } }, Some(Ok(message)) = sock.next() => { @@ -229,7 +224,8 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { - warn!("{id} Invalid json packet: {e}"); + warn!("{id} Invalid packet: {e}"); + state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.invalid_packet", s = e.to_string()))); break; } }, @@ -243,7 +239,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { if matches!( packet, - PacketS::Movement { .. } | PacketS::ReplayTick { .. } + PacketS::Movement { .. } | PacketS::ReplayTick { .. } | PacketS::Keepalive ) { trace!("{id} <- {packet:?}"); } else { @@ -252,7 +248,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { let packet_out = match state.write().await.packet_in_outer(id, packet) { Ok(packets) => packets, Err(e) => { - warn!("Client error: {e}"); + warn!("{id} Packet error: {e}"); vec![PacketC::ServerMessage { message: e.into(), error: true, @@ -265,10 +261,8 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { } }; } - state - .write() - .await - .disconnect(id, hurrycurry_protocol::Message::Text(String::default())); + state.write().await.disconnect(id, None); + sleep(Duration::from_millis(100)).await; // avoids potential godot bug where disconnect packets are lost }); } Ok(()) diff --git a/server/src/server.rs b/server/src/server.rs index 47f556e2..fc2df029 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -49,7 +49,7 @@ pub struct ConnectionData { pub players: HashSet<PlayerID>, pub idle: bool, pub ready: bool, - pub last_player_input: f32, + pub keepalive_timer: f32, pub replies: mpsc::Sender<PacketC>, } @@ -60,7 +60,7 @@ pub enum AnnounceState { } pub struct ServerConfig { - pub inactivity_kick_timeout: f32, + pub inactivity_timeout: f32, pub lobby: String, } @@ -114,7 +114,7 @@ impl Server { impl Default for ServerConfig { fn default() -> Self { Self { - inactivity_kick_timeout: 60., + inactivity_timeout: 60., lobby: "lobby".to_string(), } } @@ -642,7 +642,7 @@ impl Server { self.score_changed = true; } PacketS::ReplayTick { .. } => return Err(tre!("s.error.packet_not_supported")), - PacketS::Idle { .. } | PacketS::Ready => (), + PacketS::Idle { .. } | PacketS::Ready | PacketS::Keepalive => (), PacketS::Debug(d) => self.packet_out.push_back(PacketC::Debug(d)), } Ok(()) diff --git a/server/src/state.rs b/server/src/state.rs index 1b42243f..23339543 100644 --- a/server/src/state.rs +++ b/server/src/state.rs @@ -21,8 +21,8 @@ use crate::{ }; use anyhow::Result; use hurrycurry_locale::{TrError, tre, trm}; -use hurrycurry_protocol::{Menu, Message, PacketC, PacketS, PlayerID, VERSION}; -use log::{debug, info, trace}; +use hurrycurry_protocol::{KEEPALIVE_INTERVAL, Menu, Message, PacketC, PacketS, PlayerID, VERSION}; +use log::{debug, info, trace, warn}; use std::{ collections::HashSet, time::{Duration, Instant}, @@ -31,15 +31,16 @@ use tokio::sync::{broadcast, mpsc}; impl Server { pub fn tick_outer(&mut self, dt: f32) -> anyhow::Result<()> { - let mut idle_kick = Vec::new(); + let mut keepalive_kick = Vec::new(); for (cid, conn) in &mut self.connections { - conn.last_player_input += dt; - if conn.last_player_input > self.config.inactivity_kick_timeout { - idle_kick.push(*cid); + conn.keepalive_timer += dt; + if conn.keepalive_timer > KEEPALIVE_INTERVAL + 10. { + keepalive_kick.push(*cid); } } - for cid in idle_kick { - self.disconnect(cid, trm!("s.disconnect_reason.inactivity_kick")); + for cid in keepalive_kick { + warn!("{cid} Client did not send keepalive in time"); + self.disconnect(cid, Some(trm!("s.disconnect_reason.keepalive_timer"))); } if !self.paused { @@ -111,7 +112,7 @@ impl Server { idle: false, ready: false, players: HashSet::new(), - last_player_input: 0., + keepalive_timer: 0., replies: replies_tx, }, ); @@ -119,9 +120,11 @@ impl Server { (init, broadcast_rx, replies_rx) } - pub fn disconnect(&mut self, conn: ConnectionID, reason: Message) { + pub fn disconnect(&mut self, conn: ConnectionID, reason: Option<Message>) { if let Some(cd) = self.connections.get(&conn) { - let _ = cd.replies.try_send(PacketC::Disconnect { reason }); + if let Some(reason) = reason { + let _ = cd.replies.try_send(PacketC::Disconnect { reason }); + } for player in cd.players.clone() { let _ = self.packet_in_outer(conn, PacketS::Leave { player }); } @@ -135,8 +138,11 @@ impl Server { conn: ConnectionID, packet: PacketS, ) -> Result<Vec<PacketC>, TrError> { + let Some(conn_data) = self.connections.get_mut(&conn) else { + return Ok(vec![]); + }; if let Some(p) = get_packet_player(&packet) - && !self.connections.get(&conn).unwrap().players.contains(&p) + && !conn_data.players.contains(&p) { return Err(tre!("s.error.packet_sender_invalid")); } @@ -159,11 +165,11 @@ impl Server { } } PacketS::Ready => { - self.connections.get_mut(&conn).unwrap().ready = true; + conn_data.ready = true; self.update_paused(); } PacketS::Idle { paused } => { - self.connections.get_mut(&conn).unwrap().idle = *paused; + conn_data.idle = *paused; self.update_paused(); } PacketS::Leave { player } => { @@ -174,17 +180,12 @@ impl Server { .remove(player); } PacketS::Join { .. } => { - if self.connections.get_mut(&conn).unwrap().players.len() > 8 { + if conn_data.players.len() > 8 { return Err(tre!("s.error.conn_too_many_players")); } } - PacketS::Interact { .. } - | PacketS::Communicate { .. } - | PacketS::Movement { boost: true, .. } => { - self.connections.get_mut(&conn).unwrap().last_player_input = 0.; - } - PacketS::Movement { dir, .. } if dir.length() > 0.5 => { - self.connections.get_mut(&conn).unwrap().last_player_input = 0.; + PacketS::Keepalive => { + conn_data.keepalive_timer = 0.; } _ => (), } @@ -280,6 +281,7 @@ fn get_packet_player(packet: &PacketS) -> Option<PlayerID> { | PacketS::Ready | PacketS::ApplyScore(_) | PacketS::ReplayTick { .. } - | PacketS::Debug(_) => None, + | PacketS::Debug(_) + | PacketS::Keepalive => None, } } |