diff options
| -rw-r--r-- | server/src/commands.rs | 10 | ||||
| -rw-r--r-- | server/src/main.rs | 168 | ||||
| -rw-r--r-- | server/src/server.rs | 13 | ||||
| -rw-r--r-- | server/src/state.rs | 83 |
4 files changed, 147 insertions, 127 deletions
diff --git a/server/src/commands.rs b/server/src/commands.rs index f8016c9f..a59d93a0 100644 --- a/server/src/commands.rs +++ b/server/src/commands.rs @@ -151,7 +151,7 @@ impl Server { skip_announce, } => { if !self.game.lobby { - self.tx + self.broadcast .send(PacketC::ServerMessage { message: trm!( "s.state.game_aborted", @@ -177,7 +177,7 @@ impl Server { } } Command::End => { - self.tx + self.broadcast .send(PacketC::ServerMessage { message: trm!( "s.state.game_aborted", @@ -248,7 +248,7 @@ impl Server { error: false, }), Command::Effect { name } => { - self.tx.send(PacketC::Effect { name, player }).ok(); + self.broadcast.send(PacketC::Effect { name, player }).ok(); } Command::Item { name } => { let item = self @@ -256,7 +256,7 @@ impl Server { .data .get_item_by_name(&name) .ok_or(tre!("s.error.item_not_found", s = name))?; - self.tx + self.broadcast .send(PacketC::Communicate { player, message: Some(Message::Item(item)), @@ -270,7 +270,7 @@ impl Server { .data .get_tile_by_name(&name) .ok_or(tre!("s.error.no_tile", s = name))?; - self.tx + self.broadcast .send(PacketC::Communicate { player, message: Some(Message::Tile(tile)), diff --git a/server/src/main.rs b/server/src/main.rs index e87d52b6..0236ebe4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -27,12 +27,12 @@ use std::{ time::Duration, }; use tokio::{ - net::TcpListener, + net::{TcpListener, TcpStream}, spawn, - sync::{RwLock, broadcast, mpsc::channel}, + sync::{RwLock, broadcast}, time::interval, }; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{WebSocketStream, tungstenite::Message}; #[derive(Parser)] pub(crate) struct Args { @@ -150,7 +150,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { let ws_listener = TcpListener::bind(args.listen).await?; info!("Listening for websockets on {}", ws_listener.local_addr()?); - let (tx, rx) = broadcast::channel::<PacketC>(128 * 1024); + let (tx, _) = broadcast::channel::<PacketC>(128 * 1024); let mut state = Server::new(data_path, tx)?; state.load(state.index.generate_with_book("lobby")?, None); @@ -200,25 +200,18 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { for id in (1..).map(ConnectionID) { let (sock, addr) = ws_listener.accept().await?; - let Ok(sock) = tokio_tungstenite::accept_async(sock).await else { + let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { warn!("Invalid ws handshake"); continue; }; info!("{id} Client connected ({addr})"); - let (mut write, mut read) = sock.split(); let state = state.clone(); - let mut rx = rx.resubscribe(); - let (error_tx, mut error_rx) = channel::<PacketC>(8); - - let init = state.write().await.connect(id).await; - - // let supports_binary = Arc::new(AtomicBool::new(false)); - // let supports_binary2 = supports_binary.clone(); + let (init, mut broadcast_rx, mut replies_rx) = state.write().await.connect(id).await; spawn(async move { for p in init { - if let Err(e) = write + if let Err(e) = sock .send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&p).unwrap().into(), )) @@ -228,94 +221,93 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { return; } } + debug!("{id} client has caught up"); loop { - let Some(packet) = tokio::select!( - p = rx.recv() => Some(match p { - Ok(e) => e, + tokio::select! { + p = replies_rx.recv() => match p { + Some(packet) => { + if send_packet(id, &mut sock, packet).await { break; }; + }, + None => { + info!("{id} Server closed the connection"); + break; + } + }, + p = broadcast_rx.recv() => match p { + Ok(packet) => { + if send_packet(id, &mut sock, packet).await { break; }; + }, Err(e) => { - rx = rx.resubscribe(); + broadcast_rx = broadcast_rx.resubscribe(); warn!("{id} Client was lagging; resubscribed: {e}"); - PacketC::ServerMessage { + let packet = PacketC::ServerMessage { message: trm!("s.state.overflow_resubscribe"), error: true, - } - } - }), - p = error_rx.recv() => p - ) else { - info!("{id} Client outbound sender dropped. closing connection"); - break; - }; - // let message = if supports_binary.load(Ordering::Relaxed) { - // Message::Binary( - // bincode::encode_to_vec(&packet, BINCODE_CONFIG) - // .unwrap() - // .into(), - // ) - // } else { - let message = Message::Text(serde_json::to_string(&packet).unwrap().into()); - // }; - if let Err(e) = write.send(message).await { - warn!("{id} WebSocket error: {e}"); - break; - } - } - }); - - spawn(async move { - while let Some(Ok(message)) = read.next().await { - let packet = match message { - Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) { - Ok(p) => p, - Err(e) => { - warn!("{id} Invalid json packet: {e}"); - break; + }; + if send_packet(id, &mut sock, packet).await { break; }; } }, - Message::Binary(_packet) => { - // supports_binary2.store(true, Ordering::Relaxed); - // match bincode::decode_from_slice::<PacketS, _>(&packet, BINCODE_CONFIG) { - // Ok((p, _size)) => p, - // Err(e) => { - // warn!("Invalid binary packet: {e}"); - // break; - // } - // } - continue; - } - Message::Close(_) => break, - _ => continue, - }; + Some(Ok(message)) = sock.next() => { + let packet = match message { + Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) { + Ok(p) => p, + Err(e) => { + warn!("{id} Invalid json packet: {e}"); + break; + } + }, + Message::Binary(_packet) => continue, + Message::Close(_) => { + info!("{id} Client closed the connection"); + break + }, + _ => continue, + }; - if matches!( - packet, - PacketS::Movement { .. } | PacketS::ReplayTick { .. } - ) { - trace!("{id} <- {packet:?}"); - } else { - debug!("{id} <- {packet:?}"); - } - let packet_out = match state.write().await.packet_in_outer(id, packet).await { - Ok(packets) => packets, - Err(e) => { - warn!("Client error: {e}"); - vec![PacketC::ServerMessage { - message: e.into(), - error: true, - }] - } + if matches!( + packet, + PacketS::Movement { .. } | PacketS::ReplayTick { .. } + ) { + trace!("{id} <- {packet:?}"); + } else { + debug!("{id} <- {packet:?}"); + } + let packet_out = match state.write().await.packet_in_outer(id, packet) { + Ok(packets) => packets, + Err(e) => { + warn!("Client error: {e}"); + vec![PacketC::ServerMessage { + message: e.into(), + error: true, + }] + } + }; + for packet in packet_out { + if send_packet(id, &mut sock, packet).await { break; }; + } + } }; - for packet in packet_out { - let _ = error_tx.send(packet).await; - } } - info!("{id} Client disconnected"); - let _ = state.write().await.disconnect(id).await; + let _ = state.write().await.disconnect(id); }); } Ok(()) } +async fn send_packet( + id: ConnectionID, + sock: &mut WebSocketStream<TcpStream>, + packet: PacketC, +) -> bool { + let message = Message::Text(serde_json::to_string(&packet).unwrap().into()); + if let Err(e) = sock.send(message).await { + warn!("{id} WebSocket error: {e}"); + true + } else { + false + } +} + #[cfg(test)] mod test { use hurrycurry_protocol::{Character, PacketS, PlayerClass, PlayerID}; @@ -386,11 +378,9 @@ mod test { position: None, }, ) - .await .unwrap(); assert!( s.packet_in_outer(ConnectionID(conn.try_into().unwrap()), p) - .await .is_err(), "test {}", conn, diff --git a/server/src/server.rs b/server/src/server.rs index dc63f1b5..267b3062 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -42,13 +42,15 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::sync::broadcast::Sender; +use tokio::sync::{broadcast, mpsc}; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct ConnectionData { pub players: HashSet<PlayerID>, pub idle: bool, pub ready: bool, + pub last_player_input: f32, + pub replies: mpsc::Sender<PacketC>, } pub enum AnnounceState { @@ -59,7 +61,7 @@ pub enum AnnounceState { pub struct Server { pub tick_perf: (Duration, usize), - pub tx: Sender<PacketC>, + pub broadcast: broadcast::Sender<PacketC>, pub connections: HashMap<ConnectionID, ConnectionData>, pub paused: bool, pub announce_state: AnnounceState, @@ -130,7 +132,6 @@ impl GameServerExt for Game { timer: Option<Duration>, packet_out: &mut VecDeque<PacketC>, ) { - // TODO cleanup let players = self .players .iter() @@ -312,12 +313,12 @@ impl GameServerExt for Game { } impl Server { - pub fn new(data_path: PathBuf, tx: Sender<PacketC>) -> Result<Self> { + pub fn new(data_path: PathBuf, tx: broadcast::Sender<PacketC>) -> Result<Self> { Ok(Self { game: Game::default(), tick_perf: (Duration::ZERO, 0), index: DataIndex::new(data_path).context("Failed to load data index")?, - tx, + broadcast: tx, announce_state: AnnounceState::Done, packet_out: VecDeque::new(), connections: HashMap::new(), diff --git a/server/src/state.rs b/server/src/state.rs index 001e2bf0..a42c1663 100644 --- a/server/src/state.rs +++ b/server/src/state.rs @@ -23,10 +23,25 @@ use anyhow::Result; use hurrycurry_locale::{TrError, tre, trm}; use hurrycurry_protocol::{Menu, Message, PacketC, PacketS, PlayerID, VERSION}; use log::{debug, info, trace}; -use std::time::{Duration, Instant}; +use std::{ + collections::HashSet, + time::{Duration, Instant}, +}; +use tokio::sync::{broadcast, mpsc}; impl Server { pub fn tick_outer(&mut self, dt: f32) -> anyhow::Result<()> { + let mut idle_kick = Vec::new(); + for (cid, conn) in &mut self.connections { + conn.last_player_input += dt; + if conn.last_player_input > 60. { + idle_kick.push(*cid); + } + } + for cid in idle_kick { + self.disconnect(cid); + } + if !self.paused { let start = Instant::now(); let r = self.tick(dt); @@ -66,13 +81,22 @@ impl Server { } else { debug!("-> {p:?}"); } - self.tx.send(p).unwrap(); + let _ = self.broadcast.send(p); } Ok(()) } - pub async fn connect(&mut self, id: ConnectionID) -> Vec<PacketC> { + pub async fn connect( + &mut self, + id: ConnectionID, + ) -> ( + Vec<PacketC>, + broadcast::Receiver<PacketC>, + mpsc::Receiver<PacketC>, + ) { let mut init = self.game.prime_client(); + let (replies_tx, replies_rx) = mpsc::channel(1024); + let broadcast_rx = self.broadcast.subscribe(); init.insert( 0, PacketC::Version { @@ -81,33 +105,37 @@ impl Server { supports_bincode: true, }, ); - self.connections.insert(id, ConnectionData::default()); + self.connections.insert( + id, + ConnectionData { + idle: false, + ready: false, + players: HashSet::new(), + last_player_input: 0., + replies: replies_tx, + }, + ); self.update_paused(); - init + (init, broadcast_rx, replies_rx) } - pub async fn disconnect(&mut self, conn: ConnectionID) { + pub fn disconnect(&mut self, conn: ConnectionID) { if let Some(cd) = self.connections.get(&conn) { for player in cd.players.clone() { - let _ = self.packet_in_outer(conn, PacketS::Leave { player }).await; + let _ = self.packet_in_outer(conn, PacketS::Leave { player }); } + self.connections.remove(&conn); + self.update_paused(); } - self.connections.remove(&conn); - self.update_paused(); } - pub async fn packet_in_outer( + pub fn packet_in_outer( &mut self, conn: ConnectionID, packet: PacketS, ) -> Result<Vec<PacketC>, TrError> { if let Some(p) = get_packet_player(&packet) - && !self - .connections - .entry(conn) - .or_default() - .players - .contains(&p) + && !self.connections.get(&conn).unwrap().players.contains(&p) { return Err(tre!("s.error.packet_sender_invalid")); } @@ -130,41 +158,42 @@ impl Server { } } PacketS::Ready => { - self.connections.entry(conn).or_default().ready = true; + self.connections.get_mut(&conn).unwrap().ready = true; self.update_paused(); } PacketS::Idle { paused } => { - self.connections.entry(conn).or_default().idle = *paused; + self.connections.get_mut(&conn).unwrap().idle = *paused; self.update_paused(); } PacketS::Leave { player } => { self.connections - .entry(conn) - .or_default() + .get_mut(&conn) + .unwrap() .players .remove(player); } PacketS::Join { .. } => { - if self.connections.entry(conn).or_default().players.len() > 8 { + if self.connections.get_mut(&conn).unwrap().players.len() > 8 { return Err(tre!("s.error.conn_too_many_players")); } } + PacketS::Interact { .. } + | PacketS::Communicate { .. } + | PacketS::Movement { boost: true, .. } => { + self.connections.get_mut(&conn).unwrap().last_player_input = 0.; + } _ => (), } self.packet_in(Some(conn), packet, &mut replies)?; for p in &replies { if let PacketC::Joined { id } = p { - self.connections - .entry(conn) - .or_default() - .players - .insert(*id); + self.connections.get_mut(&conn).unwrap().players.insert(*id); } } if self.count_chefs() == 0 && !self.game.lobby { - self.tx + self.broadcast .send(PacketC::ServerMessage { message: trm!("s.state.abort_no_players"), error: false, |