use crate::game::protocol::Packet; use crate::State; use super::Config; use anyhow::Result; use axum::extract; use axum::extract::connect_info::ConnectInfo; use axum::extract::ws::Message; use axum::http::HeaderMap; use axum::response::Html; use axum::{ extract::ws::{WebSocket, WebSocketUpgrade}, response::IntoResponse, routing::get, Router, }; use headers::ContentType; use log::{info, warn}; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use tokio::spawn; use tokio::sync::{broadcast, RwLock}; struct SpectateState { past_events: RwLock>, events: broadcast::Sender, } pub async fn spectate_server(config: Config, state: Arc) -> Result<()> { let sstate = Arc::new(SpectateState { past_events: Default::default(), events: broadcast::channel(16).0, }); spawn(broadcaster(sstate.clone(), state)); let app = Router::new() .route("/", get(index)) .route("/main.js", get(javascript)) .route("/style.css", get(css)) .route("/events", get(ws_handler)) .with_state(sstate); let listener = tokio::net::TcpListener::bind(config.bind).await.unwrap(); info!("listening on {}", listener.local_addr()?); axum::serve( listener, app.into_make_service_with_connect_info::(), ) .await?; Ok(()) } async fn index() -> Html<&'static str> { Html(include_str!("index.html")) } #[cfg(debug_assertions)] async fn javascript() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); ( hm, read_to_string(concat!(env!("OUT_DIR"), "/main.js")) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn javascript() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("application/javascript").unwrap()); (hm, include_str!(concat!(env!("OUT_DIR"), "/main.js"))) } #[cfg(debug_assertions)] async fn css() -> (HeaderMap, String) { use headers::HeaderMapExt; use tokio::fs::read_to_string; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); ( hm, read_to_string(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/style.css" )) .await .unwrap(), ) } #[cfg(not(debug_assertions))] async fn css() -> (HeaderMap, &'static str) { use headers::HeaderMapExt; let mut hm = HeaderMap::new(); hm.typed_insert(ContentType::from_str("text/css").unwrap()); ( hm, include_str!(concat!( env!("CARGO_MANIFEST_DIR"), "/src/spectate/style.css" )), ) } async fn broadcaster(sstate: Arc, state: Arc) { let mut ticks = state.tick.subscribe(); while let Ok(new_game) = ticks.recv().await { let mut events = Vec::new(); { let g = state.game.read().await; events.push(Packet::Tick); if new_game { sstate.past_events.write().await.clear(); events.push(Packet::Game { my_id: 0, width: g.map.size.x as usize, height: g.map.size.y as usize, }); for (player, (_, _, name)) in &g.heads { events.push(Packet::Player { id: *player, name: name.to_owned(), }) } } for (player, (_, pos, _)) in &g.heads { events.push(Packet::Pos { id: *player, x: pos.x, y: pos.y, }) } if !g.dead.is_empty() { events.push(Packet::Die(g.dead.clone())); } } sstate.past_events.write().await.extend(events.clone()); for ev in events { let _ = sstate.events.send(ev); } } } async fn ws_handler( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo, extract::State(state): extract::State>, ) -> impl IntoResponse { ws.on_upgrade(move |socket| async move { if let Err(e) = handle_socket(socket, addr, state).await { warn!("client error {e}") } }) } async fn handle_socket( mut socket: WebSocket, _addr: SocketAddr, state: Arc, ) -> anyhow::Result<()> { let past = state.past_events.read().await.clone(); for p in past { socket .send(Message::Text(serde_json::to_string(&p)?)) .await?; } let mut live = state.events.subscribe(); while let Ok(p) = live.recv().await { socket .send(Message::Text(serde_json::to_string(&p)?)) .await?; } Ok(()) }