diff options
author | metamuffin <metamuffin@disroot.org> | 2024-01-21 15:17:05 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-01-21 15:17:05 +0100 |
commit | 06d5c0d961c85abb3dd645b65b4447936fe7690f (patch) | |
tree | 83684a1ccdaf078541836e25f5992e9a9754ae32 /server | |
parent | 86582075d1bc4eacd664b5eb48c4472e1f2119b0 (diff) | |
download | jellything-06d5c0d961c85abb3dd645b65b4447936fe7690f.tar jellything-06d5c0d961c85abb3dd645b65b4447936fe7690f.tar.bz2 jellything-06d5c0d961c85abb3dd645b65b4447936fe7690f.tar.zst |
refactor streamsync a bit
Diffstat (limited to 'server')
-rw-r--r-- | server/src/routes/playersync.rs | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs index 2a7d3f6..1ef9d73 100644 --- a/server/src/routes/playersync.rs +++ b/server/src/routes/playersync.rs @@ -1,9 +1,11 @@ use super::Cors; +use anyhow::bail; use chashmap::CHashMap; use futures::{SinkExt, StreamExt}; use log::warn; use rocket::{get, State}; use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::{self, Sender}; #[derive(Default)] @@ -11,6 +13,15 @@ pub struct PlayersyncChannels { channels: CHashMap<String, broadcast::Sender<Message>>, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Packet { + Time(f64), + Playing(bool), + Join(String), + Leave(String), +} + #[get("/playersync/<channel>")] pub fn r_streamsync( ws: WebSocket, @@ -28,23 +39,37 @@ pub fn r_streamsync( }); Cors(ws.channel(move |ws| { Box::pin(async move { - if let Err(e) = handle_socket(sender, ws).await { + let mut state = ClientState { + username: "unknown user".into(), + }; + if let Err(e) = handle_socket(&sender, ws, &mut state).await { warn!("streamsync websocket error: {e:?}") } + let _ = sender.send(Message::Text( + serde_json::to_string(&Packet::Leave(state.username)).unwrap(), + )); Ok(()) }) })) } -async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyhow::Result<()> { +struct ClientState { + username: String, +} + +async fn handle_socket( + broadcast: &Sender<Message>, + mut ws: DuplexStream, + state: &mut ClientState, +) -> anyhow::Result<()> { let mut sub = broadcast.subscribe(); loop { tokio::select! { message = ws.next() => { - if let Some(message) = message { - broadcast.send(message?)?; - } else { - return Ok(()) + match handle_packet(broadcast, message,state) { + Err(e) => Err(e)?, + Ok(true) => return Ok(()), + Ok(false) => () } }, message = sub.recv() => { @@ -53,3 +78,31 @@ async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyh }; } } + +fn handle_packet( + broadcast: &Sender<Message>, + message: Option<rocket_ws::result::Result<Message>>, + state: &mut ClientState, +) -> anyhow::Result<bool> { + let Some(message) = message else { + return Ok(true); + }; + let message = message?.into_text()?; + let packet: Packet = serde_json::from_str(&message)?; + + let broadcast = |p: Packet| -> anyhow::Result<()> { + broadcast.send(Message::Text(serde_json::to_string(&p)?))?; + Ok(()) + }; + + match packet { + Packet::Join(username) => { + broadcast(Packet::Join(username.clone()))?; + state.username = username; + } + Packet::Leave(_) => bail!("illegal packet"), + p => broadcast(p)?, + }; + + Ok(false) +} |