use super::Cors; use chashmap::CHashMap; use futures::{SinkExt, StreamExt}; use log::warn; use rocket::{get, State}; use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; use tokio::sync::broadcast::{self, Sender}; #[derive(Default)] pub struct PlayersyncChannels { channels: CHashMap>, } #[get("/playersync/")] pub fn r_streamsync( ws: WebSocket, state: &State, channel: &str, ) -> Cors> { let sender = state .channels .get(&channel.to_owned()) .map(|x| x.to_owned()) .unwrap_or_else(|| { let ch = broadcast::channel(16).0; state.channels.insert(channel.to_owned(), ch.clone()); ch }); Cors(ws.channel(move |ws| { Box::pin(async move { if let Err(e) = handle_socket(sender, ws).await { warn!("streamsync websocket error: {e:?}") } Ok(()) }) })) } async fn handle_socket(broadcast: Sender, mut ws: DuplexStream) -> anyhow::Result<()> { let mut sub = broadcast.subscribe(); loop { tokio::select! { message = ws.next() => { if let Some(message) = message { broadcast.send(message?)?; } else { return Ok(()) } }, message = sub.recv() => { ws.send(message?).await?; } }; } }