use super::Cors; use anyhow::bail; use chashmap::CHashMap; use futures::{SinkExt, StreamExt}; use log::warn; use rocket::{get, State}; use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::{self, Sender}; #[derive(Default)] pub struct PlayersyncChannels { channels: CHashMap>, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum Packet { Time(f64), Playing(bool), Join(String), Leave(String), } #[get("/playersync/")] pub fn r_playersync( ws: WebSocket, state: &State, channel: &str, ) -> Cors> { let sender = state .channels .get(&channel.to_owned()) .map(|x| x.to_owned()) .unwrap_or_else(|| { let ch = broadcast::channel(16).0; state.channels.insert(channel.to_owned(), ch.clone()); ch }); Cors(ws.channel(move |ws| { Box::pin(async move { let mut state = ClientState { username: "unknown user".into(), }; if let Err(e) = handle_socket(&sender, ws, &mut state).await { warn!("streamsync websocket error: {e:?}") } let _ = sender.send(Message::Text( serde_json::to_string(&Packet::Leave(state.username)).unwrap(), )); Ok(()) }) })) } struct ClientState { username: String, } async fn handle_socket( broadcast: &Sender, mut ws: DuplexStream, state: &mut ClientState, ) -> anyhow::Result<()> { let mut sub = broadcast.subscribe(); loop { tokio::select! { message = ws.next() => { match handle_packet(broadcast, message,state) { Err(e) => Err(e)?, Ok(true) => return Ok(()), Ok(false) => () } }, message = sub.recv() => { ws.send(message?).await?; } }; } } fn handle_packet( broadcast: &Sender, message: Option>, state: &mut ClientState, ) -> anyhow::Result { let Some(message) = message else { return Ok(true); }; let message = message?.into_text()?; let packet: Packet = serde_json::from_str(&message)?; let broadcast = |p: Packet| -> anyhow::Result<()> { broadcast.send(Message::Text(serde_json::to_string(&p)?))?; Ok(()) }; match packet { Packet::Join(username) => { broadcast(Packet::Join(username.clone()))?; state.username = username; } Packet::Leave(_) => bail!("illegal packet"), p => broadcast(p)?, }; Ok(false) }