diff options
Diffstat (limited to 'server/src/routes/playersync.rs')
-rw-r--r-- | server/src/routes/playersync.rs | 108 |
1 files changed, 0 insertions, 108 deletions
diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs deleted file mode 100644 index 9eb6175..0000000 --- a/server/src/routes/playersync.rs +++ /dev/null @@ -1,108 +0,0 @@ -use super::Cors; -use anyhow::bail; -use chashmap::CHashMap; -use futures::{SinkExt, StreamExt}; -use log::warn; -use rocket::{get, State}; -use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast::{self, Sender}; - -#[derive(Default)] -pub struct PlayersyncChannels { - channels: CHashMap<String, broadcast::Sender<Message>>, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Packet { - Time(f64), - Playing(bool), - Join(String), - Leave(String), -} - -#[get("/playersync/<channel>")] -pub fn r_playersync( - ws: WebSocket, - state: &State<PlayersyncChannels>, - channel: &str, -) -> Cors<Channel<'static>> { - let sender = state - .channels - .get(&channel.to_owned()) - .map(|x| x.to_owned()) - .unwrap_or_else(|| { - let ch = broadcast::channel(16).0; - state.channels.insert(channel.to_owned(), ch.clone()); - ch - }); - Cors(ws.channel(move |ws| { - Box::pin(async move { - let mut state = ClientState { - username: "unknown user".into(), - }; - if let Err(e) = handle_socket(&sender, ws, &mut state).await { - warn!("streamsync websocket error: {e:?}") - } - let _ = sender.send(Message::Text( - serde_json::to_string(&Packet::Leave(state.username)).unwrap(), - )); - Ok(()) - }) - })) -} - -struct ClientState { - username: String, -} - -async fn handle_socket( - broadcast: &Sender<Message>, - mut ws: DuplexStream, - state: &mut ClientState, -) -> anyhow::Result<()> { - let mut sub = broadcast.subscribe(); - loop { - tokio::select! { - message = ws.next() => { - match handle_packet(broadcast, message,state) { - Err(e) => Err(e)?, - Ok(true) => return Ok(()), - Ok(false) => () - } - }, - message = sub.recv() => { - ws.send(message?).await?; - } - }; - } -} - -fn handle_packet( - broadcast: &Sender<Message>, - message: Option<rocket_ws::result::Result<Message>>, - state: &mut ClientState, -) -> anyhow::Result<bool> { - let Some(message) = message else { - return Ok(true); - }; - let message = message?.into_text()?; - let packet: Packet = serde_json::from_str(&message)?; - - let broadcast = |p: Packet| -> anyhow::Result<()> { - broadcast.send(Message::Text(serde_json::to_string(&p)?))?; - Ok(()) - }; - - match packet { - Packet::Join(username) => { - broadcast(Packet::Join(username.clone()))?; - state.username = username; - } - Packet::Leave(_) => bail!("illegal packet"), - p => broadcast(p)?, - }; - - Ok(false) -} |