aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/src/commands.rs10
-rw-r--r--server/src/main.rs168
-rw-r--r--server/src/server.rs13
-rw-r--r--server/src/state.rs83
4 files changed, 147 insertions, 127 deletions
diff --git a/server/src/commands.rs b/server/src/commands.rs
index f8016c9f..a59d93a0 100644
--- a/server/src/commands.rs
+++ b/server/src/commands.rs
@@ -151,7 +151,7 @@ impl Server {
skip_announce,
} => {
if !self.game.lobby {
- self.tx
+ self.broadcast
.send(PacketC::ServerMessage {
message: trm!(
"s.state.game_aborted",
@@ -177,7 +177,7 @@ impl Server {
}
}
Command::End => {
- self.tx
+ self.broadcast
.send(PacketC::ServerMessage {
message: trm!(
"s.state.game_aborted",
@@ -248,7 +248,7 @@ impl Server {
error: false,
}),
Command::Effect { name } => {
- self.tx.send(PacketC::Effect { name, player }).ok();
+ self.broadcast.send(PacketC::Effect { name, player }).ok();
}
Command::Item { name } => {
let item = self
@@ -256,7 +256,7 @@ impl Server {
.data
.get_item_by_name(&name)
.ok_or(tre!("s.error.item_not_found", s = name))?;
- self.tx
+ self.broadcast
.send(PacketC::Communicate {
player,
message: Some(Message::Item(item)),
@@ -270,7 +270,7 @@ impl Server {
.data
.get_tile_by_name(&name)
.ok_or(tre!("s.error.no_tile", s = name))?;
- self.tx
+ self.broadcast
.send(PacketC::Communicate {
player,
message: Some(Message::Tile(tile)),
diff --git a/server/src/main.rs b/server/src/main.rs
index e87d52b6..0236ebe4 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -27,12 +27,12 @@ use std::{
time::Duration,
};
use tokio::{
- net::TcpListener,
+ net::{TcpListener, TcpStream},
spawn,
- sync::{RwLock, broadcast, mpsc::channel},
+ sync::{RwLock, broadcast},
time::interval,
};
-use tokio_tungstenite::tungstenite::Message;
+use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
#[derive(Parser)]
pub(crate) struct Args {
@@ -150,7 +150,7 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
let ws_listener = TcpListener::bind(args.listen).await?;
info!("Listening for websockets on {}", ws_listener.local_addr()?);
- let (tx, rx) = broadcast::channel::<PacketC>(128 * 1024);
+ let (tx, _) = broadcast::channel::<PacketC>(128 * 1024);
let mut state = Server::new(data_path, tx)?;
state.load(state.index.generate_with_book("lobby")?, None);
@@ -200,25 +200,18 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
for id in (1..).map(ConnectionID) {
let (sock, addr) = ws_listener.accept().await?;
- let Ok(sock) = tokio_tungstenite::accept_async(sock).await else {
+ let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else {
warn!("Invalid ws handshake");
continue;
};
info!("{id} Client connected ({addr})");
- let (mut write, mut read) = sock.split();
let state = state.clone();
- let mut rx = rx.resubscribe();
- let (error_tx, mut error_rx) = channel::<PacketC>(8);
-
- let init = state.write().await.connect(id).await;
-
- // let supports_binary = Arc::new(AtomicBool::new(false));
- // let supports_binary2 = supports_binary.clone();
+ let (init, mut broadcast_rx, mut replies_rx) = state.write().await.connect(id).await;
spawn(async move {
for p in init {
- if let Err(e) = write
+ if let Err(e) = sock
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&p).unwrap().into(),
))
@@ -228,94 +221,93 @@ async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
return;
}
}
+ debug!("{id} client has caught up");
loop {
- let Some(packet) = tokio::select!(
- p = rx.recv() => Some(match p {
- Ok(e) => e,
+ tokio::select! {
+ p = replies_rx.recv() => match p {
+ Some(packet) => {
+ if send_packet(id, &mut sock, packet).await { break; };
+ },
+ None => {
+ info!("{id} Server closed the connection");
+ break;
+ }
+ },
+ p = broadcast_rx.recv() => match p {
+ Ok(packet) => {
+ if send_packet(id, &mut sock, packet).await { break; };
+ },
Err(e) => {
- rx = rx.resubscribe();
+ broadcast_rx = broadcast_rx.resubscribe();
warn!("{id} Client was lagging; resubscribed: {e}");
- PacketC::ServerMessage {
+ let packet = PacketC::ServerMessage {
message: trm!("s.state.overflow_resubscribe"),
error: true,
- }
- }
- }),
- p = error_rx.recv() => p
- ) else {
- info!("{id} Client outbound sender dropped. closing connection");
- break;
- };
- // let message = if supports_binary.load(Ordering::Relaxed) {
- // Message::Binary(
- // bincode::encode_to_vec(&packet, BINCODE_CONFIG)
- // .unwrap()
- // .into(),
- // )
- // } else {
- let message = Message::Text(serde_json::to_string(&packet).unwrap().into());
- // };
- if let Err(e) = write.send(message).await {
- warn!("{id} WebSocket error: {e}");
- break;
- }
- }
- });
-
- spawn(async move {
- while let Some(Ok(message)) = read.next().await {
- let packet = match message {
- Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) {
- Ok(p) => p,
- Err(e) => {
- warn!("{id} Invalid json packet: {e}");
- break;
+ };
+ if send_packet(id, &mut sock, packet).await { break; };
}
},
- Message::Binary(_packet) => {
- // supports_binary2.store(true, Ordering::Relaxed);
- // match bincode::decode_from_slice::<PacketS, _>(&packet, BINCODE_CONFIG) {
- // Ok((p, _size)) => p,
- // Err(e) => {
- // warn!("Invalid binary packet: {e}");
- // break;
- // }
- // }
- continue;
- }
- Message::Close(_) => break,
- _ => continue,
- };
+ Some(Ok(message)) = sock.next() => {
+ let packet = match message {
+ Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) {
+ Ok(p) => p,
+ Err(e) => {
+ warn!("{id} Invalid json packet: {e}");
+ break;
+ }
+ },
+ Message::Binary(_packet) => continue,
+ Message::Close(_) => {
+ info!("{id} Client closed the connection");
+ break
+ },
+ _ => continue,
+ };
- if matches!(
- packet,
- PacketS::Movement { .. } | PacketS::ReplayTick { .. }
- ) {
- trace!("{id} <- {packet:?}");
- } else {
- debug!("{id} <- {packet:?}");
- }
- let packet_out = match state.write().await.packet_in_outer(id, packet).await {
- Ok(packets) => packets,
- Err(e) => {
- warn!("Client error: {e}");
- vec![PacketC::ServerMessage {
- message: e.into(),
- error: true,
- }]
- }
+ if matches!(
+ packet,
+ PacketS::Movement { .. } | PacketS::ReplayTick { .. }
+ ) {
+ trace!("{id} <- {packet:?}");
+ } else {
+ debug!("{id} <- {packet:?}");
+ }
+ let packet_out = match state.write().await.packet_in_outer(id, packet) {
+ Ok(packets) => packets,
+ Err(e) => {
+ warn!("Client error: {e}");
+ vec![PacketC::ServerMessage {
+ message: e.into(),
+ error: true,
+ }]
+ }
+ };
+ for packet in packet_out {
+ if send_packet(id, &mut sock, packet).await { break; };
+ }
+ }
};
- for packet in packet_out {
- let _ = error_tx.send(packet).await;
- }
}
- info!("{id} Client disconnected");
- let _ = state.write().await.disconnect(id).await;
+ let _ = state.write().await.disconnect(id);
});
}
Ok(())
}
+async fn send_packet(
+ id: ConnectionID,
+ sock: &mut WebSocketStream<TcpStream>,
+ packet: PacketC,
+) -> bool {
+ let message = Message::Text(serde_json::to_string(&packet).unwrap().into());
+ if let Err(e) = sock.send(message).await {
+ warn!("{id} WebSocket error: {e}");
+ true
+ } else {
+ false
+ }
+}
+
#[cfg(test)]
mod test {
use hurrycurry_protocol::{Character, PacketS, PlayerClass, PlayerID};
@@ -386,11 +378,9 @@ mod test {
position: None,
},
)
- .await
.unwrap();
assert!(
s.packet_in_outer(ConnectionID(conn.try_into().unwrap()), p)
- .await
.is_err(),
"test {}",
conn,
diff --git a/server/src/server.rs b/server/src/server.rs
index dc63f1b5..267b3062 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -42,13 +42,15 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
-use tokio::sync::broadcast::Sender;
+use tokio::sync::{broadcast, mpsc};
-#[derive(Debug, Default)]
+#[derive(Debug)]
pub struct ConnectionData {
pub players: HashSet<PlayerID>,
pub idle: bool,
pub ready: bool,
+ pub last_player_input: f32,
+ pub replies: mpsc::Sender<PacketC>,
}
pub enum AnnounceState {
@@ -59,7 +61,7 @@ pub enum AnnounceState {
pub struct Server {
pub tick_perf: (Duration, usize),
- pub tx: Sender<PacketC>,
+ pub broadcast: broadcast::Sender<PacketC>,
pub connections: HashMap<ConnectionID, ConnectionData>,
pub paused: bool,
pub announce_state: AnnounceState,
@@ -130,7 +132,6 @@ impl GameServerExt for Game {
timer: Option<Duration>,
packet_out: &mut VecDeque<PacketC>,
) {
- // TODO cleanup
let players = self
.players
.iter()
@@ -312,12 +313,12 @@ impl GameServerExt for Game {
}
impl Server {
- pub fn new(data_path: PathBuf, tx: Sender<PacketC>) -> Result<Self> {
+ pub fn new(data_path: PathBuf, tx: broadcast::Sender<PacketC>) -> Result<Self> {
Ok(Self {
game: Game::default(),
tick_perf: (Duration::ZERO, 0),
index: DataIndex::new(data_path).context("Failed to load data index")?,
- tx,
+ broadcast: tx,
announce_state: AnnounceState::Done,
packet_out: VecDeque::new(),
connections: HashMap::new(),
diff --git a/server/src/state.rs b/server/src/state.rs
index 001e2bf0..a42c1663 100644
--- a/server/src/state.rs
+++ b/server/src/state.rs
@@ -23,10 +23,25 @@ use anyhow::Result;
use hurrycurry_locale::{TrError, tre, trm};
use hurrycurry_protocol::{Menu, Message, PacketC, PacketS, PlayerID, VERSION};
use log::{debug, info, trace};
-use std::time::{Duration, Instant};
+use std::{
+ collections::HashSet,
+ time::{Duration, Instant},
+};
+use tokio::sync::{broadcast, mpsc};
impl Server {
pub fn tick_outer(&mut self, dt: f32) -> anyhow::Result<()> {
+ let mut idle_kick = Vec::new();
+ for (cid, conn) in &mut self.connections {
+ conn.last_player_input += dt;
+ if conn.last_player_input > 60. {
+ idle_kick.push(*cid);
+ }
+ }
+ for cid in idle_kick {
+ self.disconnect(cid);
+ }
+
if !self.paused {
let start = Instant::now();
let r = self.tick(dt);
@@ -66,13 +81,22 @@ impl Server {
} else {
debug!("-> {p:?}");
}
- self.tx.send(p).unwrap();
+ let _ = self.broadcast.send(p);
}
Ok(())
}
- pub async fn connect(&mut self, id: ConnectionID) -> Vec<PacketC> {
+ pub async fn connect(
+ &mut self,
+ id: ConnectionID,
+ ) -> (
+ Vec<PacketC>,
+ broadcast::Receiver<PacketC>,
+ mpsc::Receiver<PacketC>,
+ ) {
let mut init = self.game.prime_client();
+ let (replies_tx, replies_rx) = mpsc::channel(1024);
+ let broadcast_rx = self.broadcast.subscribe();
init.insert(
0,
PacketC::Version {
@@ -81,33 +105,37 @@ impl Server {
supports_bincode: true,
},
);
- self.connections.insert(id, ConnectionData::default());
+ self.connections.insert(
+ id,
+ ConnectionData {
+ idle: false,
+ ready: false,
+ players: HashSet::new(),
+ last_player_input: 0.,
+ replies: replies_tx,
+ },
+ );
self.update_paused();
- init
+ (init, broadcast_rx, replies_rx)
}
- pub async fn disconnect(&mut self, conn: ConnectionID) {
+ pub fn disconnect(&mut self, conn: ConnectionID) {
if let Some(cd) = self.connections.get(&conn) {
for player in cd.players.clone() {
- let _ = self.packet_in_outer(conn, PacketS::Leave { player }).await;
+ let _ = self.packet_in_outer(conn, PacketS::Leave { player });
}
+ self.connections.remove(&conn);
+ self.update_paused();
}
- self.connections.remove(&conn);
- self.update_paused();
}
- pub async fn packet_in_outer(
+ pub fn packet_in_outer(
&mut self,
conn: ConnectionID,
packet: PacketS,
) -> Result<Vec<PacketC>, TrError> {
if let Some(p) = get_packet_player(&packet)
- && !self
- .connections
- .entry(conn)
- .or_default()
- .players
- .contains(&p)
+ && !self.connections.get(&conn).unwrap().players.contains(&p)
{
return Err(tre!("s.error.packet_sender_invalid"));
}
@@ -130,41 +158,42 @@ impl Server {
}
}
PacketS::Ready => {
- self.connections.entry(conn).or_default().ready = true;
+ self.connections.get_mut(&conn).unwrap().ready = true;
self.update_paused();
}
PacketS::Idle { paused } => {
- self.connections.entry(conn).or_default().idle = *paused;
+ self.connections.get_mut(&conn).unwrap().idle = *paused;
self.update_paused();
}
PacketS::Leave { player } => {
self.connections
- .entry(conn)
- .or_default()
+ .get_mut(&conn)
+ .unwrap()
.players
.remove(player);
}
PacketS::Join { .. } => {
- if self.connections.entry(conn).or_default().players.len() > 8 {
+ if self.connections.get_mut(&conn).unwrap().players.len() > 8 {
return Err(tre!("s.error.conn_too_many_players"));
}
}
+ PacketS::Interact { .. }
+ | PacketS::Communicate { .. }
+ | PacketS::Movement { boost: true, .. } => {
+ self.connections.get_mut(&conn).unwrap().last_player_input = 0.;
+ }
_ => (),
}
self.packet_in(Some(conn), packet, &mut replies)?;
for p in &replies {
if let PacketC::Joined { id } = p {
- self.connections
- .entry(conn)
- .or_default()
- .players
- .insert(*id);
+ self.connections.get_mut(&conn).unwrap().players.insert(*id);
}
}
if self.count_chefs() == 0 && !self.game.lobby {
- self.tx
+ self.broadcast
.send(PacketC::ServerMessage {
message: trm!("s.state.abort_no_players"),
error: false,