/*
Hurry Curry! - a game about cooking
Copyright (C) 2025 Hurry Curry! Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
*/
use anyhow::{bail, Result};
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use hurrycurry_protocol::{PacketC, PacketS, BINCODE_CONFIG, VERSION};
use hurrycurry_server::{
data::DATA_DIR,
server::{GameServerExt, Server},
trm, ConnectionID,
};
use log::{debug, info, trace, warn, LevelFilter};
use std::{
env::var,
net::SocketAddr,
path::PathBuf,
process::exit,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
net::TcpListener,
spawn,
sync::{broadcast, mpsc::channel, RwLock},
time::interval,
};
use tokio_tungstenite::tungstenite::Message;
#[derive(Parser)]
pub(crate) struct Args {
/// Print the version, then exit
#[arg(short, long)]
version: bool,
/// Set the path to the game data directory, autodetect if ommitted
#[arg(short, long)]
data_dir: Option,
/// Set the address on which the server should listen
#[arg(short, long, default_value = "[::]:27032")]
listen: SocketAddr,
/// Enables submissions to the public server registry
#[arg(long)]
register: bool,
/// Enables the mDNS responder for local network discovery
#[cfg(feature = "mdns")]
#[arg(long)]
mdns: bool,
// Enables automatic gateway port forwarding using UPnP
#[cfg(feature = "upnp")]
#[arg(long)]
upnp: bool,
/// Server name
#[cfg(any(feature = "register", feature = "mdns"))]
#[arg(long, short = 'N', default_value = "A Hurry Curry! Server")]
server_name: String,
/// Uri for connecting remotely for registry submission
#[cfg(feature = "register")]
#[arg(long)]
register_uri: Option,
/// Do not register using IPv4
#[cfg(feature = "register")]
#[arg(long)]
register_disable_ip4: bool,
/// Do not register using IPv6
#[cfg(feature = "register")]
#[arg(long)]
register_disable_ip6: bool,
}
fn main() -> Result<()> {
env_logger::builder()
.filter_level(LevelFilter::Info)
.parse_env("LOG")
.init();
let args = Args::parse();
let version = env!("CARGO_PKG_VERSION");
let distribution = option_env!("HURRYCURRY_DISTRIBUTION").unwrap_or("unknown");
if args.version {
println!("{version} ({distribution})");
exit(0);
}
info!("Starting Hurry Curry! Server {version} ({distribution})");
let data_dir = if let Some(d) = args.data_dir.clone() {
d
} else {
let mut test_order = Vec::new();
if let Ok(path) = var("HURRYCURRY_DATA_PATH") {
test_order.push(path);
}
if let Some(path) = option_env!("HURRYCURRY_DATA_PATH") {
test_order.push(path.to_owned());
}
#[cfg(debug_assertions)]
test_order.push("data".to_string());
#[cfg(windows)]
match read_windows_reg_datadir() {
Ok(path) => test_order.push(path),
Err(e) => warn!("Cannot find read datadir from windows registry: {e}"),
};
#[cfg(not(windows))]
test_order.extend([
"/usr/local/share/hurrycurry/data".to_string(),
"/usr/share/hurrycurry/data".to_string(),
"/opt/hurrycurry/data".to_string(),
]);
let Some(d) = test_order
.iter()
.find(|p| PathBuf::from_str(p).unwrap().join("index.yaml").exists())
else {
warn!("The following paths were tested without success: {test_order:#?}",);
bail!(
"Could not find the data directory. Use the --data-dir option to specify a path."
);
};
info!("Selected data dir {d:?}");
PathBuf::from_str(d)?
};
*DATA_DIR.lock().unwrap() = Some(data_dir);
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(run(args))?;
Ok(())
}
async fn run(args: Args) -> anyhow::Result<()> {
let ws_listener = TcpListener::bind(args.listen).await?;
info!("Listening for websockets on {}", ws_listener.local_addr()?);
let (tx, rx) = broadcast::channel::(128 * 1024);
let mut state = Server::new(tx)?;
state.load(state.index.generate_with_book("lobby")?, None);
let state = Arc::new(RwLock::new(state));
#[cfg(feature = "register")]
if args.register {
let r = hurrycurry_server::network::register::Register::new(
args.server_name.clone(),
args.listen.port(),
args.register_uri,
state.clone(),
args.register_disable_ip4,
args.register_disable_ip6,
);
tokio::task::spawn(r.register_loop());
}
#[cfg(feature = "upnp")]
if args.upnp {
tokio::task::spawn(hurrycurry_server::network::upnp::upnp_loop(
args.listen.port(),
));
}
#[cfg(feature = "mdns")]
if args.mdns {
tokio::task::spawn(hurrycurry_server::network::mdns::mdns_loop(
args.server_name.clone(),
args.listen,
state.clone(),
));
}
{
let state = state.clone();
spawn(async move {
let dt = 1. / 50.;
let mut tick = interval(Duration::from_secs_f32(dt));
loop {
tick.tick().await;
if let Err(e) = state.write().await.tick_outer(dt) {
warn!("Tick failed: {e}");
}
}
});
}
for id in (1..).map(ConnectionID) {
let (sock, addr) = ws_listener.accept().await?;
let Ok(sock) = tokio_tungstenite::accept_async(sock).await else {
warn!("Invalid ws handshake");
continue;
};
info!("{addr} connected via websocket");
let (mut write, mut read) = sock.split();
let state = state.clone();
let mut rx = rx.resubscribe();
let (error_tx, mut error_rx) = channel::(8);
let mut init = state.write().await.game.prime_client();
init.insert(
0,
PacketC::Version {
major: VERSION.0,
minor: VERSION.1,
supports_bincode: true,
},
);
let supports_binary = Arc::new(AtomicBool::new(false));
let supports_binary2 = supports_binary.clone();
spawn(async move {
for p in init {
if let Err(e) = write
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&p).unwrap().into(),
))
.await
{
warn!("WebSocket error when sending initial packets: {e}");
return;
}
}
loop {
let Some(packet) = tokio::select!(
p = rx.recv() => Some(match p {
Ok(e) => e,
Err(e) => {
rx = rx.resubscribe();
warn!("Client was lagging; resubscribed: {e}");
PacketC::ServerMessage {
message: trm!("s.state.overflow_resubscribe"),
error: true,
}
}
}),
p = error_rx.recv() => p
) else {
info!("Client outbound sender dropped. closing connection");
break;
};
let message = if supports_binary.load(Ordering::Relaxed) {
Message::Binary(
bincode::encode_to_vec(&packet, BINCODE_CONFIG)
.unwrap()
.into(),
)
} else {
Message::Text(serde_json::to_string(&packet).unwrap().into())
};
if let Err(e) = write.send(message).await {
warn!("WebSocket error: {e}");
break;
}
}
});
spawn(async move {
info!("{id:?} connected");
while let Some(Ok(message)) = read.next().await {
let packet = match message {
Message::Text(line) if line.len() < 8196 => match serde_json::from_str(&line) {
Ok(p) => p,
Err(e) => {
warn!("Invalid json packet: {e}");
break;
}
},
Message::Binary(packet) => {
supports_binary2.store(true, Ordering::Relaxed);
match bincode::decode_from_slice::(&packet, BINCODE_CONFIG) {
Ok((p, _size)) => p,
Err(e) => {
warn!("Invalid binary packet: {e}");
break;
}
}
}
Message::Close(_) => break,
_ => continue,
};
if matches!(
packet,
PacketS::Movement { .. } | PacketS::ReplayTick { .. }
) {
trace!("<- {id:?} {packet:?}");
} else {
debug!("<- {id:?} {packet:?}");
}
let packet_out = match state.write().await.packet_in_outer(id, packet).await {
Ok(packets) => packets,
Err(e) => {
warn!("Client error: {e}");
vec![PacketC::ServerMessage {
message: e.into(),
error: true,
}]
}
};
for packet in packet_out {
let _ = error_tx.send(packet).await;
}
}
info!("{id:?} disconnected");
let _ = state.write().await.disconnect(id).await;
});
}
Ok(())
}
#[cfg(test)]
mod test {
use hurrycurry_protocol::{Character, PacketS, PlayerClass, PlayerID};
use hurrycurry_server::{data::DATA_DIR, server::Server, ConnectionID};
use std::future::Future;
use tokio::sync::broadcast;
fn harness(body: impl Future) {
*DATA_DIR.lock().unwrap() = Some("../data".into());
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(body);
}
#[test]
fn run() {
harness(async { Server::new(broadcast::channel(1024).0).unwrap() });
}
#[test]
fn map_load() {
harness(async {
let mut s = Server::new(broadcast::channel(1024).0).unwrap();
s.load(s.index.generate("5star").unwrap(), None);
});
}
#[test]
fn map_load_book() {
harness(async {
let mut s = Server::new(broadcast::channel(1024).0).unwrap();
s.load(s.index.generate_with_book("lobby").unwrap(), None);
});
}
#[test]
fn tick() {
harness(async {
let mut s = Server::new(broadcast::channel(1024).0).unwrap();
for _ in 0..100 {
s.tick(0.1);
}
});
}
#[test]
fn packet_sender_verif() {
harness(async {
let mut s = Server::new(broadcast::channel(1024).0).unwrap();
for (conn, p) in [
PacketS::Effect {
player: PlayerID(0),
name: "test".to_owned(),
},
PacketS::Leave {
player: PlayerID(0),
},
PacketS::ReplayTick { dt: 1. },
]
.into_iter()
.enumerate()
{
s.packet_in_outer(
ConnectionID(conn.try_into().unwrap()),
PacketS::Join {
name: format!("test {conn}"),
character: Character::default(),
class: PlayerClass::Chef,
id: None,
position: None,
},
)
.await
.unwrap();
assert!(
s.packet_in_outer(ConnectionID(conn.try_into().unwrap()), p)
.await
.is_err(),
"test {}",
conn,
)
}
});
}
}
#[cfg(windows)]
fn read_windows_reg_datadir() -> Result {
use anyhow::Context;
Ok(windows_registry::CURRENT_USER
.open("Software\\Hurry Curry!")
.context("HKCU\\Hurry Curry!")?
.get_string("datadir")
.context("datadir subkey")?)
}