/*
Hurry Curry! - a game about cooking
Copyright 2024 metamuffin
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
*/
use anyhow::{anyhow, Result};
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use hurrycurry_protocol::{PacketC, PacketS, PlayerID, BINCODE_CONFIG, VERSION};
use hurrycurry_server::{data::DATA_DIR, state::State};
use log::{debug, info, trace, warn, LevelFilter};
use std::{
path::PathBuf,
process::exit,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
net::TcpListener,
spawn,
sync::{broadcast, mpsc::channel, RwLock},
time::interval,
};
use tokio_tungstenite::tungstenite::Message;
#[derive(Parser)]
struct Args {
/// Print the version, then exit
#[arg(short, long)]
version: bool,
/// Set the path to the game data directory, autodetect if ommitted
#[arg(short, long)]
data_dir: Option,
}
fn main() -> Result<()> {
env_logger::builder()
.filter_level(LevelFilter::Info)
.parse_env("LOG")
.init();
let args = Args::parse();
if args.version {
println!("{}", env!("CARGO_PKG_VERSION"));
exit(0);
}
let data_dir = if let Some(d) = args.data_dir {
d
} else {
let d = PathBuf::from_str(
[
"data",
"/usr/local/share/hurrycurry/data",
"/usr/share/hurrycurry/data",
"/opt/hurrycurry/data",
]
.into_iter()
.find(|p| PathBuf::from_str(p).unwrap().join("index.yaml").exists())
.ok_or(anyhow!(
"Could not find the data directory. Please run the server next to the `data` directory or specify a path to it via arguments."
))?,
)?;
info!("Detected data dir to be {d:?}");
d
};
*DATA_DIR.lock().unwrap() = Some(data_dir);
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(run())?;
Ok(())
}
async fn run() -> anyhow::Result<()> {
let ws_listener = TcpListener::bind("0.0.0.0:27032").await?;
info!("Listening for websockets on {}", ws_listener.local_addr()?);
let (tx, rx) = broadcast::channel::(128 * 1024);
let state = Arc::new(RwLock::new(State::new(tx).await?));
{
let state = state.clone();
spawn(async move {
let dt = 1. / 25.;
let mut tick = interval(Duration::from_secs_f32(dt));
loop {
tick.tick().await;
if let Err(e) = state.write().await.tick(dt).await {
warn!("tick failed: {e}");
}
}
});
}
for id in (1..).map(PlayerID) {
let (sock, addr) = ws_listener.accept().await?;
let Ok(sock) = tokio_tungstenite::accept_async(sock).await else {
warn!("invalid ws handshake");
continue;
};
info!("{addr} connected via websocket");
let (mut write, mut read) = sock.split();
let state = state.clone();
let mut rx = rx.resubscribe();
let (error_tx, mut error_rx) = channel::(8);
let mut init = state.write().await.game.prime_client();
init.insert(
0,
PacketC::Version {
major: VERSION.0,
minor: VERSION.1,
supports_bincode: true,
},
);
init.insert(1, PacketC::Init { id });
let supports_binary = Arc::new(AtomicBool::new(false));
let supports_binary2 = supports_binary.clone();
spawn(async move {
for p in init {
if let Err(e) = write
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&p).unwrap(),
))
.await
{
warn!("ws error on init: {e}");
return;
}
}
loop {
let Some(packet) = tokio::select!(
p = rx.recv() => Some(match p {
Ok(e) => e,
Err(e) => {
rx = rx.resubscribe();
warn!("client was lagging. resubscribed: {e}");
PacketC::ServerMessage {
text: "Lagging behind. Some clientbound packets were dropped."
.to_string(),
}
}
}),
p = error_rx.recv() => p
) else {
info!("client outbound sender dropped. closing connection");
break;
};
let message = if supports_binary.load(Ordering::Relaxed) {
Message::Binary(bincode::encode_to_vec(&packet, BINCODE_CONFIG).unwrap())
} else {
Message::Text(serde_json::to_string(&packet).unwrap())
};
if let Err(e) = write.send(message).await {
warn!("ws error: {e}");
break;
}
}
});
spawn(async move {
info!("{id:?} joined");
while let Some(Ok(message)) = read.next().await {
let packet = match message {
Message::Text(line) => match serde_json::from_str(&line) {
Ok(p) => p,
Err(e) => {
warn!("invalid json packet: {e}");
break;
}
},
Message::Binary(packet) => {
supports_binary2.store(true, Ordering::Relaxed);
match bincode::decode_from_slice::(&packet, BINCODE_CONFIG) {
Ok((p, _size)) => p,
Err(e) => {
warn!("invalid binary packet: {e}");
break;
}
}
}
Message::Close(_) => break,
_ => continue,
};
if matches!(
packet,
PacketS::Position { .. } | PacketS::ReplayTick { .. }
) {
trace!("<- {id:?} {packet:?}");
} else {
debug!("<- {id:?} {packet:?}");
}
let packet_out = match state.write().await.packet_in(id, packet).await {
Ok(packets) => packets,
Err(e) => {
warn!("client error: {e}");
vec![PacketC::Error {
message: format!("{e}"),
}]
}
};
for packet in packet_out {
let _ = error_tx.send(packet).await;
}
}
info!("{id:?} left");
state.write().await.packet_in(id, PacketS::Leave).await.ok();
});
}
Ok(())
}