use super::Config; use crate::{game::protocol::Packet, State}; use anyhow::{anyhow, Result}; use axum::{ extract::{ self, connect_info::ConnectInfo, ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade}, }, http::HeaderMap, response::{Html, IntoResponse}, routing::get, Json, Router, }; use headers::ContentType; use log::{info, warn}; use std::{ cmp::Reverse, collections::{HashMap, VecDeque}, net::SocketAddr, str::FromStr, sync::Arc, }; use tokio::{ spawn, sync::{broadcast, RwLock}, }; struct SpectateState { past_events: RwLock>, win_history: RwLock>>, events: broadcast::Sender, } pub async fn spectate_server(config: Config, state: Arc) -> Result<()> { let sstate = Arc::new(SpectateState { past_events: Default::default(), win_history: Default::default(), events: broadcast::channel(512).0, }); spawn(game_broadcaster(sstate.clone(), state.clone())); spawn(chat_broadcaster(sstate.clone(), state.clone())); let app = Router::new() .route("/", get(index)) .route("/main.js", get(javascript)) .route("/style.css", get(css)) .route("/events", get(ws_handler)) .route("/stats", get(stats)) .with_state(sstate); let listener = tokio::net::TcpListener::bind(config.bind).await.unwrap(); info!("listening on {}", listener.local_addr()?); axum::serve( listener, app.into_make_service_with_connect_info::(), ) .await?; Ok(()) } #[cfg(debug_assertions)] async fn index() -> Html { use tokio::fs::read_to_string; Html( read_to_string(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/index.html" )) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn index() -> Html<&'static str> { Html(include_str!("index.html")) } #[cfg(debug_assertions)] async fn javascript() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); ( hm, read_to_string(concat!(env!("OUT_DIR"), "/main.js")) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn javascript() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); (hm, include_str!(concat!(env!("OUT_DIR"), "/main.js"))) } #[cfg(debug_assertions)] async fn css() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); ( hm, read_to_string(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/style.css" )) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn css() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); (hm, include_str!("style.css")) } async fn stats( extract::State(state): extract::State>, ) -> Json> { let stats = state.win_history.read().await.to_owned(); let mut scoreboard = HashMap::new(); for w in stats.clone() { if let Some(w) = w { *scoreboard.entry(w).or_default() += 1 } } let mut scoreboard = scoreboard.into_iter().collect::>(); scoreboard.sort_by_key(|(_, s)| Reverse(*s)); Json(scoreboard) } async fn chat_broadcaster(sstate: Arc, state: Arc) { let mut messages = state.chat.subscribe(); while let Ok((author, message)) = messages.recv().await { let p = Packet::Message { id: author, message: format!("{author}: {message}"), }; sstate.past_events.write().await.push(p.clone()); let _ = sstate.events.send(p); } } async fn game_broadcaster(sstate: Arc, state: Arc) { let mut ticks = state.tick.subscribe(); while let Ok(new_game) = ticks.recv().await { let mut events = Vec::new(); { let g = state.game.read().await; events.push(Packet::Tick); if let Some(winner) = new_game { if let Some(winner) = winner { let winner = if let Some(winner) = state.players.write().await.get(&winner).cloned() { Some(winner) } else { None }; let mut h = sstate.win_history.write().await; h.push_front(winner); while h.len() > 64 { h.pop_back(); } } events.push(Packet::Win(winner.unwrap_or(u32::MAX) as usize, 0)); // TODO packet misuse sstate.past_events.write().await.clear(); events.push(Packet::Game { my_id: 0, width: g.map.size.x as usize, height: g.map.size.y as usize, }); for (player, (_, _, name)) in &g.heads { events.push(Packet::Player { id: *player, name: name.to_owned(), }) } } for (player, (_, pos, _)) in &g.heads { events.push(Packet::Pos { id: *player, x: pos.x, y: pos.y, }) } if !g.dead.is_empty() { events.push(Packet::Die(g.dead.clone())); } } sstate.past_events.write().await.extend(events.clone()); for ev in events { let _ = sstate.events.send(ev); } } } async fn ws_handler( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo, extract::State(state): extract::State>, ) -> impl IntoResponse { ws.on_upgrade(move |socket| async move { if let Err(e) = handle_socket(socket, addr, state).await { warn!("client error {e}") } }) } async fn handle_socket( mut socket: WebSocket, _addr: SocketAddr, state: Arc, ) -> anyhow::Result<()> { let past = state.past_events.read().await.clone(); for p in past { socket .send(Message::Text(Utf8Bytes::from(serde_json::to_string(&p)?))) .await?; } let mut live = state.events.subscribe(); loop { tokio::select! { message = socket.recv() => { message.ok_or(anyhow!("socket end"))??; } event = live.recv() => { socket .send(Message::Text(Utf8Bytes::from( serde_json::to_string(&event?)?))) .await?; } } } }