summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-07 19:17:49 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-07 19:17:49 +0200
commit2d0761b8932f11b01e241e2db3a8f08250efe878 (patch)
tree29814137e2960286d36f3ae4f7c20299eb6d22b0
parent6f644481f397af070e2b91b69846e375caafdbda (diff)
downloadkeks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar
keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.bz2
keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.zst
new protocol
-rw-r--r--client-web/source/index.ts2
-rw-r--r--client-web/source/protocol/mod.ts7
-rw-r--r--common/packets.d.ts1
-rw-r--r--server/src/config.rs2
-rw-r--r--server/src/idgen.rs21
-rw-r--r--server/src/logic.rs178
-rw-r--r--server/src/main.rs52
-rw-r--r--server/src/protocol.rs15
-rw-r--r--server/src/room.rs127
9 files changed, 244 insertions, 161 deletions
diff --git a/client-web/source/index.ts b/client-web/source/index.ts
index 8ae7e8f..639f0c9 100644
--- a/client-web/source/index.ts
+++ b/client-web/source/index.ts
@@ -14,7 +14,7 @@ import { SignalingConnection } from "./protocol/mod.ts";
import { Room } from "./room.ts"
import { control_bar, info_br } from "./menu.ts";
-export const VERSION = "0.1.14"
+export const VERSION = "0.2.0"
export interface ClientConfig {
appearance?: {
diff --git a/client-web/source/protocol/mod.ts b/client-web/source/protocol/mod.ts
index a7e1f63..83ee8cb 100644
--- a/client-web/source/protocol/mod.ts
+++ b/client-web/source/protocol/mod.ts
@@ -10,7 +10,7 @@ import { crypto_encrypt, crypto_seeded_key, crypt_decrypt, crypto_hash } from ".
export class SignalingConnection {
room!: string
websocket!: WebSocket
- signaling_id!: string
+ room_hash!: string
key!: CryptoKey
my_id?: number // needed for outgoing relay messages
@@ -20,9 +20,9 @@ export class SignalingConnection {
constructor() { }
async connect(room: string): Promise<SignalingConnection> {
this.key = await crypto_seeded_key(room)
- this.signaling_id = await crypto_hash(room)
+ this.room_hash = await crypto_hash(room)
log("ws", "connecting…")
- const ws_url = new URL(`${window.location.protocol.endsWith("s:") ? "wss" : "ws"}://${window.location.host}/signaling/${encodeURIComponent(this.signaling_id)}`)
+ const ws_url = new URL(`${window.location.protocol.endsWith("s:") ? "wss" : "ws"}://${window.location.host}/signaling`)
this.websocket = new WebSocket(ws_url)
this.websocket.onerror = () => this.on_error()
this.websocket.onclose = () => this.on_close()
@@ -44,6 +44,7 @@ export class SignalingConnection {
}
on_open() {
log("ws", "websocket opened");
+ this.send_control({ join: { hash: this.room_hash } })
setInterval(() => this.send_control({ ping: null }), 30000) // stupid workaround for nginx disconnecting inactive connections
}
on_error() {
diff --git a/common/packets.d.ts b/common/packets.d.ts
index 8bec80a..433dd17 100644
--- a/common/packets.d.ts
+++ b/common/packets.d.ts
@@ -22,6 +22,7 @@ export interface ClientboundPacket {
}
export interface ServerboundPacket {
+ join?: { hash?: string }
ping?: null
relay?: { recipient?: number, message: string /* encrypted RelayMessageWrapper */ }
watch_rooms?: string[]
diff --git a/server/src/config.rs b/server/src/config.rs
index 0ae90eb..5ef6c69 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -10,7 +10,7 @@ pub struct Config {
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeaturesConfig {
- #[serde(default)] pub watch_rooms: bool,
+ #[serde(default)] pub room_watches: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/server/src/idgen.rs b/server/src/idgen.rs
new file mode 100644
index 0000000..0cdc983
--- /dev/null
+++ b/server/src/idgen.rs
@@ -0,0 +1,21 @@
+use tokio::sync::RwLock;
+
+pub struct IdGenerator {
+ x: RwLock<u64>,
+}
+
+impl Default for IdGenerator {
+ fn default() -> Self {
+ Self {
+ x: Default::default(),
+ }
+ }
+}
+impl IdGenerator {
+ pub async fn generate(&self) -> u64 {
+ // TODO: dummy implementation; ideal would be encrypting the counter
+ let mut x = self.x.write().await;
+ *x += 1;
+ *x
+ }
+}
diff --git a/server/src/logic.rs b/server/src/logic.rs
new file mode 100644
index 0000000..a69ca79
--- /dev/null
+++ b/server/src/logic.rs
@@ -0,0 +1,178 @@
+/*
+ This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2023 metamuffin <metamuffin@disroot.org>
+*/
+use crate::{
+ idgen::IdGenerator,
+ protocol::{ClientboundPacket, ServerboundPacket},
+};
+use futures_util::{stream::SplitStream, StreamExt};
+use log::{debug, error, warn};
+use serde::{Deserialize, Serialize};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::{Arc, LazyLock},
+};
+use tokio::sync::{mpsc::Sender, RwLock};
+use warp::ws::WebSocket;
+
+static CLIENTS: LazyLock<RwLock<HashMap<Client, Sender<ClientboundPacket>>>> =
+ LazyLock::new(|| Default::default());
+
+#[repr(transparent)]
+#[derive(Debug, Hash, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Client(u64);
+
+#[derive(Default)]
+pub struct State {
+ idgen: IdGenerator,
+ rooms: RwLock<HashMap<String, Arc<Room>>>,
+}
+
+#[derive(Debug, Default)]
+pub struct Room {
+ pub users: RwLock<HashSet<Client>>,
+}
+
+#[derive(Debug, Default)]
+pub struct ClientState {
+ current_room: Option<Arc<Room>>,
+}
+
+impl State {
+ pub async fn connect(&self, rx: SplitStream<WebSocket>, tx: Sender<ClientboundPacket>) {
+ debug!("new client connected");
+ let client = Client(self.idgen.generate().await);
+ CLIENTS.write().await.insert(client, tx);
+ self.connect_inner(client, rx).await;
+ CLIENTS.write().await.remove(&client);
+ }
+ async fn connect_inner(&self, client: Client, mut rx: SplitStream<WebSocket>) {
+ let mut cstate = ClientState::default();
+ client
+ .send(ClientboundPacket::Init {
+ your_id: client,
+ version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")),
+ })
+ .await;
+
+ while let Some(result) = rx.next().await {
+ let msg = match result {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("websocket error: {e}");
+ break;
+ }
+ };
+ if let Ok(s) = msg.to_str() {
+ let packet = match serde_json::from_str::<ServerboundPacket>(s) {
+ Ok(p) => p,
+ Err(e) => {
+ error!("client sent invalid packet: {e:?}");
+ break;
+ }
+ };
+ debug!("<- {packet:?}");
+ self.on_recv(client, &mut cstate, packet).await;
+ }
+ }
+
+ if let Some(room) = cstate.current_room {
+ room.leave(client).await;
+ // TODO dont leak room
+ }
+ }
+
+ async fn on_recv(&self, client: Client, cstate: &mut ClientState, packet: ServerboundPacket) {
+ match packet {
+ ServerboundPacket::Ping => (),
+ ServerboundPacket::Join { hash } => {
+ if let Some(room) = &cstate.current_room {
+ room.leave(client).await;
+ // TODO dont leak room
+ // if room.should_remove().await {
+ // self.rooms.write().await.remove(üw);
+ // }
+ }
+ if let Some(hash) = hash {
+ let room = self.rooms.write().await.entry(hash).or_default().clone();
+ room.join(client).await;
+ cstate.current_room = Some(room.clone())
+ } else {
+ cstate.current_room = None
+ }
+ }
+ ServerboundPacket::Relay { recipient, message } => {
+ if let Some(room) = &cstate.current_room {
+ let packet = ClientboundPacket::Message {
+ sender: client,
+ message,
+ };
+ if let Some(recipient) = recipient {
+ room.send_to_client(recipient, packet).await;
+ } else {
+ room.broadcast(Some(client), packet).await;
+ }
+ }
+ }
+ ServerboundPacket::WatchRooms(_) => todo!(),
+ }
+ }
+}
+
+impl Client {
+ pub async fn send(&self, packet: ClientboundPacket) {
+ if let Some(s) = CLIENTS.read().await.get(&self) {
+ s.send(packet).await.unwrap();
+ } else {
+ warn!("invalid recipient {self:?}")
+ }
+ }
+}
+
+impl Room {
+ pub async fn join(&self, client: Client) {
+ debug!("client join {client:?}");
+ self.users.write().await.insert(client);
+
+ // send join of this client to all clients
+ self.broadcast(Some(client), ClientboundPacket::ClientJoin { id: client })
+ .await;
+ // send join of all other clients to this one
+ for rc in self.users.read().await.iter() {
+ self.send_to_client(client, ClientboundPacket::ClientJoin { id: *rc })
+ .await;
+ }
+ }
+
+ pub async fn leave(&self, client: Client) {
+ debug!("client leave {client:?}");
+ for c in self.users.read().await.iter() {
+ if *c != client {
+ self.send_to_client(*c, ClientboundPacket::ClientLeave { id: client })
+ .await;
+ }
+ }
+ self.users.write().await.remove(&client);
+ self.broadcast(Some(client), ClientboundPacket::ClientLeave { id: client })
+ .await;
+ }
+
+ pub async fn broadcast(&self, sender: Option<Client>, packet: ClientboundPacket) {
+ for c in self.users.read().await.iter() {
+ if sender != Some(*c) {
+ c.send(packet.clone()).await;
+ }
+ }
+ }
+ pub async fn send_to_client(&self, recipient: Client, packet: ClientboundPacket) {
+ if let Some(c) = self.users.read().await.get(&recipient) {
+ c.send(packet).await;
+ }
+ }
+
+ pub async fn should_remove(&self) -> bool {
+ self.users.read().await.len() == 0
+ }
+}
diff --git a/server/src/main.rs b/server/src/main.rs
index 3d0af50..9ea0f94 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -6,26 +6,27 @@
#![feature(lazy_cell)]
pub mod assets;
pub mod config;
+pub mod logic;
pub mod protocol;
-pub mod room;
+pub mod idgen;
use assets::css;
use config::{AppearanceConfig, Config};
+use futures_util::{SinkExt, StreamExt, TryFutureExt};
use hyper::{header, StatusCode};
use listenfd::ListenFd;
use log::{debug, error};
-use room::Room;
-use std::collections::HashMap;
+use logic::State;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
-use tokio::sync::RwLock;
+use tokio::sync::mpsc;
use warp::hyper::Server;
use warp::ws::WebSocket;
-use warp::{reply, Filter, Rejection, Reply};
+use warp::{reply, ws::Message, Filter, Rejection, Reply};
-type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>;
+use crate::protocol::ClientboundPacket;
fn main() {
tokio::runtime::Builder::new_multi_thread()
@@ -43,11 +44,11 @@ async fn run() {
let client_config_json = serde_json::to_string(&config).unwrap();
let client_config_css = css_overrides(&config.appearance);
- let rooms: _ = Rooms::default();
- let rooms: _ = warp::any().map(move || rooms.clone());
+ let state: _ = Arc::new(State::default());
+ let state: _ = warp::any().map(move || state.clone());
- let signaling: _ = warp::path!("signaling" / String)
- .and(rooms)
+ let signaling: _ = warp::path!("signaling")
+ .and(state)
.and(warp::ws())
.map(signaling_connect);
@@ -135,22 +136,25 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
Ok(warp::reply::with_status(json, code))
}
-fn signaling_connect(rsecret: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
- async fn inner(sock: WebSocket, rsecret: String, rooms: Rooms) {
+fn signaling_connect(state: Arc<State>, ws: warp::ws::Ws) -> impl Reply {
+ async fn inner(sock: WebSocket, state: Arc<State>) {
debug!("ws upgrade");
- let mut guard = rooms.write().await;
- let room = guard
- .entry(rsecret.clone())
- .or_insert_with(|| Default::default())
- .to_owned();
- drop(guard);
-
- room.client_connect(sock).await;
- if room.should_remove().await {
- rooms.write().await.remove(&rsecret);
- }
+ let (mut user_ws_tx, user_ws_rx) = sock.split();
+ let (tx, mut rx) = mpsc::channel::<ClientboundPacket>(64);
+ tokio::task::spawn(async move {
+ while let Some(packet) = rx.recv().await {
+ debug!(" -> {packet:?}");
+ user_ws_tx
+ .send(Message::text(serde_json::to_string(&packet).unwrap()))
+ .unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ })
+ .await;
+ }
+ });
+ state.connect(user_ws_rx, tx).await;
}
- ws.on_upgrade(move |sock| inner(sock, rsecret, rooms))
+ ws.on_upgrade(move |sock| inner(sock, state))
}
fn css_overrides(
diff --git a/server/src/protocol.rs b/server/src/protocol.rs
index a27e339..85ab03a 100644
--- a/server/src/protocol.rs
+++ b/server/src/protocol.rs
@@ -5,22 +5,27 @@
*/
use serde::{Deserialize, Serialize};
+use crate::logic::Client;
+
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClientboundPacket {
- Init { your_id: usize, version: String },
- ClientJoin { id: usize },
- ClientLeave { id: usize },
- Message { sender: usize, message: String },
+ Init { your_id: Client, version: String },
+ ClientJoin { id: Client },
+ ClientLeave { id: Client },
+ Message { sender: Client, message: String },
RoomInfo { hash: String, user_count: usize },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ServerboundPacket {
+ Join {
+ hash: Option<String>,
+ },
Ping,
Relay {
- recipient: Option<usize>,
+ recipient: Option<Client>,
message: String,
},
WatchRooms(Vec<String>),
diff --git a/server/src/room.rs b/server/src/room.rs
deleted file mode 100644
index abc1a52..0000000
--- a/server/src/room.rs
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2023 metamuffin <metamuffin@disroot.org>
-*/
-use crate::protocol::{ClientboundPacket, ServerboundPacket};
-use futures_util::{SinkExt, StreamExt, TryFutureExt};
-use log::{debug, error};
-use std::{collections::HashMap, sync::atomic::AtomicUsize};
-use tokio::sync::{mpsc, RwLock};
-use warp::ws::{Message, WebSocket};
-
-#[derive(Debug, Default)]
-pub struct Room {
- pub id_counter: AtomicUsize,
- pub clients: RwLock<HashMap<usize, mpsc::UnboundedSender<ClientboundPacket>>>,
-}
-
-impl Room {
- pub async fn client_connect(&self, ws: WebSocket) {
- debug!("new client connected");
- let (mut user_ws_tx, mut user_ws_rx) = ws.split();
-
- let (tx, mut rx) = mpsc::unbounded_channel();
-
- let mut g = self.clients.write().await;
- // ensure write guard to client exists when using id_counter
- let id = self
- .id_counter
- .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- g.insert(id, tx);
- drop(g);
- debug!("assigned id={id}, init connection");
-
- tokio::task::spawn(async move {
- while let Some(packet) = rx.recv().await {
- debug!("{id} -> {packet:?}");
- user_ws_tx
- .send(Message::text(serde_json::to_string(&packet).unwrap()))
- .unwrap_or_else(|e| {
- eprintln!("websocket send error: {}", e);
- })
- .await;
- }
- });
-
- self.send_to_client(
- id,
- ClientboundPacket::Init {
- your_id: id,
- version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")),
- },
- )
- .await;
-
- // send join of this client to all clients
- self.broadcast(None, ClientboundPacket::ClientJoin { id })
- .await;
- // send join of all other clients to this one
- for (&cid, _) in self.clients.read().await.iter() {
- // skip self
- if cid != id {
- self.send_to_client(id, ClientboundPacket::ClientJoin { id: cid })
- .await;
- }
- }
- debug!("client should be ready!");
-
- while let Some(result) = user_ws_rx.next().await {
- let msg = match result {
- Ok(msg) => msg,
- Err(e) => {
- error!("websocket error(id={id}): {e}");
- break;
- }
- };
- if let Ok(s) = msg.to_str() {
- let p = match serde_json::from_str::<ServerboundPacket>(s) {
- Ok(p) => p,
- Err(e) => {
- error!("client(id={id}) sent invalid packet: {e:?}");
- break;
- }
- };
- debug!("{id} <- {p:?}");
- self.client_message(id, p).await;
- }
- }
- self.clients.write().await.remove(&id);
- self.broadcast(Some(id), ClientboundPacket::ClientLeave { id })
- .await;
- }
-
- pub async fn broadcast(&self, sender: Option<usize>, packet: ClientboundPacket) {
- for (&id, tx) in self.clients.read().await.iter() {
- if sender != Some(id) {
- let _ = tx.send(packet.clone());
- }
- }
- }
- pub async fn send_to_client(&self, recipient: usize, packet: ClientboundPacket) {
- if let Some(c) = self.clients.read().await.get(&recipient) {
- let _ = c.send(packet);
- }
- }
-
- pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) {
- match packet {
- ServerboundPacket::Ping => (),
- ServerboundPacket::Relay { recipient, message } => {
- let packet = ClientboundPacket::Message { sender, message };
- // Add some delay for testing scenarios with latency.
- // tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
- if let Some(recipient) = recipient {
- self.send_to_client(recipient, packet).await;
- } else {
- self.broadcast(Some(sender), packet).await
- }
- }
- ServerboundPacket::WatchRooms(list) => {}
- }
- }
-
- pub async fn should_remove(&self) -> bool {
- self.clients.read().await.len() == 0
- }
-}