diff options
Diffstat (limited to 'server')
| -rw-r--r-- | server/examples/client.rs | 4 | ||||
| -rw-r--r-- | server/protocol/src/lib.rs | 7 | ||||
| -rw-r--r-- | server/replaytool/Cargo.toml | 3 | ||||
| -rw-r--r-- | server/replaytool/src/main.rs | 135 | ||||
| -rw-r--r-- | server/src/game.rs | 1 | 
5 files changed, 144 insertions, 6 deletions
| diff --git a/server/examples/client.rs b/server/examples/client.rs index 80ba8ffa..70b1bb00 100644 --- a/server/examples/client.rs +++ b/server/examples/client.rs @@ -15,15 +15,13 @@      along with this program.  If not, see <https://www.gnu.org/licenses/>.  */ +use hurrycurry_protocol::{glam::Vec2, PacketC, PacketS};  use std::{      io::{stdin, BufRead, BufReader, Write},      net::TcpStream,      thread,  }; -use glam::Vec2; -use hurrycurry_protocol::{PacketC, PacketS}; -  fn main() {      let mut sock = TcpStream::connect("127.0.0.1:27031").unwrap(); diff --git a/server/protocol/src/lib.rs b/server/protocol/src/lib.rs index 49ff6e1d..36a496a0 100644 --- a/server/protocol/src/lib.rs +++ b/server/protocol/src/lib.rs @@ -79,6 +79,10 @@ pub enum PacketS {      ReplaceHand {          item: Option<ItemIndex>,      }, +    /// For use in replay sessions only +    ReplayTick { +        dt: f64, +    },  }  #[derive(Debug, Clone, Serialize, Deserialize)] @@ -156,6 +160,9 @@ pub enum PacketC {      Error {          message: String,      }, + +    /// For use in replay sessions only +    ReplayStart,  }  #[derive(Debug, Clone, Serialize, Deserialize, Copy)] diff --git a/server/replaytool/Cargo.toml b/server/replaytool/Cargo.toml index 16f8377a..e6c1cc23 100644 --- a/server/replaytool/Cargo.toml +++ b/server/replaytool/Cargo.toml @@ -10,8 +10,9 @@ anyhow = "1.0.86"  serde = { version = "1.0.204", features = ["derive"] }  tokio = { version = "1.38.0", features = ["full"] }  serde_json = "1.0.120" -tokio-tungstenite = "0.23.1" +tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }  futures-util = "0.3.30"  rand = "0.9.0-alpha.1" +clap = { version = "4.5.8", features = ["derive"] }  hurrycurry-protocol = { path = "../protocol" } diff --git a/server/replaytool/src/main.rs b/server/replaytool/src/main.rs index efdc36e0..d2fcf26c 100644 --- a/server/replaytool/src/main.rs +++ b/server/replaytool/src/main.rs @@ -1,5 +1,136 @@ +use anyhow::anyhow; +use clap::Parser; +use futures_util::{SinkExt, StreamExt}; +use hurrycurry_protocol::{PacketC, PacketS}; +use log::{debug, info, warn, LevelFilter}; +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, time::Instant}; +use tokio::{ +    fs::File, +    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, +    net::TcpListener, +}; +use tokio_tungstenite::tungstenite::Message; + +#[derive(Parser)] +enum Args { +    Record { url: String, output: PathBuf }, +    Replay { input: PathBuf }, +} + +#[derive(Serialize, Deserialize)] +struct Event { +    ts: f64, +    packet: PacketC, +}  #[tokio::main] -async fn main() { -     +async fn main() -> anyhow::Result<()> { +    env_logger::builder() +        .filter_level(LevelFilter::Info) +        .parse_env("LOG") +        .init(); + +    let args = Args::parse(); + +    match args { +        Args::Record { url, output } => { +            let mut file = BufWriter::new(File::create(&output).await?); +            info!("connecting to {url:?}..."); +            let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; +            info!("starting recording."); +            let start = Instant::now(); + +            while let Some(Ok(message)) = sock.next().await { +                match message { +                    Message::Text(line) => { +                        let packet: PacketC = match serde_json::from_str(&line) { +                            Ok(p) => p, +                            Err(e) => { +                                warn!("invalid packet: {e}"); +                                break; +                            } +                        }; +                        debug!("<- {packet:?}"); +                        file.write_all( +                            format!( +                                "{}\n", +                                serde_json::to_string(&Event { +                                    ts: start.elapsed().as_secs_f64(), +                                    packet: packet +                                }) +                                .unwrap() +                            ) +                            .as_bytes(), +                        ) +                        .await? +                    } +                    Message::Close(_) => break, +                    _ => (), +                } +            } +        } +        Args::Replay { input } => { +            let ws_listener = TcpListener::bind("0.0.0.0:27032").await?; +            info!("listening for websockets on {}", ws_listener.local_addr()?); + +            loop { +                let mut file = BufReader::new(File::open(&input).await?).lines(); +                let mut next_event = +                    serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; +                let mut time = 0.; + +                info!("ready"); +                let (sock, addr) = ws_listener.accept().await?; +                let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { +                    warn!("invalid ws handshake"); +                    continue; +                }; +                info!("{addr} connected via ws"); + +                sock.send(tokio_tungstenite::tungstenite::Message::Text( +                    serde_json::to_string(&PacketC::ReplayStart).unwrap(), +                )) +                .await?; +                while let Some(Ok(message)) = sock.next().await { +                    match message { +                        Message::Text(line) => { +                            let packet: PacketS = match serde_json::from_str(&line) { +                                Ok(p) => p, +                                Err(e) => { +                                    warn!("invalid packet: {e}"); +                                    break; +                                } +                            }; +                            debug!("<- {packet:?}"); + +                            match packet { +                                PacketS::ReplayTick { dt } => { +                                    time += dt; +                                    while next_event.ts < time { +                                        debug!("<- {:?}", next_event.packet); +                                        sock.send(tokio_tungstenite::tungstenite::Message::Text( +                                            serde_json::to_string(&next_event.packet).unwrap(), +                                        )) +                                        .await?; + +                                        let Some(next) = &file.next_line().await? else { +                                            info!("reached end"); +                                            break; +                                        }; +                                        next_event = serde_json::from_str::<Event>(next)?; +                                    } +                                } +                                _ => (), +                            } +                        } +                        Message::Close(_) => break, +                        _ => (), +                    } +                } +                info!("{addr} left"); +            } +        } +    } +    Ok(())  } diff --git a/server/src/game.rs b/server/src/game.rs index 74bc9d45..32fb7ac9 100644 --- a/server/src/game.rs +++ b/server/src/game.rs @@ -414,6 +414,7 @@ impl Game {                      item,                  })              } +            PacketS::ReplayTick { .. } => bail!("packet not supported in this session"),          }          if self.points != points_before { | 
