diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-09-11 22:12:26 +0200 | 
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-09-11 22:12:26 +0200 | 
| commit | 2b1d79588241fc9afa3ef9ddc3b20b26b8ce2433 (patch) | |
| tree | 538bc08b09ac0bb2fc8298d4c8b2070b8ab2f434 /server/replaytool/src | |
| parent | 64075a8ca3225b7b3bd5e7e68e222ef394e501a0 (diff) | |
| download | hurrycurry-2b1d79588241fc9afa3ef9ddc3b20b26b8ce2433.tar hurrycurry-2b1d79588241fc9afa3ef9ddc3b20b26b8ce2433.tar.bz2 hurrycurry-2b1d79588241fc9afa3ef9ddc3b20b26b8ce2433.tar.zst | |
Split replaytool into multiple files
Diffstat (limited to 'server/replaytool/src')
| -rw-r--r-- | server/replaytool/src/main.rs | 173 | ||||
| -rw-r--r-- | server/replaytool/src/record.rs | 81 | ||||
| -rw-r--r-- | server/replaytool/src/replay.rs | 127 | 
3 files changed, 221 insertions, 160 deletions
| diff --git a/server/replaytool/src/main.rs b/server/replaytool/src/main.rs index 337152ec..5ff821cc 100644 --- a/server/replaytool/src/main.rs +++ b/server/replaytool/src/main.rs @@ -15,24 +15,21 @@      along with this program.  If not, see <https://www.gnu.org/licenses/>.  */ -use anyhow::{anyhow, Context}; -use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder}; + +pub mod record; +pub mod replay; +  use clap::Parser; -use futures_util::{SinkExt, StreamExt}; -use hurrycurry_protocol::{Message, PacketC, PacketS}; -use log::{debug, info, warn, LevelFilter}; +use hurrycurry_protocol::PacketC; +use log::{info, warn, LevelFilter};  use serde::{Deserialize, Serialize};  use std::{ -    path::{Path, PathBuf}, -    time::{Duration, Instant, SystemTime}, -}; -use tokio::{ -    fs::File, -    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, -    net::TcpListener, -    time::sleep, +    path::PathBuf, +    time::{Duration, SystemTime},  }; -use tokio_tungstenite::tungstenite; +use tokio::{net::TcpListener, time::sleep}; + +use crate::{record::record, replay::replay};  #[derive(Parser)]  enum Args { @@ -84,7 +81,7 @@ async fn main() -> anyhow::Result<()> {              } else {                  output.clone()              }; -            if let Err(e) = do_record(&out, &url).await { +            if let Err(e) = record(&out, &url).await {                  warn!("recording failed: {e}");                  sleep(Duration::from_secs(1)).await;              } @@ -99,153 +96,9 @@ async fn main() -> anyhow::Result<()> {              info!("listening for websockets on {}", ws_listener.local_addr()?);              loop { -                let mut file = -                    BufReader::new(ZstdDecoder::new(BufReader::new(File::open(&input).await?))) -                        .lines(); - -                let mut next_event = -                    serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; -                let mut time = 0.; -                let mut paused = false; - -                info!("ready"); -                let (sock, addr) = ws_listener.accept().await?; -                let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { -                    warn!("invalid ws handshake"); -                    continue; -                }; -                info!("{addr} connected via ws"); - -                sock.send(tokio_tungstenite::tungstenite::Message::Text( -                    serde_json::to_string(&PacketC::ReplayStart).unwrap().into(), -                )) -                .await?; -                while let Some(Ok(message)) = sock.next().await { -                    let mut send = async |p: PacketC| { -                        sock.send(tokio_tungstenite::tungstenite::Message::Text( -                            serde_json::to_string(&p).unwrap().into(), -                        )) -                        .await -                    }; -                    match message { -                        tungstenite::Message::Text(line) => { -                            let packet: PacketS = match serde_json::from_str(&line) { -                                Ok(p) => p, -                                Err(e) => { -                                    warn!("invalid packet: {e}"); -                                    break; -                                } -                            }; -                            debug!("<- {packet:?}"); - -                            match packet { -                                PacketS::Join { .. } => { -                                    send(PacketC::ServerMessage { -                                        message: Message::Translation { -                                            id: "s.replay.cannot_join".to_owned(), -                                            params: vec![], -                                        }, -                                        error: true, -                                    }) -                                    .await?; -                                } -                                PacketS::Idle { paused: pause } => { -                                    // Cannot display server hint like in main server because cant address them to spectators. -                                    paused = pause; -                                    send(PacketC::Pause { state: pause }).await? -                                } -                                PacketS::ReplayTick { dt } => { -                                    if paused { -                                        continue; -                                    } -                                    time += dt; -                                    while next_event.ts < time { -                                        debug!("<- {:?}", next_event.packet); -                                        if !matches!( -                                            next_event.packet, -                                            PacketC::SetIngame { state: false, .. } -                                        ) { -                                            send(next_event.packet).await?; -                                        } - -                                        if let Some(next) = &file.next_line().await? { -                                            next_event = serde_json::from_str::<Event>(next)?; -                                        } else { -                                            info!("reached end"); -                                            send(PacketC::ServerMessage { -                                                message: Message::Text("End of Replay".to_string()), -                                                error: false, -                                            }) -                                            .await?; -                                            send(PacketC::ReplayStop).await?; -                                            next_event = Event { -                                                ts: f64::INFINITY, -                                                packet: PacketC::FlushMap, -                                            }; -                                            continue; -                                        }; -                                    } -                                } -                                x => warn!("unhandled client packet: {x:?}"), -                            } -                        } -                        tungstenite::Message::Close(_) => break, -                        _ => (), -                    } -                } -                info!("{addr} left"); -            } -        } -    } -    Ok(()) -} - -pub async fn do_record(output: &Path, url: &str) -> anyhow::Result<()> { -    let mut file = BufWriter::new(ZstdEncoder::new(BufWriter::new( -        File::create(&output).await?, -    ))); -    info!("connecting to {url:?}..."); -    let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; -    info!("starting recording."); -    let start = Instant::now(); - -    while let Some(Ok(message)) = sock.next().await { -        match message { -            tungstenite::Message::Text(line) => { -                let packet: PacketC = serde_json::from_str(&line).context("invalid packet")?; -                debug!("<- {packet:?}"); - -                let is_end = matches!(packet, PacketC::SetIngame { state: false, .. }); - -                file.write_all( -                    format!( -                        "{}\n", -                        serde_json::to_string(&Event { -                            ts: start.elapsed().as_secs_f64(), -                            packet -                        }) -                        .unwrap() -                    ) -                    .as_bytes(), -                ) -                .await?; - -                if is_end { -                    info!("stopping replay..."); -                    break; -                } +                replay(&ws_listener, &input).await?;              } -            tungstenite::Message::Close(_) => break, -            _ => (),          }      } -    file.flush().await?; -    let mut file = file.into_inner(); -    file.flush().await?; -    let mut file = file.into_inner(); -    file.flush().await?; -    let mut file = file.into_inner(); -    file.flush().await?; -    info!("done");      Ok(())  } diff --git a/server/replaytool/src/record.rs b/server/replaytool/src/record.rs new file mode 100644 index 00000000..8c12fc94 --- /dev/null +++ b/server/replaytool/src/record.rs @@ -0,0 +1,81 @@ +/* +    Hurry Curry! - a game about cooking +    Copyright (C) 2025 Hurry Curry! Contributors + +    This program is free software: you can redistribute it and/or modify +    it under the terms of the GNU Affero General Public License as published by +    the Free Software Foundation, version 3 of the License only. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Affero General Public License for more details. + +    You should have received a copy of the GNU Affero General Public License +    along with this program.  If not, see <https://www.gnu.org/licenses/>. + +*/ + +use anyhow::Context; +use async_compression::tokio::write::ZstdEncoder; +use futures_util::StreamExt; +use hurrycurry_protocol::PacketC; +use log::{debug, info}; +use std::{path::Path, time::Instant}; +use tokio::{ +    fs::File, +    io::{AsyncWriteExt, BufWriter}, +}; +use tokio_tungstenite::tungstenite; + +use crate::Event; + +pub async fn record(output: &Path, url: &str) -> anyhow::Result<()> { +    let mut file = BufWriter::new(ZstdEncoder::new(BufWriter::new( +        File::create(&output).await?, +    ))); +    info!("connecting to {url:?}..."); +    let (mut sock, _) = tokio_tungstenite::connect_async(url).await?; +    info!("starting recording."); +    let start = Instant::now(); + +    while let Some(Ok(message)) = sock.next().await { +        match message { +            tungstenite::Message::Text(line) => { +                let packet: PacketC = serde_json::from_str(&line).context("invalid packet")?; +                debug!("<- {packet:?}"); + +                let is_end = matches!(packet, PacketC::SetIngame { state: false, .. }); + +                file.write_all( +                    format!( +                        "{}\n", +                        serde_json::to_string(&Event { +                            ts: start.elapsed().as_secs_f64(), +                            packet +                        }) +                        .unwrap() +                    ) +                    .as_bytes(), +                ) +                .await?; + +                if is_end { +                    info!("stopping replay..."); +                    break; +                } +            } +            tungstenite::Message::Close(_) => break, +            _ => (), +        } +    } +    file.flush().await?; +    let mut file = file.into_inner(); +    file.flush().await?; +    let mut file = file.into_inner(); +    file.flush().await?; +    let mut file = file.into_inner(); +    file.flush().await?; +    info!("done"); +    Ok(()) +} diff --git a/server/replaytool/src/replay.rs b/server/replaytool/src/replay.rs new file mode 100644 index 00000000..d442f599 --- /dev/null +++ b/server/replaytool/src/replay.rs @@ -0,0 +1,127 @@ +/* +    Hurry Curry! - a game about cooking +    Copyright (C) 2025 Hurry Curry! Contributors + +    This program is free software: you can redistribute it and/or modify +    it under the terms of the GNU Affero General Public License as published by +    the Free Software Foundation, version 3 of the License only. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Affero General Public License for more details. + +    You should have received a copy of the GNU Affero General Public License +    along with this program.  If not, see <https://www.gnu.org/licenses/>. + +*/ + +use crate::Event; +use anyhow::{anyhow, Result}; +use async_compression::tokio::bufread::ZstdDecoder; +use futures_util::{SinkExt, StreamExt}; +use hurrycurry_protocol::{Message, PacketC, PacketS}; +use log::{debug, info, warn}; +use std::path::Path; +use tokio::{ +    fs::File, +    io::{AsyncBufReadExt, BufReader}, +    net::TcpListener, +}; +use tokio_tungstenite::tungstenite; + +pub async fn replay(ws_listener: &TcpListener, input: &Path) -> Result<()> { +    let mut file = +        BufReader::new(ZstdDecoder::new(BufReader::new(File::open(&input).await?))).lines(); + +    let mut next_event = +        serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; +    let mut time = 0.; +    let mut paused = false; + +    info!("ready"); +    let (sock, addr) = ws_listener.accept().await?; +    let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { +        warn!("invalid ws handshake"); +        return Ok(()); +    }; +    info!("{addr} connected via ws"); + +    sock.send(tokio_tungstenite::tungstenite::Message::Text( +        serde_json::to_string(&PacketC::ReplayStart).unwrap().into(), +    )) +    .await?; +    while let Some(Ok(message)) = sock.next().await { +        let mut send = async |p: PacketC| { +            sock.send(tokio_tungstenite::tungstenite::Message::Text( +                serde_json::to_string(&p).unwrap().into(), +            )) +            .await +        }; +        match message { +            tungstenite::Message::Text(line) => { +                let packet: PacketS = match serde_json::from_str(&line) { +                    Ok(p) => p, +                    Err(e) => { +                        warn!("invalid packet: {e}"); +                        break; +                    } +                }; +                debug!("<- {packet:?}"); + +                match packet { +                    PacketS::Join { .. } => { +                        send(PacketC::ServerMessage { +                            message: Message::Translation { +                                id: "s.replay.cannot_join".to_owned(), +                                params: vec![], +                            }, +                            error: true, +                        }) +                        .await?; +                    } +                    PacketS::Idle { paused: pause } => { +                        // Cannot display server hint like in main server because cant address them to spectators. +                        paused = pause; +                        send(PacketC::Pause { state: pause }).await? +                    } +                    PacketS::ReplayTick { dt } => { +                        if paused { +                            continue; +                        } +                        time += dt; +                        while next_event.ts < time { +                            debug!("<- {:?}", next_event.packet); +                            if !matches!(next_event.packet, PacketC::SetIngame { state: false, .. }) +                            { +                                send(next_event.packet).await?; +                            } + +                            if let Some(next) = &file.next_line().await? { +                                next_event = serde_json::from_str::<Event>(next)?; +                            } else { +                                info!("reached end"); +                                send(PacketC::ServerMessage { +                                    message: Message::Text("End of Replay".to_string()), +                                    error: false, +                                }) +                                .await?; +                                send(PacketC::ReplayStop).await?; +                                next_event = Event { +                                    ts: f64::INFINITY, +                                    packet: PacketC::FlushMap, +                                }; +                                continue; +                            }; +                        } +                    } +                    x => warn!("unhandled client packet: {x:?}"), +                } +            } +            tungstenite::Message::Close(_) => break, +            _ => (), +        } +    } +    info!("{addr} left"); +    Ok(()) +} | 
