/*
Hurry Curry! - a game about cooking
Copyright (C) 2025 Hurry Curry! Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use anyhow::{Result, bail};
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use hurrycurry_locale::trm;
use hurrycurry_protocol::{PacketC, PacketS};
use hurrycurry_server::{
ConnectionID,
server::{Server, ServerConfig},
};
use log::{LevelFilter, debug, info, trace, warn};
use std::{
env::var, net::SocketAddr, path::PathBuf, process::exit, str::FromStr, sync::Arc,
time::Duration,
};
use tokio::{
net::{TcpListener, TcpStream},
spawn,
sync::{RwLock, broadcast},
time::{interval, sleep},
};
use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
#[derive(Parser)]
pub(crate) struct Args {
/// Print server version, then exit
#[arg(short, long)]
version: bool,
/// Set the path to the game data directory, autodetected if ommitted
#[arg(short, long)]
data_dir: Option,
/// Set the address on which the server should listen
#[arg(short, long, default_value = "[::]:27032")]
listen: SocketAddr,
/// Map name to use as lobby
#[arg(long, default_value = "lobby")]
lobby: String,
/// Inactivity timeout in seconds
#[arg(long, default_value_t = 60.)]
inactivity_timeout: f32,
/// Registers this server to the public server registry
#[arg(long)]
#[cfg(feature = "register")]
register: bool,
/// Enables the mDNS responder for local network discovery
#[cfg(feature = "mdns")]
#[arg(long)]
mdns: bool,
/// Enables automatic gateway port forwarding using UPnP
#[cfg(feature = "upnp")]
#[arg(long)]
upnp: bool,
/// Server name
#[cfg(any(feature = "register", feature = "mdns"))]
#[arg(long, short = 'N', default_value = "A Hurry Curry! Server")]
server_name: String,
/// Uri for connecting remotely for registry submission
#[cfg(feature = "register")]
#[arg(long)]
register_uri: Option,
/// Do not register using IPv4
#[cfg(feature = "register")]
#[arg(long)]
register_disable_ip4: bool,
/// Do not register using IPv6
#[cfg(feature = "register")]
#[arg(long)]
register_disable_ip6: bool,
/// Address of registry server to use when registering
#[cfg(feature = "register")]
#[arg(long, default_value = "https://registry.hurrycurry.org")]
registry_server: String,
}
/// Program entry point.
///
/// Sets up logging (default level `Info`, overridable through the `LOG`
/// environment variable), parses the command line, handles `--version`,
/// resolves the data directory and then runs the server on a multi-threaded
/// tokio runtime until [`run`] returns.
fn main() -> Result<()> {
    env_logger::builder()
        .filter_level(LevelFilter::Info)
        .parse_env("LOG")
        .init();

    let args = Args::parse();

    // Both values are baked in at compile time.
    let version = env!("CARGO_PKG_VERSION");
    let distribution = option_env!("HURRYCURRY_DISTRIBUTION").unwrap_or("unknown");

    if args.version {
        println!("{version} ({distribution})");
        exit(0);
    }
    info!("Starting Hurry Curry! Server {version} ({distribution})");

    // An explicit --data-dir wins; otherwise probe the well-known locations.
    let data_path = match args.data_dir.clone() {
        Some(dir) => dir,
        None => find_data_path()?,
    };

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    runtime.block_on(run(data_path, args))
}
async fn run(data_path: PathBuf, args: Args) -> anyhow::Result<()> {
let ws_listener = TcpListener::bind(args.listen).await?;
info!("Listening for websockets on {}", ws_listener.local_addr()?);
let (tx, _) = broadcast::channel::(128 * 1024);
let config = ServerConfig {
inactivity_timeout: args.inactivity_timeout,
lobby: args.lobby,
};
let mut state = Server::new(data_path, config, tx)?;
state.load(state.index.generate_with_book(&state.config.lobby)?, None);
let state = Arc::new(RwLock::new(state));
#[cfg(feature = "register")]
if args.register {
let r = hurrycurry_server::network::register::Register::new(
args.server_name.clone(),
args.listen.port(),
args.register_uri,
args.registry_server,
state.clone(),
args.register_disable_ip4,
args.register_disable_ip6,
);
tokio::task::spawn(r.register_loop());
}
#[cfg(feature = "upnp")]
if args.upnp {
tokio::task::spawn(hurrycurry_server::network::upnp::upnp_loop(
args.listen.port(),
));
}
#[cfg(feature = "mdns")]
if args.mdns {
tokio::task::spawn(hurrycurry_server::network::mdns::mdns_loop(
args.server_name.clone(),
args.listen,
state.clone(),
));
}
{
let state = state.clone();
spawn(async move {
let dt = 1. / 50.;
let mut tick = interval(Duration::from_secs_f32(dt));
loop {
tick.tick().await;
if let Err(e) = state.write().await.tick_outer(dt) {
warn!("Tick failed: {e}");
}
}
});
}
for id in (1..).map(ConnectionID) {
let (sock, addr) = ws_listener.accept().await?;
let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else {
warn!("Invalid ws handshake");
continue;
};
info!("{id} Client connected ({addr})");
let state = state.clone();
let (init, mut broadcast_rx, mut replies_rx) = state.write().await.connect(id).await;
spawn(async move {
for p in init {
if let Err(e) = sock
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&p).unwrap().into(),
))
.await
{
warn!("{id} WebSocket error when sending initial packets: {e}");
return;
}
}
debug!("{id} client has caught up");
loop {
tokio::select! {
p = replies_rx.recv() => match p {
Some(packet) => {
if send_packet(id, &mut sock, packet).await { break; };
},
None => {
info!("{id} Server closed the connection");
break;
}
},
p = broadcast_rx.recv() => match p {
Ok(packet) => {
if send_packet(id, &mut sock, packet).await { break; };
},
Err(_) => {
warn!("{id} Broadcast packet channel overflowed");
state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.channel_overflow")));
}
},
Some(Ok(message)) = sock.next() => {
let packet = match message {
Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) {
Ok(p) => p,
Err(e) => {
warn!("{id} Invalid packet: {e}");
state.write().await.disconnect(id, Some(trm!("s.disconnect_reason.invalid_packet", s = e.to_string())));
break;
}
},
Message::Binary(_packet) => continue,
Message::Close(_) => {
info!("{id} Client closed the connection");
break
},
_ => continue,
};
if matches!(
packet,
PacketS::Movement { .. } | PacketS::ReplayTick { .. } | PacketS::Keepalive
) {
trace!("{id} <- {packet:?}");
} else {
debug!("{id} <- {packet:?}");
}
let packet_out = match state.write().await.packet_in_outer(id, packet) {
Ok(packets) => packets,
Err(e) => {
warn!("{id} Packet error: {e}");
vec![PacketC::ServerMessage {
message: e.into(),
error: true,
}]
}
};
for packet in packet_out {
if send_packet(id, &mut sock, packet).await { break; };
}
}
};
}
while let Ok(packet) = replies_rx.try_recv() {
if send_packet(id, &mut sock, packet).await {
break;
};
}
state.write().await.disconnect(id, None);
sleep(Duration::from_millis(100)).await; // avoids potential godot bug where disconnect packets are lost
});
}
Ok(())
}
async fn send_packet(
id: ConnectionID,
sock: &mut WebSocketStream,
packet: PacketC,
) -> bool {
let message = Message::Text(serde_json::to_string(&packet).unwrap().into());
if let Err(e) = sock.send(message).await {
warn!("{id} WebSocket error: {e}");
true
} else {
false
}
}
fn find_data_path() -> Result {
let mut test_order = Vec::new();
if let Ok(path) = var("HURRYCURRY_DATA_PATH") {
test_order.push(path);
}
if let Some(path) = option_env!("HURRYCURRY_DATA_PATH") {
test_order.push(path.to_owned());
}
#[cfg(debug_assertions)]
test_order.push("data".to_string());
#[cfg(windows)]
match read_windows_reg_datadir() {
Ok(path) => test_order.push(path),
Err(e) => warn!("Cannot find read datadir from windows registry: {e}"),
};
#[cfg(not(windows))]
test_order.extend([
"/usr/local/share/hurrycurry/data".to_string(),
"/usr/share/hurrycurry/data".to_string(),
"/opt/hurrycurry/data".to_string(),
]);
let Some(d) = test_order
.iter()
.find(|p| PathBuf::from_str(p).unwrap().join("index.yaml").exists())
else {
warn!("The following paths were tested without success: {test_order:#?}",);
bail!("Could not find the data directory. Use the --data-dir option to specify a path.");
};
info!("Selected data dir {d:?}");
Ok(PathBuf::from_str(d)?)
}
#[cfg(test)]
mod test {
    use hurrycurry_protocol::{Character, PacketS, PlayerClass, PlayerID};
    use hurrycurry_server::{
        ConnectionID,
        server::{Server, ServerConfig},
    };
    use std::future::Future;
    use tokio::sync::broadcast;

    /// Drives `body` to completion on a fresh single-threaded tokio runtime.
    fn harness(body: impl Future) {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(body);
    }

    /// Creates a server with the default config, reading data from `../data`.
    fn server() -> Server {
        let (tx, _) = broadcast::channel(1024);
        Server::new("../data".into(), ServerConfig::default(), tx).unwrap()
    }

    /// The server can be constructed at all.
    #[test]
    fn init_server() {
        harness(async {
            server();
        });
    }

    /// Ticks the "junior" map until `tick` reports a result.
    #[test]
    fn full_game() {
        harness(async {
            let mut srv = server();
            let map = srv.index.generate_with_book("junior").unwrap();
            srv.load(map, None);
            loop {
                if srv.tick(0.1).is_some() {
                    break;
                }
            }
        });
    }

    /// A map can be generated and loaded without its book.
    #[test]
    fn map_load() {
        harness(async {
            let mut srv = server();
            let map = srv.index.generate("5star").unwrap();
            srv.load(map, None);
        });
    }

    /// A map can be generated and loaded together with its book.
    #[test]
    fn map_load_book() {
        harness(async {
            let mut srv = server();
            let map = srv.index.generate_with_book("lobby").unwrap();
            srv.load(map, None);
        });
    }

    /// A freshly created server survives a hundred ticks.
    #[test]
    fn tick() {
        harness(async {
            let mut srv = server();
            (0..100).for_each(|_| {
                srv.tick(0.1);
            });
        });
    }

    /// Each connection joins a player and then sends one follow-up packet,
    /// which the server is expected to accept.
    #[test]
    fn packet_sender_verif() {
        harness(async {
            let mut srv = server();
            let follow_ups = [
                PacketS::Effect {
                    player: PlayerID(0),
                    name: "test".to_owned(),
                },
                PacketS::Leave {
                    player: PlayerID(0),
                },
                PacketS::ReplayTick { dt: 1. },
            ];
            for (conn, p) in follow_ups.into_iter().enumerate() {
                let connection = ConnectionID(conn.try_into().unwrap());
                let join = PacketS::Join {
                    name: format!("test {conn}"),
                    character: Character::default(),
                    class: PlayerClass::Chef,
                    id: None,
                    position: None,
                };
                srv.packet_in_outer(connection, join).unwrap();
                let x = srv.packet_in_outer(connection, p);
                assert!(x.is_ok(), "test {} {:?}", conn, x)
            }
        });
    }
}
/// Reads the data directory path from the Windows registry.
///
/// Looks up the string value `datadir` under
/// `HKEY_CURRENT_USER\Software\Hurry Curry!`.
///
/// # Errors
/// Fails if the key cannot be opened or the value cannot be read as a string.
#[cfg(windows)]
fn read_windows_reg_datadir() -> Result<String> {
    use anyhow::Context;
    let key = windows_registry::CURRENT_USER
        .open("Software\\Hurry Curry!")
        // Name the key that was actually opened, so the error is actionable.
        .context("HKCU\\Software\\Hurry Curry!")?;
    let datadir = key.get_string("datadir").context("datadir subkey")?;
    Ok(datadir)
}