aboutsummaryrefslogtreecommitdiff
path: root/src/spectate/server.rs
blob: 8fa1a3d39fef456937bddc93206d8f479c5bb576 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::game::protocol::Packet;
use crate::State;

use super::Config;
use anyhow::Result;
use axum::extract;
use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::Message;
use axum::response::Html;
use axum::{
    extract::ws::{WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
    Router,
};
use log::{info, warn};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::{broadcast, RwLock};

/// State shared between the broadcaster task and every spectator connection.
struct SpectateState {
    /// Packets emitted since the last new-game tick; the broadcaster clears this
    /// on a new game and appends every tick, and each new connection is sent a
    /// copy of it before switching to live events.
    past_events: RwLock<Vec<Packet>>,
    /// Live fan-out channel; the broadcaster sends each packet here and every
    /// connected socket holds its own subscription.
    events: broadcast::Sender<Packet>,
}

/// Runs the spectator HTTP/WebSocket server until it stops or fails.
///
/// Serves the embedded page at `/` and the packet stream at `/events`, and
/// spawns the [`broadcaster`] task that feeds that stream from `state`.
///
/// # Errors
///
/// Returns an error if `config.bind` cannot be bound, if the bound address
/// cannot be read back, or if the server itself fails while serving.
pub async fn spectate_server(config: Config, state: Arc<State>) -> Result<()> {
    let sstate = Arc::new(SpectateState {
        past_events: Default::default(),
        events: broadcast::channel(16).0,
    });

    // Bind first and propagate failures (e.g. address already in use) to the
    // caller instead of panicking; this also means no detached broadcaster task
    // is left running when the server cannot start.
    let listener = tokio::net::TcpListener::bind(config.bind).await?;
    info!("listening on {}", listener.local_addr()?);

    spawn(broadcaster(sstate.clone(), state));

    let app = Router::new()
        .route("/", get(index))
        .route("/events", get(ws_handler))
        .with_state(sstate);
    axum::serve(
        listener,
        // Connection info is required by `ws_handler`'s `ConnectInfo` extractor.
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await?;
    Ok(())
}

/// Responds with the spectator page, which is compiled into the binary from
/// the `index.html` file next to this source file.
async fn index() -> Html<&'static str> {
    const PAGE: &str = include_str!("index.html");
    Html(PAGE)
}

async fn broadcaster(sstate: Arc<SpectateState>, state: Arc<State>) {
    let mut ticks = state.tick.subscribe();
    while let Ok(new_game) = ticks.recv().await {
        let mut events = Vec::new();

        {
            let g = state.game.read().await;
            if new_game {
                sstate.past_events.write().await.clear();
                events.push(Packet::Game {
                    my_id: 0,
                    width: g.size.x as usize,
                    height: g.size.y as usize,
                });
                for (player, (_, _, name)) in &g.heads {
                    events.push(Packet::Player {
                        id: *player,
                        name: name.to_owned(),
                    })
                }
            }
            for (player, (_, pos, _)) in &g.heads {
                events.push(Packet::Pos {
                    id: *player,
                    x: pos.x,
                    y: pos.y,
                })
            }
            if !g.dead.is_empty() {
                events.push(Packet::Die(g.dead.clone()));
            }
        }

        sstate.past_events.write().await.extend(events.clone());
        for ev in events {
            let _ = sstate.events.send(ev);
        }
    }
}

async fn ws_handler(
    ws: WebSocketUpgrade,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    extract::State(state): extract::State<Arc<SpectateState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| async move {
        if let Err(e) = handle_socket(socket, addr, state).await {
            warn!("client error {e}")
        }
    })
}

async fn handle_socket(
    mut socket: WebSocket,
    _addr: SocketAddr,
    state: Arc<SpectateState>,
) -> anyhow::Result<()> {
    let past = state.past_events.read().await.clone();
    for p in past {
        socket
            .send(Message::Text(serde_json::to_string(&p)?))
            .await?;
    }
    let mut live = state.events.subscribe();
    while let Ok(p) = live.recv().await {
        socket
            .send(Message::Text(serde_json::to_string(&p)?))
            .await?;
    }
    Ok(())
}