/* Hurry Curry! - a game about cooking Copyright 2024 metamuffin This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3 of the License only. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use anyhow::{anyhow, Result}; use clap::Parser; use futures_util::{SinkExt, StreamExt}; use hurrycurry_protocol::{PacketC, PacketS, BINCODE_CONFIG, VERSION}; use hurrycurry_server::{ data::DATA_DIR, register::Register, server::{GameServerExt, Server}, trm, ConnectionID, }; use log::{debug, info, trace, warn, LevelFilter}; use std::{ net::SocketAddr, path::PathBuf, process::exit, str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, time::Duration, }; use tokio::{ net::TcpListener, spawn, sync::{broadcast, mpsc::channel, RwLock}, time::interval, }; use tokio_tungstenite::tungstenite::Message; #[derive(Parser)] pub(crate) struct Args { /// Print the version, then exit #[arg(short, long)] version: bool, /// Set the path to the game data directory, autodetect if ommitted #[arg(short, long)] data_dir: Option, /// Set the address on which the server should listen #[arg(short, long, default_value = "0.0.0.0:27032")] listen: SocketAddr, /// Enables submissions to the public server registry #[arg(long)] register: bool, /// Enables mDNS discoverability #[arg(long)] discoverable: bool, /// Server name #[arg(long, short = 'N', default_value = "A Hurry Curry! Server")] server_name: String, /// Uri for connecting remotely for registry submission #[arg(long)] register_uri: Option, } fn main() -> Result<()> { env_logger::builder() .filter_level(LevelFilter::Warn) .parse_env("LOG") .init(); let args = Args::parse(); if args.version { println!("{}", env!("CARGO_PKG_VERSION")); exit(0); } let data_dir = if let Some(d) = args.data_dir.clone() { d } else { let d = PathBuf::from_str( [ "data", "/usr/local/share/hurrycurry/data", "/usr/share/hurrycurry/data", "/opt/hurrycurry/data", ] .into_iter() .find(|p| PathBuf::from_str(p).unwrap().join("index.yaml").exists()) .ok_or(anyhow!( "Could not find the data directory. Please run the server next to the `data` directory or specify a path to it via arguments." ))?, )?; info!("Detected data dir to be {d:?}"); d }; *DATA_DIR.lock().unwrap() = Some(data_dir); tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? .block_on(run(args))?; Ok(()) } async fn run(args: Args) -> anyhow::Result<()> { let ws_listener = TcpListener::bind(args.listen).await?; info!("Listening for websockets on {}", ws_listener.local_addr()?); let (tx, rx) = broadcast::channel::(128 * 1024); let mut state = Server::new(tx).await?; state.load(state.index.generate("lobby").await?, None); let state = Arc::new(RwLock::new(state)); if args.register { let r = Register::new( args.server_name.clone(), args.listen.port(), args.register_uri, state.clone(), ); tokio::task::spawn(r.register_loop()); } { let state = state.clone(); spawn(async move { let dt = 1. / 50.; let mut tick = interval(Duration::from_secs_f32(dt)); loop { tick.tick().await; if let Err(e) = state.write().await.tick_outer(dt).await { warn!("Tick failed: {e}"); } } }); } for id in (1..).map(ConnectionID) { let (sock, addr) = ws_listener.accept().await?; let Ok(sock) = tokio_tungstenite::accept_async(sock).await else { warn!("Invalid ws handshake"); continue; }; info!("{addr} connected via websocket"); let (mut write, mut read) = sock.split(); let state = state.clone(); let mut rx = rx.resubscribe(); let (error_tx, mut error_rx) = channel::(8); let mut init = state.write().await.game.prime_client(); init.insert( 0, PacketC::Version { major: VERSION.0, minor: VERSION.1, supports_bincode: true, }, ); let supports_binary = Arc::new(AtomicBool::new(false)); let supports_binary2 = supports_binary.clone(); spawn(async move { for p in init { if let Err(e) = write .send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&p).unwrap(), )) .await { warn!("WebSocket error when sending initial packets: {e}"); return; } } loop { let Some(packet) = tokio::select!( p = rx.recv() => Some(match p { Ok(e) => e, Err(e) => { rx = rx.resubscribe(); warn!("Client was lagging; resubscribed: {e}"); PacketC::ServerMessage { message: trm!("s.state.overflow_resubscribe"), error: true, } } }), p = error_rx.recv() => p ) else { info!("Client outbound sender dropped. closing connection"); break; }; let message = if supports_binary.load(Ordering::Relaxed) { Message::Binary(bincode::encode_to_vec(&packet, BINCODE_CONFIG).unwrap()) } else { Message::Text(serde_json::to_string(&packet).unwrap()) }; if let Err(e) = write.send(message).await { warn!("WebSocket error: {e}"); break; } } }); spawn(async move { info!("{id:?} connected"); while let Some(Ok(message)) = read.next().await { let packet = match message { Message::Text(line) => match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { warn!("Invalid json packet: {e}"); break; } }, Message::Binary(packet) => { supports_binary2.store(true, Ordering::Relaxed); match bincode::decode_from_slice::(&packet, BINCODE_CONFIG) { Ok((p, _size)) => p, Err(e) => { warn!("Invalid binary packet: {e}"); break; } } } Message::Close(_) => break, _ => continue, }; if matches!( packet, PacketS::Movement { .. } | PacketS::ReplayTick { .. } ) { trace!("<- {id:?} {packet:?}"); } else { debug!("<- {id:?} {packet:?}"); } let packet_out = match state.write().await.packet_in_outer(id, packet).await { Ok(packets) => packets, Err(e) => { warn!("Client error: {e:?}"); vec![PacketC::ServerMessage { message: e.into(), error: true, }] } }; for packet in packet_out { let _ = error_tx.send(packet).await; } } info!("{id:?} disconnected"); let _ = state.write().await.disconnect(id).await; }); } Ok(()) } #[cfg(test)] mod test { use hurrycurry_protocol::{PacketS, PlayerID}; use hurrycurry_server::{data::DATA_DIR, server::Server, ConnectionID}; use std::future::Future; use tokio::sync::broadcast; fn harness(body: impl Future) { *DATA_DIR.lock().unwrap() = Some("../data".into()); tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(body); } #[test] fn run() { harness(async { Server::new(broadcast::channel(1024).0).await.unwrap() }); } #[test] fn map_load() { harness(async { let mut s = Server::new(broadcast::channel(1024).0).await.unwrap(); s.load(s.index.generate("lobby").await.unwrap(), None); }); } #[test] fn tick() { harness(async { let mut s = Server::new(broadcast::channel(1024).0).await.unwrap(); for _ in 0..100 { s.tick(0.1); } }); } #[test] fn packet_sender_verif() { harness(async { let mut s = Server::new(broadcast::channel(1024).0).await.unwrap(); for (conn, p) in [ PacketS::Effect { player: PlayerID(0), name: "test".to_owned(), }, PacketS::Leave { player: PlayerID(0), }, PacketS::ReplayTick { dt: 1. }, ] .into_iter() .enumerate() { s.packet_in_outer( ConnectionID(conn.try_into().unwrap()), PacketS::Join { name: format!("test {conn}"), character: 1, id: None, }, ) .await .unwrap(); assert!( s.packet_in_outer(ConnectionID(conn.try_into().unwrap()), p) .await .is_err(), "test {}", conn, ) } }); } }