1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/*
Hurry Curry! - a game about cooking
Copyright (C) 2025 Hurry Curry! Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use crate::Event;
use anyhow::{anyhow, Result};
use async_compression::tokio::bufread::ZstdDecoder;
use futures_util::{SinkExt, StreamExt};
use hurrycurry_protocol::{Message, PacketC, PacketS};
use log::{debug, info, warn};
use std::path::Path;
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
net::TcpListener,
};
use tokio_tungstenite::tungstenite;
pub async fn replay(ws_listener: &TcpListener, input: &Path) -> Result<()> {
let mut file =
BufReader::new(ZstdDecoder::new(BufReader::new(File::open(&input).await?))).lines();
let mut next_event =
serde_json::from_str::<Event>(&file.next_line().await?.ok_or(anyhow!("eof"))?)?;
let mut time = 0.;
let mut paused = false;
info!("ready");
let (sock, addr) = ws_listener.accept().await?;
let Ok(mut sock) = tokio_tungstenite::accept_async(sock).await else {
warn!("invalid ws handshake");
return Ok(());
};
info!("{addr} connected via ws");
sock.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&PacketC::ReplayStart).unwrap().into(),
))
.await?;
while let Some(Ok(message)) = sock.next().await {
let mut send = async |p: PacketC| {
sock.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&p).unwrap().into(),
))
.await
};
match message {
tungstenite::Message::Text(line) => {
let packet: PacketS = match serde_json::from_str(&line) {
Ok(p) => p,
Err(e) => {
warn!("invalid packet: {e}");
break;
}
};
debug!("<- {packet:?}");
match packet {
PacketS::Join { .. } => {
send(PacketC::ServerMessage {
message: Message::Translation {
id: "s.replay.cannot_join".to_owned(),
params: vec![],
},
error: true,
})
.await?;
}
PacketS::Idle { paused: pause } => {
// Cannot display server hint like in main server because cant address them to spectators.
paused = pause;
send(PacketC::Pause { state: pause }).await?
}
PacketS::ReplayTick { dt } => {
if paused {
continue;
}
time += dt;
while next_event.ts < time {
debug!("<- {:?}", next_event.packet);
if !matches!(next_event.packet, PacketC::SetIngame { state: false, .. })
{
send(next_event.packet).await?;
}
if let Some(next) = &file.next_line().await? {
next_event = serde_json::from_str::<Event>(next)?;
} else {
info!("reached end");
send(PacketC::ServerMessage {
message: Message::Text("End of Replay".to_string()),
error: false,
})
.await?;
send(PacketC::ReplayStop).await?;
next_event = Event {
ts: f64::INFINITY,
packet: PacketC::FlushMap,
};
continue;
};
}
}
x => warn!("unhandled client packet: {x:?}"),
}
}
tungstenite::Message::Close(_) => break,
_ => (),
}
}
info!("{addr} left");
Ok(())
}
|