1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
use crate::game::protocol::Packet;
use crate::State;
use super::Config;
use anyhow::Result;
use axum::extract;
use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::Message;
use axum::response::Html;
use axum::{
extract::ws::{WebSocket, WebSocketUpgrade},
response::IntoResponse,
routing::get,
Router,
};
use log::{info, warn};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::{broadcast, RwLock};
struct SpectateState {
past_events: RwLock<Vec<Packet>>,
events: broadcast::Sender<Packet>,
}
pub async fn spectate_server(config: Config, state: Arc<State>) -> Result<()> {
let sstate = Arc::new(SpectateState {
past_events: Default::default(),
events: broadcast::channel(16).0,
});
spawn(broadcaster(sstate.clone(), state));
let app = Router::new()
.route("/", get(index))
.route("/events", get(ws_handler))
.with_state(sstate);
let listener = tokio::net::TcpListener::bind(config.bind).await.unwrap();
info!("listening on {}", listener.local_addr()?);
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;
Ok(())
}
async fn index() -> Html<&'static str> {
Html(include_str!("index.html"))
}
async fn broadcaster(sstate: Arc<SpectateState>, state: Arc<State>) {
let mut ticks = state.tick.subscribe();
while let Ok(new_game) = ticks.recv().await {
let mut events = Vec::new();
{
let g = state.game.read().await;
if new_game {
sstate.past_events.write().await.clear();
events.push(Packet::Game {
my_id: 0,
width: g.size.x as usize,
height: g.size.y as usize,
});
for (player, (_, _, name)) in &g.heads {
events.push(Packet::Player {
id: *player,
name: name.to_owned(),
})
}
}
for (player, (_, pos, _)) in &g.heads {
events.push(Packet::Pos {
id: *player,
x: pos.x,
y: pos.y,
})
}
if !g.dead.is_empty() {
events.push(Packet::Die(g.dead.clone()));
}
events.push(Packet::Tick);
}
sstate.past_events.write().await.extend(events.clone());
for ev in events {
let _ = sstate.events.send(ev);
}
}
}
async fn ws_handler(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
extract::State(state): extract::State<Arc<SpectateState>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| async move {
if let Err(e) = handle_socket(socket, addr, state).await {
warn!("client error {e}")
}
})
}
async fn handle_socket(
mut socket: WebSocket,
_addr: SocketAddr,
state: Arc<SpectateState>,
) -> anyhow::Result<()> {
let past = state.past_events.read().await.clone();
for p in past {
socket
.send(Message::Text(serde_json::to_string(&p)?))
.await?;
}
let mut live = state.events.subscribe();
while let Ok(p) = live.recv().await {
socket
.send(Message::Text(serde_json::to_string(&p)?))
.await?;
}
Ok(())
}
|