use super::Config; use crate::{game::protocol::Packet, State}; use anyhow::{anyhow, Result}; use axum::{ extract::{ self, connect_info::ConnectInfo, ws::{Message, WebSocket, WebSocketUpgrade}, }, http::HeaderMap, response::{Html, IntoResponse}, routing::get, Router, }; use headers::ContentType; use log::{info, warn}; use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tokio::{ spawn, sync::{broadcast, RwLock}, }; struct SpectateState { past_events: RwLock>, events: broadcast::Sender, } pub async fn spectate_server(config: Config, state: Arc) -> Result<()> { let sstate = Arc::new(SpectateState { past_events: Default::default(), events: broadcast::channel(512).0, }); spawn(broadcaster(sstate.clone(), state)); let app = Router::new() .route("/", get(index)) .route("/main.js", get(javascript)) .route("/style.css", get(css)) .route("/events", get(ws_handler)) .with_state(sstate); let listener = tokio::net::TcpListener::bind(config.bind).await.unwrap(); info!("listening on {}", listener.local_addr()?); axum::serve( listener, app.into_make_service_with_connect_info::(), ) .await?; Ok(()) } async fn index() -> Html<&'static str> { Html(include_str!("index.html")) } #[cfg(debug_assertions)] async fn javascript() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); ( hm, read_to_string(concat!(env!("OUT_DIR"), "/main.js")) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn javascript() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); (hm, include_str!(concat!(env!("OUT_DIR"), "/main.js"))) } #[cfg(debug_assertions)] async fn css() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); ( hm, read_to_string(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/style.css" )) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn css() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); ( hm, include_str!(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/style.css" )), ) } async fn broadcaster(sstate: Arc, state: Arc) { let mut ticks = state.tick.subscribe(); while let Ok(new_game) = ticks.recv().await { let mut events = Vec::new(); { let g = state.game.read().await; events.push(Packet::Tick); if new_game { sstate.past_events.write().await.clear(); events.push(Packet::Game { my_id: 0, width: g.map.size.x as usize, height: g.map.size.y as usize, }); for (player, (_, _, name)) in &g.heads { events.push(Packet::Player { id: *player, name: name.to_owned(), }) } } for (player, (_, pos, _)) in &g.heads { events.push(Packet::Pos { id: *player, x: pos.x, y: pos.y, }) } if !g.dead.is_empty() { events.push(Packet::Die(g.dead.clone())); } } sstate.past_events.write().await.extend(events.clone()); for ev in events { let _ = sstate.events.send(ev); } } } async fn ws_handler( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo, extract::State(state): extract::State>, ) -> impl IntoResponse { ws.on_upgrade(move |socket| async move { if let Err(e) = handle_socket(socket, addr, state).await { warn!("client error {e}") } }) } async fn handle_socket( mut socket: WebSocket, _addr: SocketAddr, state: Arc, ) -> anyhow::Result<()> { let past = state.past_events.read().await.clone(); for p in past { socket .send(Message::Text(serde_json::to_string(&p)?)) .await?; } let mut live = state.events.subscribe(); loop { tokio::select! { message = socket.recv() => { message.ok_or(anyhow!("socket end"))??; } event = live.recv() => { socket .send(Message::Text(serde_json::to_string(&event?)?)) .await?; } } } }