summaryrefslogtreecommitdiff
path: root/server/src/room.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-08 14:51:54 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-08 14:51:54 +0200
commit2cac71d01b23c2566b797160fd70f254c58550f3 (patch)
tree524150959b6e7cc0edf361cfa4fb77a1857a35dc /server/src/room.rs
parentcd255848196da0d8732d31049cc0f98388205a30 (diff)
downloadkeks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar
keks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar.bz2
keks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar.zst
server rewrite works
Diffstat (limited to 'server/src/room.rs')
-rw-r--r--server/src/room.rs66
1 files changed, 50 insertions, 16 deletions
diff --git a/server/src/room.rs b/server/src/room.rs
index a47d2e5..db545a6 100644
--- a/server/src/room.rs
+++ b/server/src/room.rs
@@ -1,7 +1,7 @@
use crate::protocol::{ClientboundPacket, ServerboundPacket};
use futures_util::{SinkExt, StreamExt, TryFutureExt};
-use log::error;
-use std::collections::HashMap;
+use log::{debug, error};
+use std::{collections::HashMap, sync::atomic::AtomicUsize};
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message, WebSocket};
@@ -13,17 +13,36 @@ pub struct Client {
#[derive(Debug, Default)]
pub struct Room {
+ pub id_counter: AtomicUsize,
pub clients: RwLock<HashMap<usize, Client>>,
}
impl Room {
pub async fn client_connect(&self, ws: WebSocket) {
+ debug!("new client connected");
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
let (tx, mut rx) = mpsc::unbounded_channel();
+ let mut g = self.clients.write().await;
+ // ensure write guard to client exists when using id_counter
+ let id = self
+ .id_counter
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let name = format!("user no. {id}");
+ g.insert(
+ id,
+ Client {
+ out: tx,
+ name: name.clone(),
+ },
+ );
+ drop(g);
+ debug!("assigned id={id}, init connection");
+
tokio::task::spawn(async move {
while let Some(packet) = rx.recv().await {
+ debug!("{id} -> {packet:?}");
user_ws_tx
.send(Message::text(serde_json::to_string(&packet).unwrap()))
.unwrap_or_else(|e| {
@@ -33,20 +52,33 @@ impl Room {
}
});
- let mut g = self.clients.write().await;
- let id = g.len();
- let name = format!("user no. {id}");
- g.insert(
+ self.send_to_client(
id,
- Client {
- out: tx,
- name: name.clone(),
+ ClientboundPacket::Init {
+ your_id: id,
+ version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")),
},
- );
- drop(g);
+ )
+ .await;
- self.broadcast(id, ClientboundPacket::ClientJoin { id, name })
+ // send join of this client to all clients
+ self.broadcast(None, ClientboundPacket::ClientJoin { id, name })
.await;
+ // send join of all other clients to this one
+ for (&cid, c) in self.clients.read().await.iter() {
+ // skip self
+ if cid != id {
+ self.send_to_client(
+ id,
+ ClientboundPacket::ClientJoin {
+ id: cid,
+ name: c.name.clone(),
+ },
+ )
+ .await;
+ }
+ }
+ debug!("client should be ready!");
while let Some(result) = user_ws_rx.next().await {
let msg = match result {
@@ -64,16 +96,18 @@ impl Room {
break;
}
};
+ debug!("{id} <- {p:?}");
self.client_message(id, p).await;
- };
+ }
}
-
self.clients.write().await.remove(&id);
+ self.broadcast(Some(id), ClientboundPacket::ClientLeave { id })
+ .await;
}
- pub async fn broadcast(&self, sender: usize, packet: ClientboundPacket) {
+ pub async fn broadcast(&self, sender: Option<usize>, packet: ClientboundPacket) {
for (&id, tx) in self.clients.read().await.iter() {
- if sender != id {
+ if sender != Some(id) {
let _ = tx.out.send(packet.clone());
}
}