diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/examples/client.rs | 4 | ||||
-rw-r--r-- | server/protocol/src/lib.rs | 7 | ||||
-rw-r--r-- | server/replaytool/Cargo.toml | 3 | ||||
-rw-r--r-- | server/replaytool/src/main.rs | 135 | ||||
-rw-r--r-- | server/src/game.rs | 1 |
5 files changed, 144 insertions, 6 deletions
diff --git a/server/examples/client.rs b/server/examples/client.rs index 80ba8ffa..70b1bb00 100644 --- a/server/examples/client.rs +++ b/server/examples/client.rs @@ -15,15 +15,13 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. */ +use hurrycurry_protocol::{glam::Vec2, PacketC, PacketS}; use std::{ io::{stdin, BufRead, BufReader, Write}, net::TcpStream, thread, }; -use glam::Vec2; -use hurrycurry_protocol::{PacketC, PacketS}; - fn main() { let mut sock = TcpStream::connect("127.0.0.1:27031").unwrap(); diff --git a/server/protocol/src/lib.rs b/server/protocol/src/lib.rs index 49ff6e1d..36a496a0 100644 --- a/server/protocol/src/lib.rs +++ b/server/protocol/src/lib.rs @@ -79,6 +79,10 @@ pub enum PacketS { ReplaceHand { item: Option<ItemIndex>, }, + /// For use in replay sessions only + ReplayTick { + dt: f64, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -156,6 +160,9 @@ pub enum PacketC { Error { message: String, }, + + /// For use in replay sessions only + ReplayStart, } #[derive(Debug, Clone, Serialize, Deserialize, Copy)] diff --git a/server/replaytool/Cargo.toml b/server/replaytool/Cargo.toml index 16f8377a..e6c1cc23 100644 --- a/server/replaytool/Cargo.toml +++ b/server/replaytool/Cargo.toml @@ -10,8 +10,9 @@ anyhow = "1.0.86" serde = { version = "1.0.204", features = ["derive"] } tokio = { version = "1.38.0", features = ["full"] } serde_json = "1.0.120" -tokio-tungstenite = "0.23.1" +tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] } futures-util = "0.3.30" rand = "0.9.0-alpha.1" +clap = { version = "4.5.8", features = ["derive"] } hurrycurry-protocol = { path = "../protocol" } diff --git a/server/replaytool/src/main.rs b/server/replaytool/src/main.rs index efdc36e0..d2fcf26c 100644 --- a/server/replaytool/src/main.rs +++ b/server/replaytool/src/main.rs @@ -1,5 +1,136 @@ +use anyhow::anyhow; +use clap::Parser; +use futures_util::{SinkExt, StreamExt}; +use hurrycurry_protocol::{PacketC, PacketS}; +use log::{debug, info, warn, LevelFilter}; +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, time::Instant}; +use tokio::{ + fs::File, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::TcpListener, +}; +use tokio_tungstenite::tungstenite::Message; + +#[derive(Parser)] +enum Args { + Record { url: String, output: PathBuf }, + Replay { input: PathBuf }, +} + +#[derive(Serialize, Deserialize)] +struct Event { + ts: f64, + packet: PacketC, +} #[tokio::main] -async fn main() { - +async fn main() -> anyhow::Result<()> { + env_logger::builder() + .filter_level(LevelFilter::Info) + .parse_env("LOG") + .init(); + + let args = Args::parse(); + + match args { + Args::Record { url, output } => { + let mut file = BufWriter::new(File::create(&output).await?); + info!("connecting to {url:?}..."); + let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; + info!("starting recording."); + let start = Instant::now(); + + while let Some(Ok(message)) = sock.next().await { + match message { + Message::Text(line) => { + let packet: PacketC = match serde_json::from_str(&line) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet: {e}"); + break; + } + }; + debug!("<- {packet:?}"); + file.write_all( + format!( + "{}\n", + serde_json::to_string(&Event { + ts: start.elapsed().as_secs_f64(), + packet: packet + }) + .unwrap() + ) + .as_bytes(), + ) + .await? + } + Message::Close(_) => break, + _ => (), + } + } + } + Args::Replay { input } => { + let ws_listener = TcpListener::bind("0.0.0.0:27032").await?; + info!("listening for websockets on {}", ws_listener.local_addr()?); + + loop { + let mut file = BufReader::new(File::open(&input).await?).lines(); + let mut next_event = + serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; + let mut time = 0.; + + info!("ready"); + let (sock, addr) = ws_listener.accept().await?; + let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { + warn!("invalid ws handshake"); + continue; + }; + info!("{addr} connected via ws"); + + sock.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&PacketC::ReplayStart).unwrap(), + )) + .await?; + while let Some(Ok(message)) = sock.next().await { + match message { + Message::Text(line) => { + let packet: PacketS = match serde_json::from_str(&line) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet: {e}"); + break; + } + }; + debug!("<- {packet:?}"); + + match packet { + PacketS::ReplayTick { dt } => { + time += dt; + while next_event.ts < time { + debug!("<- {:?}", next_event.packet); + sock.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&next_event.packet).unwrap(), + )) + .await?; + + let Some(next) = &file.next_line().await? else { + info!("reached end"); + break; + }; + next_event = serde_json::from_str::<Event>(next)?; + } + } + _ => (), + } + } + Message::Close(_) => break, + _ => (), + } + } + info!("{addr} left"); + } + } + } + Ok(()) } diff --git a/server/src/game.rs b/server/src/game.rs index 74bc9d45..32fb7ac9 100644 --- a/server/src/game.rs +++ b/server/src/game.rs @@ -414,6 +414,7 @@ impl Game { item, }) } + PacketS::ReplayTick { .. } => bail!("packet not supported in this session"), } if self.points != points_before { |