diff options
Diffstat (limited to 'server/src/routes/playersync.rs')
| -rw-r--r-- | server/src/routes/playersync.rs | 65 | 
1 files changed, 59 insertions, 6 deletions
| diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs index 2a7d3f6..1ef9d73 100644 --- a/server/src/routes/playersync.rs +++ b/server/src/routes/playersync.rs @@ -1,9 +1,11 @@  use super::Cors; +use anyhow::bail;  use chashmap::CHashMap;  use futures::{SinkExt, StreamExt};  use log::warn;  use rocket::{get, State};  use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use serde::{Deserialize, Serialize};  use tokio::sync::broadcast::{self, Sender};  #[derive(Default)] @@ -11,6 +13,15 @@ pub struct PlayersyncChannels {      channels: CHashMap<String, broadcast::Sender<Message>>,  } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Packet { +    Time(f64), +    Playing(bool), +    Join(String), +    Leave(String), +} +  #[get("/playersync/<channel>")]  pub fn r_streamsync(      ws: WebSocket, @@ -28,23 +39,37 @@ pub fn r_streamsync(          });      Cors(ws.channel(move |ws| {          Box::pin(async move { -            if let Err(e) = handle_socket(sender, ws).await { +            let mut state = ClientState { +                username: "unknown user".into(), +            }; +            if let Err(e) = handle_socket(&sender, ws, &mut state).await {                  warn!("streamsync websocket error: {e:?}")              } +            let _ = sender.send(Message::Text( +                serde_json::to_string(&Packet::Leave(state.username)).unwrap(), +            ));              Ok(())          })      }))  } -async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyhow::Result<()> { +struct ClientState { +    username: String, +} + +async fn handle_socket( +    broadcast: &Sender<Message>, +    mut ws: DuplexStream, +    state: &mut ClientState, +) -> anyhow::Result<()> {      let mut sub = broadcast.subscribe();      loop {          tokio::select! {              message = ws.next() => { -                if let Some(message) = message { -                    broadcast.send(message?)?; -                } else { -                    return Ok(()) +                match handle_packet(broadcast, message,state) { +                    Err(e) => Err(e)?, +                    Ok(true) => return Ok(()), +                    Ok(false) => ()                  }              },              message = sub.recv() => { @@ -53,3 +78,31 @@ async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyh          };      }  } + +fn handle_packet( +    broadcast: &Sender<Message>, +    message: Option<rocket_ws::result::Result<Message>>, +    state: &mut ClientState, +) -> anyhow::Result<bool> { +    let Some(message) = message else { +        return Ok(true); +    }; +    let message = message?.into_text()?; +    let packet: Packet = serde_json::from_str(&message)?; + +    let broadcast = |p: Packet| -> anyhow::Result<()> { +        broadcast.send(Message::Text(serde_json::to_string(&p)?))?; +        Ok(()) +    }; + +    match packet { +        Packet::Join(username) => { +            broadcast(Packet::Join(username.clone()))?; +            state.username = username; +        } +        Packet::Leave(_) => bail!("illegal packet"), +        p => broadcast(p)?, +    }; + +    Ok(false) +} | 
