/* Hurry Curry! - a game about cooking Copyright (C) 2025 Hurry Curry! Contributors This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3 of the License only. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use crate::Event; use anyhow::{anyhow, Result}; use async_compression::tokio::bufread::ZstdDecoder; use futures_util::{SinkExt, StreamExt}; use hurrycurry_protocol::{Message, PacketC, PacketS}; use log::{debug, info, warn}; use std::path::Path; use tokio::{ fs::File, io::{AsyncBufReadExt, BufReader}, net::TcpListener, }; use tokio_tungstenite::tungstenite; pub async fn replay(ws_listener: &TcpListener, input: &Path) -> Result<()> { let mut file = BufReader::new(ZstdDecoder::new(BufReader::new(File::open(&input).await?))).lines(); let mut next_event = serde_json::from_str::(&file.next_line().await?.ok_or(anyhow!("eof"))?)?; let mut time = 0.; let mut paused = false; info!("ready"); let (sock, addr) = ws_listener.accept().await?; let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else { warn!("invalid ws handshake"); return Ok(()); }; info!("{addr} connected via ws"); sock.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&PacketC::ReplayStart).unwrap().into(), )) .await?; while let Some(Ok(message)) = sock.next().await { let mut send = async |p: PacketC| { sock.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&p).unwrap().into(), )) .await }; match message { tungstenite::Message::Text(line) => { let packet: PacketS = match serde_json::from_str(&line) { Ok(p) => p, Err(e) => { warn!("invalid packet: {e}"); break; } }; debug!("<- {packet:?}"); match packet { PacketS::Join { .. } => { send(PacketC::ServerMessage { message: Message::Translation { id: "s.replay.cannot_join".to_owned(), params: vec![], }, error: true, }) .await?; } PacketS::Idle { paused: pause } => { // Cannot display server hint like in main server because cant address them to spectators. paused = pause; send(PacketC::Pause { state: pause }).await? } PacketS::ReplayTick { dt } => { if paused { continue; } time += dt; while next_event.ts < time { debug!("<- {:?}", next_event.packet); if !matches!(next_event.packet, PacketC::SetIngame { state: false, .. }) { send(next_event.packet).await?; } if let Some(next) = &file.next_line().await? { next_event = serde_json::from_str::(next)?; } else { info!("reached end"); send(PacketC::ServerMessage { message: Message::Text("End of Replay".to_string()), error: false, }) .await?; send(PacketC::ReplayStop).await?; next_event = Event { ts: f64::INFINITY, packet: PacketC::FlushMap, }; continue; }; } } x => warn!("unhandled client packet: {x:?}"), } } tungstenite::Message::Close(_) => break, _ => (), } } info!("{addr} left"); Ok(()) }