diff options
Diffstat (limited to 'server/src/routes/playersync.rs')
-rw-r--r-- | server/src/routes/playersync.rs | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs new file mode 100644 index 0000000..2a7d3f6 --- /dev/null +++ b/server/src/routes/playersync.rs @@ -0,0 +1,55 @@ +use super::Cors; +use chashmap::CHashMap; +use futures::{SinkExt, StreamExt}; +use log::warn; +use rocket::{get, State}; +use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use tokio::sync::broadcast::{self, Sender}; + +#[derive(Default)] +pub struct PlayersyncChannels { + channels: CHashMap<String, broadcast::Sender<Message>>, +} + +#[get("/playersync/<channel>")] +pub fn r_streamsync( + ws: WebSocket, + state: &State<PlayersyncChannels>, + channel: &str, +) -> Cors<Channel<'static>> { + let sender = state + .channels + .get(&channel.to_owned()) + .map(|x| x.to_owned()) + .unwrap_or_else(|| { + let ch = broadcast::channel(16).0; + state.channels.insert(channel.to_owned(), ch.clone()); + ch + }); + Cors(ws.channel(move |ws| { + Box::pin(async move { + if let Err(e) = handle_socket(sender, ws).await { + warn!("streamsync websocket error: {e:?}") + } + Ok(()) + }) + })) +} + +async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyhow::Result<()> { + let mut sub = broadcast.subscribe(); + loop { + tokio::select! { + message = ws.next() => { + if let Some(message) = message { + broadcast.send(message?)?; + } else { + return Ok(()) + } + }, + message = sub.recv() => { + ws.send(message?).await?; + } + }; + } +} |