diff options
author | metamuffin <metamuffin@disroot.org> | 2024-07-08 18:52:39 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-07-08 18:52:39 +0200 |
commit | c987d11babad2d8115e7fae6627669baee5df289 (patch) | |
tree | 4f41e6ad4fba71e7eb9cdbbef9a13877c8935d9e /server/replaytool/src | |
parent | 90f42ec7a3f28fb3a87c233a81e8f65de832983e (diff) | |
download | hurrycurry-c987d11babad2d8115e7fae6627669baee5df289.tar hurrycurry-c987d11babad2d8115e7fae6627669baee5df289.tar.bz2 hurrycurry-c987d11babad2d8115e7fae6627669baee5df289.tar.zst |
add replay system to client and server tool
Diffstat (limited to 'server/replaytool/src')
-rw-r--r-- | server/replaytool/src/main.rs | 135 |
1 files changed, 133 insertions, 2 deletions
diff --git a/server/replaytool/src/main.rs b/server/replaytool/src/main.rs index efdc36e0..d2fcf26c 100644 --- a/server/replaytool/src/main.rs +++ b/server/replaytool/src/main.rs @@ -1,5 +1,136 @@ +use anyhow::anyhow; +use clap::Parser; +use futures_util::{SinkExt, StreamExt}; +use hurrycurry_protocol::{PacketC, PacketS}; +use log::{debug, info, warn, LevelFilter}; +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, time::Instant}; +use tokio::{ + fs::File, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::TcpListener, +}; +use tokio_tungstenite::tungstenite::Message; + +#[derive(Parser)] +enum Args { + Record { url: String, output: PathBuf }, + Replay { input: PathBuf }, +} + +#[derive(Serialize, Deserialize)] +struct Event { + ts: f64, + packet: PacketC, +} #[tokio::main] -async fn main() { - +async fn main() -> anyhow::Result<()> { + env_logger::builder() + .filter_level(LevelFilter::Info) + .parse_env("LOG") + .init(); + + let args = Args::parse(); + + match args { + Args::Record { url, output } => { + let mut file = BufWriter::new(File::create(&output).await?); + info!("connecting to {url:?}..."); + let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; + info!("starting recording."); + let start = Instant::now(); + + while let Some(Ok(message)) = sock.next().await { + match message { + Message::Text(line) => { + let packet: PacketC = match serde_json::from_str(&line) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet: {e}"); + break; + } + }; + debug!("<- {packet:?}"); + file.write_all( + format!( + "{}\n", + serde_json::to_string(&Event { + ts: start.elapsed().as_secs_f64(), + packet: packet + }) + .unwrap() + ) + .as_bytes(), + ) + .await? + } + Message::Close(_) => break, + _ => (), + } + } + } + Args::Replay { input } => { + let ws_listener = TcpListener::bind("0.0.0.0:27032").await?; + info!("listening for websockets on {}", ws_listener.local_addr()?); + + loop { + let mut file = BufReader::new(File::open(&input).await?).lines(); + let mut next_event = + serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; + let mut time = 0.; + + info!("ready"); + let (sock, addr) = ws_listener.accept().await?; + let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { + warn!("invalid ws handshake"); + continue; + }; + info!("{addr} connected via ws"); + + sock.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&PacketC::ReplayStart).unwrap(), + )) + .await?; + while let Some(Ok(message)) = sock.next().await { + match message { + Message::Text(line) => { + let packet: PacketS = match serde_json::from_str(&line) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet: {e}"); + break; + } + }; + debug!("<- {packet:?}"); + + match packet { + PacketS::ReplayTick { dt } => { + time += dt; + while next_event.ts < time { + debug!("<- {:?}", next_event.packet); + sock.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&next_event.packet).unwrap(), + )) + .await?; + + let Some(next) = &file.next_line().await? else { + info!("reached end"); + break; + }; + next_event = serde_json::from_str::<Event>(next)?; + } + } + _ => (), + } + } + Message::Close(_) => break, + _ => (), + } + } + info!("{addr} left"); + } + } + } + Ok(()) } |