diff options
author | metamuffin <metamuffin@disroot.org> | 2023-09-07 19:17:49 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-09-07 19:17:49 +0200 |
commit | 2d0761b8932f11b01e241e2db3a8f08250efe878 (patch) | |
tree | 29814137e2960286d36f3ae4f7c20299eb6d22b0 | |
parent | 6f644481f397af070e2b91b69846e375caafdbda (diff) | |
download | keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.bz2 keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.zst |
new protocol
-rw-r--r-- | client-web/source/index.ts | 2 | ||||
-rw-r--r-- | client-web/source/protocol/mod.ts | 7 | ||||
-rw-r--r-- | common/packets.d.ts | 1 | ||||
-rw-r--r-- | server/src/config.rs | 2 | ||||
-rw-r--r-- | server/src/idgen.rs | 21 | ||||
-rw-r--r-- | server/src/logic.rs | 178 | ||||
-rw-r--r-- | server/src/main.rs | 52 | ||||
-rw-r--r-- | server/src/protocol.rs | 15 | ||||
-rw-r--r-- | server/src/room.rs | 127 |
9 files changed, 244 insertions, 161 deletions
diff --git a/client-web/source/index.ts b/client-web/source/index.ts index 8ae7e8f..639f0c9 100644 --- a/client-web/source/index.ts +++ b/client-web/source/index.ts @@ -14,7 +14,7 @@ import { SignalingConnection } from "./protocol/mod.ts"; import { Room } from "./room.ts" import { control_bar, info_br } from "./menu.ts"; -export const VERSION = "0.1.14" +export const VERSION = "0.2.0" export interface ClientConfig { appearance?: { diff --git a/client-web/source/protocol/mod.ts b/client-web/source/protocol/mod.ts index a7e1f63..83ee8cb 100644 --- a/client-web/source/protocol/mod.ts +++ b/client-web/source/protocol/mod.ts @@ -10,7 +10,7 @@ import { crypto_encrypt, crypto_seeded_key, crypt_decrypt, crypto_hash } from ". export class SignalingConnection { room!: string websocket!: WebSocket - signaling_id!: string + room_hash!: string key!: CryptoKey my_id?: number // needed for outgoing relay messages @@ -20,9 +20,9 @@ export class SignalingConnection { constructor() { } async connect(room: string): Promise<SignalingConnection> { this.key = await crypto_seeded_key(room) - this.signaling_id = await crypto_hash(room) + this.room_hash = await crypto_hash(room) log("ws", "connecting…") - const ws_url = new URL(`${window.location.protocol.endsWith("s:") ? "wss" : "ws"}://${window.location.host}/signaling/${encodeURIComponent(this.signaling_id)}`) + const ws_url = new URL(`${window.location.protocol.endsWith("s:") ? "wss" : "ws"}://${window.location.host}/signaling`) this.websocket = new WebSocket(ws_url) this.websocket.onerror = () => this.on_error() this.websocket.onclose = () => this.on_close() @@ -44,6 +44,7 @@ export class SignalingConnection { } on_open() { log("ws", "websocket opened"); + this.send_control({ join: { hash: this.room_hash } }) setInterval(() => this.send_control({ ping: null }), 30000) // stupid workaround for nginx disconnecting inactive connections } on_error() { diff --git a/common/packets.d.ts b/common/packets.d.ts index 8bec80a..433dd17 100644 --- a/common/packets.d.ts +++ b/common/packets.d.ts @@ -22,6 +22,7 @@ export interface ClientboundPacket { } export interface ServerboundPacket { + join?: { hash?: string } ping?: null relay?: { recipient?: number, message: string /* encrypted RelayMessageWrapper */ } watch_rooms?: string[] diff --git a/server/src/config.rs b/server/src/config.rs index 0ae90eb..5ef6c69 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -10,7 +10,7 @@ pub struct Config { #[rustfmt::skip] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FeaturesConfig { - #[serde(default)] pub watch_rooms: bool, + #[serde(default)] pub room_watches: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/server/src/idgen.rs b/server/src/idgen.rs new file mode 100644 index 0000000..0cdc983 --- /dev/null +++ b/server/src/idgen.rs @@ -0,0 +1,21 @@ +use tokio::sync::RwLock; + +pub struct IdGenerator { + x: RwLock<u64>, +} + +impl Default for IdGenerator { + fn default() -> Self { + Self { + x: Default::default(), + } + } +} +impl IdGenerator { + pub async fn generate(&self) -> u64 { + // TODO: dummy implementation; ideal would be encrypting the counter + let mut x = self.x.write().await; + *x += 1; + *x + } +} diff --git a/server/src/logic.rs b/server/src/logic.rs new file mode 100644 index 0000000..a69ca79 --- /dev/null +++ b/server/src/logic.rs @@ -0,0 +1,178 @@ +/* + This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin <metamuffin@disroot.org> +*/ +use crate::{ + idgen::IdGenerator, + protocol::{ClientboundPacket, ServerboundPacket}, +}; +use futures_util::{stream::SplitStream, StreamExt}; +use log::{debug, error, warn}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, LazyLock}, +}; +use tokio::sync::{mpsc::Sender, RwLock}; +use warp::ws::WebSocket; + +static CLIENTS: LazyLock<RwLock<HashMap<Client, Sender<ClientboundPacket>>>> = + LazyLock::new(|| Default::default()); + +#[repr(transparent)] +#[derive(Debug, Hash, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct Client(u64); + +#[derive(Default)] +pub struct State { + idgen: IdGenerator, + rooms: RwLock<HashMap<String, Arc<Room>>>, +} + +#[derive(Debug, Default)] +pub struct Room { + pub users: RwLock<HashSet<Client>>, +} + +#[derive(Debug, Default)] +pub struct ClientState { + current_room: Option<Arc<Room>>, +} + +impl State { + pub async fn connect(&self, rx: SplitStream<WebSocket>, tx: Sender<ClientboundPacket>) { + debug!("new client connected"); + let client = Client(self.idgen.generate().await); + CLIENTS.write().await.insert(client, tx); + self.connect_inner(client, rx).await; + CLIENTS.write().await.remove(&client); + } + async fn connect_inner(&self, client: Client, mut rx: SplitStream<WebSocket>) { + let mut cstate = ClientState::default(); + client + .send(ClientboundPacket::Init { + your_id: client, + version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")), + }) + .await; + + while let Some(result) = rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + error!("websocket error: {e}"); + break; + } + }; + if let Ok(s) = msg.to_str() { + let packet = match serde_json::from_str::<ServerboundPacket>(s) { + Ok(p) => p, + Err(e) => { + error!("client sent invalid packet: {e:?}"); + break; + } + }; + debug!("<- {packet:?}"); + self.on_recv(client, &mut cstate, packet).await; + } + } + + if let Some(room) = cstate.current_room { + room.leave(client).await; + // TODO dont leak room + } + } + + async fn on_recv(&self, client: Client, cstate: &mut ClientState, packet: ServerboundPacket) { + match packet { + ServerboundPacket::Ping => (), + ServerboundPacket::Join { hash } => { + if let Some(room) = &cstate.current_room { + room.leave(client).await; + // TODO dont leak room + // if room.should_remove().await { + // self.rooms.write().await.remove(üw); + // } + } + if let Some(hash) = hash { + let room = self.rooms.write().await.entry(hash).or_default().clone(); + room.join(client).await; + cstate.current_room = Some(room.clone()) + } else { + cstate.current_room = None + } + } + ServerboundPacket::Relay { recipient, message } => { + if let Some(room) = &cstate.current_room { + let packet = ClientboundPacket::Message { + sender: client, + message, + }; + if let Some(recipient) = recipient { + room.send_to_client(recipient, packet).await; + } else { + room.broadcast(Some(client), packet).await; + } + } + } + ServerboundPacket::WatchRooms(_) => todo!(), + } + } +} + +impl Client { + pub async fn send(&self, packet: ClientboundPacket) { + if let Some(s) = CLIENTS.read().await.get(&self) { + s.send(packet).await.unwrap(); + } else { + warn!("invalid recipient {self:?}") + } + } +} + +impl Room { + pub async fn join(&self, client: Client) { + debug!("client join {client:?}"); + self.users.write().await.insert(client); + + // send join of this client to all clients + self.broadcast(Some(client), ClientboundPacket::ClientJoin { id: client }) + .await; + // send join of all other clients to this one + for rc in self.users.read().await.iter() { + self.send_to_client(client, ClientboundPacket::ClientJoin { id: *rc }) + .await; + } + } + + pub async fn leave(&self, client: Client) { + debug!("client leave {client:?}"); + for c in self.users.read().await.iter() { + if *c != client { + self.send_to_client(*c, ClientboundPacket::ClientLeave { id: client }) + .await; + } + } + self.users.write().await.remove(&client); + self.broadcast(Some(client), ClientboundPacket::ClientLeave { id: client }) + .await; + } + + pub async fn broadcast(&self, sender: Option<Client>, packet: ClientboundPacket) { + for c in self.users.read().await.iter() { + if sender != Some(*c) { + c.send(packet.clone()).await; + } + } + } + pub async fn send_to_client(&self, recipient: Client, packet: ClientboundPacket) { + if let Some(c) = self.users.read().await.get(&recipient) { + c.send(packet).await; + } + } + + pub async fn should_remove(&self) -> bool { + self.users.read().await.len() == 0 + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 3d0af50..9ea0f94 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,26 +6,27 @@ #![feature(lazy_cell)] pub mod assets; pub mod config; +pub mod logic; pub mod protocol; -pub mod room; +pub mod idgen; use assets::css; use config::{AppearanceConfig, Config}; +use futures_util::{SinkExt, StreamExt, TryFutureExt}; use hyper::{header, StatusCode}; use listenfd::ListenFd; use log::{debug, error}; -use room::Room; -use std::collections::HashMap; +use logic::State; use std::convert::Infallible; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::mpsc; use warp::hyper::Server; use warp::ws::WebSocket; -use warp::{reply, Filter, Rejection, Reply}; +use warp::{reply, ws::Message, Filter, Rejection, Reply}; -type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>; +use crate::protocol::ClientboundPacket; fn main() { tokio::runtime::Builder::new_multi_thread() @@ -43,11 +44,11 @@ async fn run() { let client_config_json = serde_json::to_string(&config).unwrap(); let client_config_css = css_overrides(&config.appearance); - let rooms: _ = Rooms::default(); - let rooms: _ = warp::any().map(move || rooms.clone()); + let state: _ = Arc::new(State::default()); + let state: _ = warp::any().map(move || state.clone()); - let signaling: _ = warp::path!("signaling" / String) - .and(rooms) + let signaling: _ = warp::path!("signaling") + .and(state) .and(warp::ws()) .map(signaling_connect); @@ -135,22 +136,25 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> { Ok(warp::reply::with_status(json, code)) } -fn signaling_connect(rsecret: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply { - async fn inner(sock: WebSocket, rsecret: String, rooms: Rooms) { +fn signaling_connect(state: Arc<State>, ws: warp::ws::Ws) -> impl Reply { + async fn inner(sock: WebSocket, state: Arc<State>) { debug!("ws upgrade"); - let mut guard = rooms.write().await; - let room = guard - .entry(rsecret.clone()) - .or_insert_with(|| Default::default()) - .to_owned(); - drop(guard); - - room.client_connect(sock).await; - if room.should_remove().await { - rooms.write().await.remove(&rsecret); - } + let (mut user_ws_tx, user_ws_rx) = sock.split(); + let (tx, mut rx) = mpsc::channel::<ClientboundPacket>(64); + tokio::task::spawn(async move { + while let Some(packet) = rx.recv().await { + debug!(" -> {packet:?}"); + user_ws_tx + .send(Message::text(serde_json::to_string(&packet).unwrap())) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + state.connect(user_ws_rx, tx).await; } - ws.on_upgrade(move |sock| inner(sock, rsecret, rooms)) + ws.on_upgrade(move |sock| inner(sock, state)) } fn css_overrides( diff --git a/server/src/protocol.rs b/server/src/protocol.rs index a27e339..85ab03a 100644 --- a/server/src/protocol.rs +++ b/server/src/protocol.rs @@ -5,22 +5,27 @@ */ use serde::{Deserialize, Serialize}; +use crate::logic::Client; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ClientboundPacket { - Init { your_id: usize, version: String }, - ClientJoin { id: usize }, - ClientLeave { id: usize }, - Message { sender: usize, message: String }, + Init { your_id: Client, version: String }, + ClientJoin { id: Client }, + ClientLeave { id: Client }, + Message { sender: Client, message: String }, RoomInfo { hash: String, user_count: usize }, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ServerboundPacket { + Join { + hash: Option<String>, + }, Ping, Relay { - recipient: Option<usize>, + recipient: Option<Client>, message: String, }, WatchRooms(Vec<String>), diff --git a/server/src/room.rs b/server/src/room.rs deleted file mode 100644 index abc1a52..0000000 --- a/server/src/room.rs +++ /dev/null @@ -1,127 +0,0 @@ -/* - This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2023 metamuffin <metamuffin@disroot.org> -*/ -use crate::protocol::{ClientboundPacket, ServerboundPacket}; -use futures_util::{SinkExt, StreamExt, TryFutureExt}; -use log::{debug, error}; -use std::{collections::HashMap, sync::atomic::AtomicUsize}; -use tokio::sync::{mpsc, RwLock}; -use warp::ws::{Message, WebSocket}; - -#[derive(Debug, Default)] -pub struct Room { - pub id_counter: AtomicUsize, - pub clients: RwLock<HashMap<usize, mpsc::UnboundedSender<ClientboundPacket>>>, -} - -impl Room { - pub async fn client_connect(&self, ws: WebSocket) { - debug!("new client connected"); - let (mut user_ws_tx, mut user_ws_rx) = ws.split(); - - let (tx, mut rx) = mpsc::unbounded_channel(); - - let mut g = self.clients.write().await; - // ensure write guard to client exists when using id_counter - let id = self - .id_counter - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - g.insert(id, tx); - drop(g); - debug!("assigned id={id}, init connection"); - - tokio::task::spawn(async move { - while let Some(packet) = rx.recv().await { - debug!("{id} -> {packet:?}"); - user_ws_tx - .send(Message::text(serde_json::to_string(&packet).unwrap())) - .unwrap_or_else(|e| { - eprintln!("websocket send error: {}", e); - }) - .await; - } - }); - - self.send_to_client( - id, - ClientboundPacket::Init { - your_id: id, - version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")), - }, - ) - .await; - - // send join of this client to all clients - self.broadcast(None, ClientboundPacket::ClientJoin { id }) - .await; - // send join of all other clients to this one - for (&cid, _) in self.clients.read().await.iter() { - // skip self - if cid != id { - self.send_to_client(id, ClientboundPacket::ClientJoin { id: cid }) - .await; - } - } - debug!("client should be ready!"); - - while let Some(result) = user_ws_rx.next().await { - let msg = match result { - Ok(msg) => msg, - Err(e) => { - error!("websocket error(id={id}): {e}"); - break; - } - }; - if let Ok(s) = msg.to_str() { - let p = match serde_json::from_str::<ServerboundPacket>(s) { - Ok(p) => p, - Err(e) => { - error!("client(id={id}) sent invalid packet: {e:?}"); - break; - } - }; - debug!("{id} <- {p:?}"); - self.client_message(id, p).await; - } - } - self.clients.write().await.remove(&id); - self.broadcast(Some(id), ClientboundPacket::ClientLeave { id }) - .await; - } - - pub async fn broadcast(&self, sender: Option<usize>, packet: ClientboundPacket) { - for (&id, tx) in self.clients.read().await.iter() { - if sender != Some(id) { - let _ = tx.send(packet.clone()); - } - } - } - pub async fn send_to_client(&self, recipient: usize, packet: ClientboundPacket) { - if let Some(c) = self.clients.read().await.get(&recipient) { - let _ = c.send(packet); - } - } - - pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) { - match packet { - ServerboundPacket::Ping => (), - ServerboundPacket::Relay { recipient, message } => { - let packet = ClientboundPacket::Message { sender, message }; - // Add some delay for testing scenarios with latency. - // tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - if let Some(recipient) = recipient { - self.send_to_client(recipient, packet).await; - } else { - self.broadcast(Some(sender), packet).await - } - } - ServerboundPacket::WatchRooms(list) => {} - } - } - - pub async fn should_remove(&self) -> bool { - self.clients.read().await.len() == 0 - } -} |