/* Hurry Curry! - a game about cooking Copyright (C) 2025 Hurry Curry! Contributors This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3 of the License only. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use anyhow::{Result, bail}; use clap::Parser; use futures_util::{SinkExt, StreamExt}; use hurrycurry_locale::trm; use hurrycurry_protocol::{PacketC, PacketS}; use hurrycurry_server::{ ConnectionID, server::{Server, ServerConfig}, }; use log::{LevelFilter, debug, info, trace, warn}; use std::{ env::var, net::SocketAddr, path::PathBuf, process::exit, str::FromStr, sync::Arc, time::Duration, }; use tokio::{ net::{TcpListener, TcpStream}, spawn, sync::{RwLock, broadcast}, time::{interval, sleep}, }; use tokio_tungstenite::{WebSocketStream, tungstenite::Message}; #[derive(Parser)] pub(crate) struct Args { /// Print server version, then exit #[arg(short, long)] version: bool, /// Set the path to the game data directory, autodetected if ommitted #[arg(short, long)] data_dir: Option, /// Set the address on which the server should listen #[arg(short, long, default_value = "[::]:27032")] listen: SocketAddr, /// Map name to use as lobby #[arg(long, default_value = "lobby")] lobby: String, /// Inactivity timeout in seconds #[arg(long, default_value_t = 60.)] inactivity_timeout: f32, /// Registers this server to the public server registry #[arg(long)] #[cfg(feature = "register")] register: bool, /// Enables the mDNS responder for local network discovery #[cfg(feature = "mdns")] #[arg(long)] mdns: bool, /// Enables automatic gateway port forwarding using UPnP #[cfg(feature = "upnp")] #[arg(long)] upnp: bool, /// Server name #[cfg(any(feature = "register", feature = "mdns"))] #[arg(long, short = 'N', default_value = "A Hurry Curry! Server")] server_name: String, /// Uri for connecting remotely for registry submission #[cfg(feature = "register")] #[arg(long)] register_uri: Option, /// Do not register using IPv4 #[cfg(feature = "register")] #[arg(long)] register_disable_ip4: bool, /// Do not register using IPv6 #[cfg(feature = "register")] #[arg(long)] register_disable_ip6: bool, /// Address of registry server to use when registering #[cfg(feature = "register")] #[arg(long, default_value = "https://registry.hurrycurry.org")] registry_server: String, } fn main() -> Result<()> { env_logger::builder() .filter_level(LevelFilter::Info) .parse_env("LOG") .init(); let args = Args::parse(); let version = env!("CARGO_PKG_VERSION"); let distribution = option_env!("HURRYCURRY_DISTRIBUTION").unwrap_or("unknown"); if args.version { println!("{version} ({distribution})"); exit(0); } info!("Starting Hurry Curry! Server {version} ({distribution})"); let data_path = if let Some(d) = args.data_dir.clone() { d } else { find_data_path()? }; tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? .block_on(run(data_path, args))?; Ok(()) } async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> { let ws_listener = TcpListener::bind(args.listen).await?; info!("Listening for websockets on {}", ws_listener.local_addr()?); let (tx, _) = broadcast::channel::(128 * 1024); let config = ServerConfig { inactivity_timeout: args.inactivity_timeout, lobby: args.lobby, }; let mut state = Server::new(data_path, config, tx)?; state.load(state.index.generate_with_book(&state.config.lobby)?, None); let state = Arc::new(RwLock::new(state)); #[cfg(feature = "register")] if args.register { let r = hurrycurry_server::network::register::Register::new( args.server_name.clone(), args.listen.port(), args.register_uri, args.registry_server, state.clone(), args.register_disable_ip4, args.register_disable_ip6, ); tokio::task::spawn(r.register_loop()); } #[cfg(feature = "upnp")] if args.upnp { tokio::task::spawn(hurrycurry_server::network::upnp::upnp_loop( args.listen.port(), )); } #[cfg(feature = "mdns")] if args.mdns { tokio::task::spawn(hurrycurry_server::network::mdns::mdns_loop( args.server_name.clone(), args.listen, state.clone(), )); } { let state = state.clone(); spawn(async move { let dt = 1. / 50.; let mut tick = interval(Duration::from_secs_f32(dt)); loop { tick.tick().await; if let Err(e) = state.write().await.tick_outer(dt) { warn!("Tick failed: {e}"); } } }); } for id in (1..).map(ConnectionID) { let (sock, addr) = ws_listener.accept().await?; let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { warn!("Invalid ws handshake"); continue; }; info!("{id} Client connected ({addr})"); let state = state.clone(); let (init, mut broadcast_rx, mut replies_rx) = state.write().await.connect(id).await; spawn(async move { for p in init { if let Err(e) = sock .send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&p).unwrap().into(), )) .await { warn!("{id} WebSocket error when sending initial packets: {e}"); return; } } debug!("{id} client has caught up"); loop { tokio::select! { p = replies_rx.recv() => match p { Some(packet) => { if send_packet(id, &mut sock, packet).await { break; }; }, None => { info!("{id} Server closed the connection"); break; } }, p = broadcast_rx.recv() => match p { Ok(packet) => { if send_packet(id, &mut sock, packet).await { break; }; }, Err(_) => { warn!("{id} Broadcast packet channel overflowed"); state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.channel_overflow"))); } }, Some(Ok(message)) = sock.next() => { let packet = match message { Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { warn!("{id} Invalid packet: {e}"); state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.invalid_packet", s = e.to_string()))); break; } }, Message::Binary(_packet) => continue, Message::Close(_) => { info!("{id} Client closed the connection"); break }, _ => continue, }; if matches!( packet, PacketS::Movement { .. } | PacketS::ReplayTick { .. } | PacketS::Keepalive ) { trace!("{id} <- {packet:?}"); } else { debug!("{id} <- {packet:?}"); } let packet_out = match state.write().await.packet_in_outer(id, packet) { Ok(packets) => packets, Err(e) => { warn!("{id} Packet error: {e}"); vec![PacketC::ServerMessage { message: e.into(), error: true, }] } }; for packet in packet_out { if send_packet(id, &mut sock, packet).await { break; }; } } }; } while let Ok(packet) = replies_rx.try_recv() { if send_packet(id, &mut sock, packet).await { break; }; } state.write().await.disconnect(id, None); sleep(Duration::from_millis(100)).await; // avoids potential godot bug where disconnect packets are lost }); } Ok(()) } async fn send_packet( id: ConnectionID, sock: &mut WebSocketStream, packet: PacketC, ) -> bool { let message = Message::Text(serde_json::to_string(&packet).unwrap().into()); if let Err(e) = sock.send(message).await { warn!("{id} WebSocket error: {e}"); true } else { false } } fn find_data_path() -> Result { let mut test_order = Vec::new(); if let Ok(path) = var("HURRYCURRY_DATA_PATH") { test_order.push(path); } if let Some(path) = option_env!("HURRYCURRY_DATA_PATH") { test_order.push(path.to_owned()); } #[cfg(debug_assertions)] test_order.push("data".to_string()); #[cfg(windows)] match read_windows_reg_datadir() { Ok(path) => test_order.push(path), Err(e) => warn!("Cannot find read datadir from windows registry: {e}"), }; #[cfg(not(windows))] test_order.extend([ "/usr/local/share/hurrycurry/data".to_string(), "/usr/share/hurrycurry/data".to_string(), "/opt/hurrycurry/data".to_string(), ]); let Some(d) = test_order .iter() .find(|p| PathBuf::from_str(p).unwrap().join("index.yaml").exists()) else { warn!("The following paths were tested without success: {test_order:#?}",); bail!("Could not find the data directory. Use the --data-dir option to specify a path."); }; info!("Selected data dir {d:?}"); Ok(PathBuf::from_str(d)?) } #[cfg(test)] mod test { use hurrycurry_protocol::{Character, PacketS, PlayerClass, PlayerID}; use hurrycurry_server::{ ConnectionID, server::{Server, ServerConfig}, }; use std::future::Future; use tokio::sync::broadcast; fn harness(body: impl Future) { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(body); } fn server() -> Server { Server::new( "../data".into(), ServerConfig::default(), broadcast::channel(1024).0, ) .unwrap() } #[test] fn init_server() { harness(async { server(); }); } #[test] fn full_game() { harness(async { let mut s = server(); s.load(s.index.generate_with_book("junior").unwrap(), None); while s.tick(0.1).is_none() {} }); } #[test] fn map_load() { harness(async { let mut s = server(); s.load(s.index.generate("5star").unwrap(), None); }); } #[test] fn map_load_book() { harness(async { let mut s = server(); s.load(s.index.generate_with_book("lobby").unwrap(), None); }); } #[test] fn tick() { harness(async { let mut s = server(); for _ in 0..100 { s.tick(0.1); } }); } #[test] fn packet_sender_verif() { harness(async { let mut s = server(); for (conn, p) in [ PacketS::Effect { player: PlayerID(0), name: "test".to_owned(), }, PacketS::Leave { player: PlayerID(0), }, PacketS::ReplayTick { dt: 1. }, ] .into_iter() .enumerate() { s.packet_in_outer( ConnectionID(conn.try_into().unwrap()), PacketS::Join { name: format!("test {conn}"), character: Character::default(), class: PlayerClass::Chef, id: None, position: None, }, ) .unwrap(); let x = s.packet_in_outer(ConnectionID(conn.try_into().unwrap()), p); assert!(x.is_ok(), "test {} {:?}", conn, x) } }); } } #[cfg(windows)] fn read_windows_reg_datadir() -> Result { use anyhow::Context; Ok(windows_registry::CURRENT_USER .open("Software\\Hurry Curry!") .context("HKCU\\Hurry Curry!")? .get_string("datadir") .context("datadir subkey")?) }