| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
 | use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
use std::{sync::Arc, time::Duration};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    spawn,
    sync::{broadcast, RwLock},
    time::sleep,
};
use tokio_tungstenite::tungstenite::Message;
use undercooked::{
    game::Game,
    protocol::{PacketC, PacketS},
};
/// Entry point of the game server.
///
/// Binds two listeners and serves the same shared `Game` through both:
///
/// * port 27031 — newline-delimited JSON over plain TCP,
/// * port 27032 — JSON in WebSocket text frames.
///
/// Every accepted connection is assigned the next value of a running counter
/// as its `id`, is sent `PacketC::Joined { id }` first, and afterwards receives
/// every packet the game emits. Incoming `PacketS` values are handed to
/// `Game::packet_in` together with that `id`.
///
/// # Errors
///
/// Returns an error if a listener cannot be bound or if `accept` on either
/// listener fails. Errors that concern a single connection (failed WebSocket
/// handshake, write failures, malformed JSON) are logged and only end that
/// connection.
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init_from_env("LOG");
    let raw_listener = TcpListener::bind("0.0.0.0:27031").await?;
    let ws_listener = TcpListener::bind("0.0.0.0:27032").await?;
    info!(
        "listening for line-based tcp on {}",
        raw_listener.local_addr()?
    );
    info!("listening for websockets on {}", ws_listener.local_addr()?);
    let game = Arc::new(RwLock::new(Game::new()));
    // `rx` is never read here; it is kept only as the handle that
    // per-connection receivers are created from via `resubscribe`.
    let (tx, rx) = broadcast::channel::<PacketC>(1024);
    {
        // Broadcaster: every 20 ms drain the game's outgoing packets and fan
        // them out to all connections.
        let game = game.clone();
        spawn(async move {
            loop {
                {
                    // Inner scope so the write lock is released before sleeping.
                    let mut g = game.write().await;
                    while let Some(p) = g.packet_out() {
                        debug!("-> {p:?}");
                        // Best effort: the send result is deliberately ignored.
                        let _ = tx.send(p);
                    }
                }
                sleep(Duration::from_millis(20)).await;
            }
        });
    }
    for id in 0.. {
        tokio::select! {
            r = raw_listener.accept() => {
                // A failing listener is treated as fatal for the whole server.
                let (sock, addr) = r?;
                let (read, mut write) = sock.into_split();
                let game = game.clone();
                let mut rx = rx.resubscribe();
                info!("{addr} connected");
                // Writer half: greeting first, then every broadcast packet,
                // one JSON document per line.
                spawn(async move {
                    let outcome = async {
                        // Payload and terminator go out in a single write.
                        let mut line = serde_json::to_string(&PacketC::Joined { id })?;
                        line.push('\n');
                        write.write_all(line.as_bytes()).await?;
                        loop {
                            let packet = match rx.recv().await {
                                Ok(packet) => packet,
                                Err(broadcast::error::RecvError::Lagged(missed)) => {
                                    // The skipped packets cannot be replayed, so stop
                                    // writing (as before) but leave a trace in the log.
                                    warn!("{addr} lagged behind by {missed} packets, stopping its writer");
                                    break;
                                }
                                Err(broadcast::error::RecvError::Closed) => break,
                            };
                            let mut line = serde_json::to_string(&packet)?;
                            line.push('\n');
                            write.write_all(line.as_bytes()).await?;
                        }
                        Ok::<_, anyhow::Error>(())
                    }
                    .await;
                    // The task handle is detached, so report failures here
                    // instead of returning them to nobody.
                    if let Err(e) = outcome {
                        warn!("tcp error: {e}");
                    }
                });
                // Reader half: one JSON-encoded `PacketS` per line. Nothing here
                // notifies `game` when the loop ends.
                spawn(async move {
                    let mut read = BufReader::new(read).lines();
                    while let Ok(Some(line)) = read.next_line().await {
                        let Ok(packet): Result<PacketS, _> = serde_json::from_str(&line) else {
                            warn!("invalid json over tcp");
                            break
                        };
                        debug!("<- {id} {packet:?}");
                        if let Err(e) = game.write().await.packet_in(id, packet) {
                            warn!("client error: {e}");
                        }
                    }
                });
            }
            r = ws_listener.accept() => {
                // A failing listener is treated as fatal for the whole server.
                let (sock, addr) = r?;
                let game = game.clone();
                let pending_rx = rx.resubscribe();
                // The handshake runs inside the connection's own task: a peer
                // that stalls or sends garbage must neither block the accept
                // loop nor terminate the server.
                spawn(async move {
                    let sock = match tokio_tungstenite::accept_async(sock).await {
                        Ok(sock) => sock,
                        Err(e) => {
                            warn!("{addr}: ws handshake failed: {e}");
                            return;
                        }
                    };
                    // Start listening only now, so packets broadcast while the
                    // handshake was in progress are not delivered.
                    let mut rx = pending_rx.resubscribe();
                    drop(pending_rx);
                    let (mut write, mut read) = sock.split();
                    info!("{addr} connected via ws");
                    // Writer half: greeting first, then every broadcast packet,
                    // one JSON document per text frame.
                    spawn(async move {
                        let joined = serde_json::to_string(&PacketC::Joined { id })
                            .expect("PacketC must serialize to JSON");
                        if let Err(e) = write.send(Message::Text(joined)).await {
                            warn!("ws error on init: {e}");
                            return;
                        }
                        loop {
                            let packet = match rx.recv().await {
                                Ok(packet) => packet,
                                Err(broadcast::error::RecvError::Lagged(missed)) => {
                                    // The skipped packets cannot be replayed, so stop
                                    // writing (as before) but leave a trace in the log.
                                    warn!("{addr} lagged behind by {missed} packets, stopping its writer");
                                    break;
                                }
                                Err(broadcast::error::RecvError::Closed) => break,
                            };
                            let text = serde_json::to_string(&packet)
                                .expect("PacketC must serialize to JSON");
                            if let Err(e) = write.send(Message::Text(text)).await {
                                warn!("ws error: {e}");
                                break;
                            }
                        }
                    });
                    // Reader half: text frames carry JSON-encoded `PacketS`;
                    // other frame kinds are ignored except for Close. Nothing
                    // here notifies `game` when the loop ends.
                    while let Some(Ok(message)) = read.next().await {
                        match message {
                            Message::Text(line) => {
                                let Ok(packet): Result<PacketS, _> = serde_json::from_str(&line) else {
                                    warn!("invalid json over ws");
                                    break
                                };
                                debug!("<- {id} {packet:?}");
                                if let Err(e) = game.write().await.packet_in(id, packet) {
                                    warn!("client error: {e}");
                                }
                            },
                            Message::Close(_) => break,
                            _ => (),
                        }
                    }
                });
            }
        }
    }
    Ok(())
}
 |