/* Hurry Curry! - a game about cooking Copyright 2024 metamuffin This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3 of the License only. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use anyhow::anyhow; use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder}; use clap::Parser; use futures_util::{SinkExt, StreamExt}; use hurrycurry_protocol::{PacketC, PacketS}; use log::{debug, info, warn, LevelFilter}; use serde::{Deserialize, Serialize}; use std::{path::PathBuf, time::Instant}; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, net::TcpListener, }; use tokio_tungstenite::tungstenite::Message; #[derive(Parser)] enum Args { Record { url: String, output: PathBuf }, Replay { input: PathBuf }, } #[derive(Serialize, Deserialize)] struct Event { ts: f64, packet: PacketC, } #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::builder() .filter_level(LevelFilter::Info) .parse_env("LOG") .init(); let args = Args::parse(); match args { Args::Record { url, output } => { let mut file = BufWriter::new(ZstdEncoder::new(File::create(&output).await?)); info!("connecting to {url:?}..."); let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; info!("starting recording."); let start = Instant::now(); while let Some(Ok(message)) = sock.next().await { match message { Message::Text(line) => { let packet: PacketC = match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { warn!("invalid packet: {e}"); break; } }; debug!("<- {packet:?}"); let is_end = matches!(packet, PacketC::SetIngame { state: false, .. }); file.write_all( format!( "{}\n", serde_json::to_string(&Event { ts: start.elapsed().as_secs_f64(), packet: packet }) .unwrap() ) .as_bytes(), ) .await?; if is_end { info!("stopping replay..."); break; } } Message::Close(_) => break, _ => (), } } drop(file); info!("done") } Args::Replay { input } => { let ws_listener = TcpListener::bind("0.0.0.0:27032").await?; info!("listening for websockets on {}", ws_listener.local_addr()?); loop { let mut file = BufReader::new(ZstdDecoder::new(BufReader::new(File::open(&input).await?))) .lines(); let mut next_event = serde_json::from_str::(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; let mut time = 0.; info!("ready"); let (sock, addr) = ws_listener.accept().await?; let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { warn!("invalid ws handshake"); continue; }; info!("{addr} connected via ws"); sock.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&PacketC::ReplayStart).unwrap(), )) .await?; 'outer: while let Some(Ok(message)) = sock.next().await { match message { Message::Text(line) => { let packet: PacketS = match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { warn!("invalid packet: {e}"); break; } }; debug!("<- {packet:?}"); match packet { PacketS::Join { .. } => { sock.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&PacketC::ServerMessage { text: "Replays cannot be joined".to_string(), }) .unwrap(), )) .await?; } PacketS::ReplayTick { dt } => { time += dt; while next_event.ts < time { debug!("<- {:?}", next_event.packet); sock.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&next_event.packet).unwrap(), )) .await?; let Some(next) = &file.next_line().await? else { info!("reached end"); break 'outer; }; next_event = serde_json::from_str::(next)?; } } x => warn!("unhandled client packet: {x:?}"), } } Message::Close(_) => break, _ => (), } } info!("{addr} left"); } } } Ok(()) }