diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-27 19:25:11 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-27 19:25:11 +0200 |
commit | 11a585b3dbe620dcc8772e713b22f1d9ba80d598 (patch) | |
tree | 44f8d97137412aefc79a2425a489c34fa3e5f6c5 /server/src/logic/playersync.rs | |
parent | d871aa7c5bba49ff55170b5d2dac9cd440ae7170 (diff) | |
download | jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.bz2 jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.zst |
move files around
Diffstat (limited to 'server/src/logic/playersync.rs')
-rw-r--r-- | server/src/logic/playersync.rs | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/server/src/logic/playersync.rs b/server/src/logic/playersync.rs new file mode 100644 index 0000000..b4cc51b --- /dev/null +++ b/server/src/logic/playersync.rs @@ -0,0 +1,109 @@ +use anyhow::bail; +use chashmap::CHashMap; +use futures::{SinkExt, StreamExt}; +use log::warn; +use rocket::{get, State}; +use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast::{self, Sender}; + +use crate::helper::cors::Cors; + +#[derive(Default)] +pub struct PlayersyncChannels { + channels: CHashMap<String, broadcast::Sender<Message>>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Packet { + Time(f64), + Playing(bool), + Join(String), + Leave(String), +} + +#[get("/playersync/<channel>")] +pub fn r_playersync( + ws: WebSocket, + state: &State<PlayersyncChannels>, + channel: &str, +) -> Cors<Channel<'static>> { + let sender = state + .channels + .get(&channel.to_owned()) + .map(|x| x.to_owned()) + .unwrap_or_else(|| { + let ch = broadcast::channel(16).0; + state.channels.insert(channel.to_owned(), ch.clone()); + ch + }); + Cors(ws.channel(move |ws| { + Box::pin(async move { + let mut state = ClientState { + username: "unknown user".into(), + }; + if let Err(e) = handle_socket(&sender, ws, &mut state).await { + warn!("streamsync websocket error: {e:?}") + } + let _ = sender.send(Message::Text( + serde_json::to_string(&Packet::Leave(state.username)).unwrap(), + )); + Ok(()) + }) + })) +} + +struct ClientState { + username: String, +} + +async fn handle_socket( + broadcast: &Sender<Message>, + mut ws: DuplexStream, + state: &mut ClientState, +) -> anyhow::Result<()> { + let mut sub = broadcast.subscribe(); + loop { + tokio::select! { + message = ws.next() => { + match handle_packet(broadcast, message,state) { + Err(e) => Err(e)?, + Ok(true) => return Ok(()), + Ok(false) => () + } + }, + message = sub.recv() => { + ws.send(message?).await?; + } + }; + } +} + +fn handle_packet( + broadcast: &Sender<Message>, + message: Option<rocket_ws::result::Result<Message>>, + state: &mut ClientState, +) -> anyhow::Result<bool> { + let Some(message) = message else { + return Ok(true); + }; + let message = message?.into_text()?; + let packet: Packet = serde_json::from_str(&message)?; + + let broadcast = |p: Packet| -> anyhow::Result<()> { + broadcast.send(Message::Text(serde_json::to_string(&p)?))?; + Ok(()) + }; + + match packet { + Packet::Join(username) => { + broadcast(Packet::Join(username.clone()))?; + state.username = username; + } + Packet::Leave(_) => bail!("illegal packet"), + p => broadcast(p)?, + }; + + Ok(false) +} |