aboutsummaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-10-30 19:35:00 +0100
committermetamuffin <metamuffin@disroot.org>2025-10-30 19:35:02 +0100
commit2558e05b3661d3812ca6d417e5604da615124644 (patch)
tree30e09a9b63f6a66d1081c84f513541fc5446d539 /server
parentd861319090fa6378aabf4fd102566083b915c1a5 (diff)
downloadhurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar
hurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar.bz2
hurrycurry-2558e05b3661d3812ca6d417e5604da615124644.tar.zst
Add connection keepalives; Disconnect reasions
Diffstat (limited to 'server')
-rw-r--r--server/protocol/src/lib.rs3
-rw-r--r--server/src/main.rs32
-rw-r--r--server/src/server.rs8
-rw-r--r--server/src/state.rs48
4 files changed, 45 insertions, 46 deletions
diff --git a/server/protocol/src/lib.rs b/server/protocol/src/lib.rs
index 15911dd3..6b4e97f1 100644
--- a/server/protocol/src/lib.rs
+++ b/server/protocol/src/lib.rs
@@ -43,6 +43,8 @@ fn test_version_parse() {
let _ = *VERSION;
}
+pub const KEEPALIVE_INTERVAL: f32 = 1.;
+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(transparent)]
pub struct PlayerID(#[serde(deserialize_with = "deser_i64")] pub i64);
@@ -142,6 +144,7 @@ pub enum PacketS {
Idle {
paused: bool,
},
+ Keepalive,
Ready,
/// For use in replay sessions only
diff --git a/server/src/main.rs b/server/src/main.rs
index 3fbefaf8..3bf39f5a 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -33,7 +33,7 @@ use tokio::{
net::{TcpListener, TcpStream},
spawn,
sync::{RwLock, broadcast},
- time::interval,
+ time::{interval, sleep},
};
use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
@@ -51,9 +51,9 @@ pub(crate) struct Args {
/// Map name to use as lobby
#[arg(long, default_value = "lobby")]
lobby: String,
- /// Inactivity kick timeout in seconds
+ /// Inactivity timeout in seconds
#[arg(long, default_value_t = 60.)]
- inactivity_kick_timeout: f32,
+ inactivity_timeout: f32,
/// Registers this server to the public server registry
#[arg(long)]
#[cfg(feature = "register")]
@@ -125,7 +125,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
let (tx, _) = broadcast::channel::<PacketC>(128 * 1024);
let config = ServerConfig {
- inactivity_kick_timeout: args.inactivity_kick_timeout,
+ inactivity_timeout: args.inactivity_timeout,
lobby: args.lobby,
};
@@ -214,14 +214,9 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
Ok(packet) => {
if send_packet(id, &mut sock, packet).await { break; };
},
- Err(e) => {
- broadcast_rx = broadcast_rx.resubscribe();
- warn!("{id} Client was lagging; resubscribed: {e}");
- let packet = PacketC::ServerMessage {
- message: trm!("s.state.overflow_resubscribe"),
- error: true,
- };
- if send_packet(id, &mut sock, packet).await { break; };
+ Err(_) => {
+ warn!("{id} Broadcast packet channel overflowed");
+ state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.channel_overflow")));
}
},
Some(Ok(message)) = sock.next() => {
@@ -229,7 +224,8 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) {
Ok(p) => p,
Err(e) => {
- warn!("{id} Invalid json packet: {e}");
+ warn!("{id} Invalid packet: {e}");
+ state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.invalid_packet", s = e.to_string())));
break;
}
},
@@ -243,7 +239,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
if matches!(
packet,
- PacketS::Movement { .. } | PacketS::ReplayTick { .. }
+ PacketS::Movement { .. } | PacketS::ReplayTick { .. } | PacketS::Keepalive
) {
trace!("{id} <- {packet:?}");
} else {
@@ -252,7 +248,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
let packet_out = match state.write().await.packet_in_outer(id, packet) {
Ok(packets) => packets,
Err(e) => {
- warn!("Client error: {e}");
+ warn!("{id} Packet error: {e}");
vec![PacketC::ServerMessage {
message: e.into(),
error: true,
@@ -265,10 +261,8 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
}
};
}
- state
- .write()
- .await
- .disconnect(id, hurrycurry_protocol::Message::Text(String::default()));
+ state.write().await.disconnect(id, None);
+ sleep(Duration::from_millis(100)).await; // avoids potential godot bug where disconnect packets are lost
});
}
Ok(())
diff --git a/server/src/server.rs b/server/src/server.rs
index 47f556e2..fc2df029 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -49,7 +49,7 @@ pub struct ConnectionData {
pub players: HashSet<PlayerID>,
pub idle: bool,
pub ready: bool,
- pub last_player_input: f32,
+ pub keepalive_timer: f32,
pub replies: mpsc::Sender<PacketC>,
}
@@ -60,7 +60,7 @@ pub enum AnnounceState {
}
pub struct ServerConfig {
- pub inactivity_kick_timeout: f32,
+ pub inactivity_timeout: f32,
pub lobby: String,
}
@@ -114,7 +114,7 @@ impl Server {
impl Default for ServerConfig {
fn default() -> Self {
Self {
- inactivity_kick_timeout: 60.,
+ inactivity_timeout: 60.,
lobby: "lobby".to_string(),
}
}
@@ -642,7 +642,7 @@ impl Server {
self.score_changed = true;
}
PacketS::ReplayTick { .. } => return Err(tre!("s.error.packet_not_supported")),
- PacketS::Idle { .. } | PacketS::Ready => (),
+ PacketS::Idle { .. } | PacketS::Ready | PacketS::Keepalive => (),
PacketS::Debug(d) => self.packet_out.push_back(PacketC::Debug(d)),
}
Ok(())
diff --git a/server/src/state.rs b/server/src/state.rs
index 1b42243f..23339543 100644
--- a/server/src/state.rs
+++ b/server/src/state.rs
@@ -21,8 +21,8 @@ use crate::{
};
use anyhow::Result;
use hurrycurry_locale::{TrError, tre, trm};
-use hurrycurry_protocol::{Menu, Message, PacketC, PacketS, PlayerID, VERSION};
-use log::{debug, info, trace};
+use hurrycurry_protocol::{KEEPALIVE_INTERVAL, Menu, Message, PacketC, PacketS, PlayerID, VERSION};
+use log::{debug, info, trace, warn};
use std::{
collections::HashSet,
time::{Duration, Instant},
@@ -31,15 +31,16 @@ use tokio::sync::{broadcast, mpsc};
impl Server {
pub fn tick_outer(&mut self, dt: f32) -> anyhow::Result<()> {
- let mut idle_kick = Vec::new();
+ let mut keepalive_kick = Vec::new();
for (cid, conn) in &mut self.connections {
- conn.last_player_input += dt;
- if conn.last_player_input > self.config.inactivity_kick_timeout {
- idle_kick.push(*cid);
+ conn.keepalive_timer += dt;
+ if conn.keepalive_timer > KEEPALIVE_INTERVAL + 10. {
+ keepalive_kick.push(*cid);
}
}
- for cid in idle_kick {
- self.disconnect(cid, trm!("s.disconnect_reason.inactivity_kick"));
+ for cid in keepalive_kick {
+ warn!("{cid} Client did not send keepalive in time");
+ self.disconnect(cid, Some(trm!("s.disconnect_reason.keepalive_timer")));
}
if !self.paused {
@@ -111,7 +112,7 @@ impl Server {
idle: false,
ready: false,
players: HashSet::new(),
- last_player_input: 0.,
+ keepalive_timer: 0.,
replies: replies_tx,
},
);
@@ -119,9 +120,11 @@ impl Server {
(init, broadcast_rx, replies_rx)
}
- pub fn disconnect(&mut self, conn: ConnectionID, reason: Message) {
+ pub fn disconnect(&mut self, conn: ConnectionID, reason: Option<Message>) {
if let Some(cd) = self.connections.get(&conn) {
- let _ = cd.replies.try_send(PacketC::Disconnect { reason });
+ if let Some(reason) = reason {
+ let _ = cd.replies.try_send(PacketC::Disconnect { reason });
+ }
for player in cd.players.clone() {
let _ = self.packet_in_outer(conn, PacketS::Leave { player });
}
@@ -135,8 +138,11 @@ impl Server {
conn: ConnectionID,
packet: PacketS,
) -> Result<Vec<PacketC>, TrError> {
+ let Some(conn_data) = self.connections.get_mut(&conn) else {
+ return Ok(vec![]);
+ };
if let Some(p) = get_packet_player(&packet)
- && !self.connections.get(&conn).unwrap().players.contains(&p)
+ && !conn_data.players.contains(&p)
{
return Err(tre!("s.error.packet_sender_invalid"));
}
@@ -159,11 +165,11 @@ impl Server {
}
}
PacketS::Ready => {
- self.connections.get_mut(&conn).unwrap().ready = true;
+ conn_data.ready = true;
self.update_paused();
}
PacketS::Idle { paused } => {
- self.connections.get_mut(&conn).unwrap().idle = *paused;
+ conn_data.idle = *paused;
self.update_paused();
}
PacketS::Leave { player } => {
@@ -174,17 +180,12 @@ impl Server {
.remove(player);
}
PacketS::Join { .. } => {
- if self.connections.get_mut(&conn).unwrap().players.len() > 8 {
+ if conn_data.players.len() > 8 {
return Err(tre!("s.error.conn_too_many_players"));
}
}
- PacketS::Interact { .. }
- | PacketS::Communicate { .. }
- | PacketS::Movement { boost: true, .. } => {
- self.connections.get_mut(&conn).unwrap().last_player_input = 0.;
- }
- PacketS::Movement { dir, .. } if dir.length() > 0.5 => {
- self.connections.get_mut(&conn).unwrap().last_player_input = 0.;
+ PacketS::Keepalive => {
+ conn_data.keepalive_timer = 0.;
}
_ => (),
}
@@ -280,6 +281,7 @@ fn get_packet_player(packet: &PacketS) -> Option<PlayerID> {
| PacketS::Ready
| PacketS::ApplyScore(_)
| PacketS::ReplayTick { .. }
- | PacketS::Debug(_) => None,
+ | PacketS::Debug(_)
+ | PacketS::Keepalive => None,
}
}