aboutsummaryrefslogtreecommitdiff
path: root/server/replaytool/src/replay.rs
blob: d442f59921ca3020d4ba1feb563664b92843f2a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
    Hurry Curry! - a game about cooking
    Copyright (C) 2025 Hurry Curry! Contributors

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, version 3 of the License only.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

*/

use crate::Event;
use anyhow::{anyhow, Result};
use async_compression::tokio::bufread::ZstdDecoder;
use futures_util::{SinkExt, StreamExt};
use hurrycurry_protocol::{Message, PacketC, PacketS};
use log::{debug, info, warn};
use std::path::Path;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
    net::TcpListener,
};
use tokio_tungstenite::tungstenite;

/// Serves one recorded replay to a single websocket client.
///
/// `input` is opened as a zstd-compressed stream of lines, each line holding
/// one JSON-encoded [`Event`]. The first event is parsed before any client is
/// accepted. Exactly one connection is then accepted from `ws_listener` and
/// upgraded to a websocket; if the handshake fails a warning is logged and
/// `Ok(())` is returned.
///
/// After sending [`PacketC::ReplayStart`], the function reacts to text frames
/// decoded as [`PacketS`]:
///
/// * `Join` is answered with an error `ServerMessage`
///   (`s.replay.cannot_join`).
/// * `Idle` stores the requested pause state and echoes it back as `Pause`.
/// * `ReplayTick` is ignored while paused; otherwise the replay clock is
///   advanced by `dt` and every recorded event whose timestamp is strictly
///   below the clock is forwarded, except `SetIngame { state: false, .. }`.
///   When the recording is exhausted, an "End of Replay" message and
///   `ReplayStop` are sent once.
/// * Any other packet is logged and ignored.
///
/// The loop ends when the client sends a close frame, the stream ends or
/// yields an error, or a text frame cannot be decoded as a `PacketS`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, if it contains no
/// lines (`"eof"`), if a line is not a valid JSON `Event`, if accepting the
/// TCP connection fails, or if sending to the client fails.
pub async fn replay(ws_listener: &TcpListener, input: &Path) -> Result<()> {
    let mut file =
        BufReader::new(ZstdDecoder::new(BufReader::new(File::open(input).await?))).lines();

    // Parse the first event up front so that an empty or malformed recording
    // is reported before waiting for a client.
    let first_line = file.next_line().await?.ok_or_else(|| anyhow!("eof"))?;
    let mut next_event = serde_json::from_str::<Event>(&first_line)?;
    // Replay clock: the sum of all `dt` values received while not paused.
    let mut time = 0.;
    let mut paused = false;

    info!("ready");
    let (sock, addr) = ws_listener.accept().await?;
    let mut sock = match tokio_tungstenite::accept_async(sock).await {
        Ok(sock) => sock,
        Err(e) => {
            warn!("invalid ws handshake: {e}");
            return Ok(());
        }
    };
    info!("{addr} connected via ws");

    sock.send(tungstenite::Message::Text(
        serde_json::to_string(&PacketC::ReplayStart).unwrap().into(),
    ))
    .await?;
    while let Some(Ok(message)) = sock.next().await {
        // The closure mutably borrows `sock`, so it is created per message,
        // after `sock.next()` above has released its own borrow.
        let mut send = async |p: PacketC| {
            sock.send(tungstenite::Message::Text(
                serde_json::to_string(&p).unwrap().into(),
            ))
            .await
        };
        match message {
            tungstenite::Message::Text(line) => {
                let packet: PacketS = match serde_json::from_str(&line) {
                    Ok(p) => p,
                    Err(e) => {
                        warn!("invalid packet: {e}");
                        break;
                    }
                };
                debug!("<- {packet:?}");

                match packet {
                    PacketS::Join { .. } => {
                        send(PacketC::ServerMessage {
                            message: Message::Translation {
                                id: "s.replay.cannot_join".to_owned(),
                                params: vec![],
                            },
                            error: true,
                        })
                        .await?;
                    }
                    PacketS::Idle { paused: pause } => {
                        // Cannot display a server hint like the main server does
                        // because it cannot be addressed to spectators.
                        paused = pause;
                        send(PacketC::Pause { state: pause }).await?;
                    }
                    PacketS::ReplayTick { dt } => {
                        if paused {
                            continue;
                        }
                        time += dt;
                        // Flush every recorded event that is now in the past.
                        while next_event.ts < time {
                            // NOTE(review): `SetIngame { state: false }` is deliberately
                            // not forwarded; presumably to keep the viewer in the game
                            // view - confirm against the client.
                            if matches!(next_event.packet, PacketC::SetIngame { state: false, .. })
                            {
                                debug!("skipping {:?}", next_event.packet);
                            } else {
                                debug!("-> {:?}", next_event.packet);
                                send(next_event.packet).await?;
                            }

                            match file.next_line().await? {
                                Some(next) => {
                                    next_event = serde_json::from_str::<Event>(&next)?;
                                }
                                None => {
                                    info!("reached end");
                                    send(PacketC::ServerMessage {
                                        message: Message::Text("End of Replay".to_string()),
                                        error: false,
                                    })
                                    .await?;
                                    send(PacketC::ReplayStop).await?;
                                    // Sentinel that never becomes due, so later ticks
                                    // send nothing and the end notice is not repeated.
                                    next_event = Event {
                                        ts: f64::INFINITY,
                                        packet: PacketC::FlushMap,
                                    };
                                }
                            }
                        }
                    }
                    x => warn!("unhandled client packet: {x:?}"),
                }
            }
            tungstenite::Message::Close(_) => break,
            _ => (),
        }
    }
    info!("{addr} left");
    Ok(())
}