diff options
-rw-r--r-- | client-web/source/local_user.ts | 1 | ||||
-rw-r--r-- | client-web/source/rnnoise.ts | 2 | ||||
-rw-r--r-- | client-web/source/room.ts | 38 | ||||
-rw-r--r-- | readme.md | 20 | ||||
-rw-r--r-- | server/Cargo.lock | 120 | ||||
-rw-r--r-- | server/Cargo.toml | 1 | ||||
-rw-r--r-- | server/src/main.rs | 19 | ||||
-rw-r--r-- | server/src/room.rs | 66 |
8 files changed, 113 insertions, 154 deletions
diff --git a/client-web/source/local_user.ts b/client-web/source/local_user.ts index bfd9459..b3d5692 100644 --- a/client-web/source/local_user.ts +++ b/client-web/source/local_user.ts @@ -8,7 +8,6 @@ import { Room } from "./room.ts"; import { TrackHandle } from "./track_handle.ts"; import { User } from "./user.ts"; - export class LocalUser extends User { mic_gain?: GainNode default_gain: number = parameter_number("mic_gain", 1) diff --git a/client-web/source/rnnoise.ts b/client-web/source/rnnoise.ts index 7867682..d6efb3b 100644 --- a/client-web/source/rnnoise.ts +++ b/client-web/source/rnnoise.ts @@ -23,7 +23,7 @@ export async function get_rnnoise_node(context: AudioContext): Promise<RNNoiseNo if (!RNNoiseNode) { log("rnnoise", "loading wasm...") script = document.createElement("script") - script.src = "/_rnnoise/rnnoise-runtime.js" + script.src = "/_assets/rnnoise/rnnoise-runtime.js" script.defer = true document.head.appendChild(script) //@ts-ignore asdfsfad diff --git a/client-web/source/room.ts b/client-web/source/room.ts index af85044..c22a956 100644 --- a/client-web/source/room.ts +++ b/client-web/source/room.ts @@ -13,6 +13,7 @@ export class Room { users: Map<number, User> = new Map() remote_users: Map<number, RemoteUser> = new Map() local_user!: LocalUser + my_id!: number websocket: WebSocket constructor(name: string) { @@ -34,17 +35,21 @@ export class Room { websocket_message(packet: ClientboundPacket) { log("ws", `<- ${packet.message?.sender ?? "control packet"}: `, packet); if (packet.init) { - this.local_user = new LocalUser(this, packet.init.your_id, "..."); - } - if (packet.client_join) { + this.my_id = packet.init.your_id + // no need to check compat for now because this is hosted in the same place + log("*", `server: ${packet.init.version}`) + } else if (packet.client_join) { const p = packet.client_join log("*", `${this.name} ${p.id} joined`); - const ru = new RemoteUser(this, p.id, p.name) - this.local_user.add_initial_to_remote(ru) - ru.offer() - this.users.set(p.id, ru) - this.remote_users.set(p.id, ru) - return + if (p.id == this.my_id) { + this.local_user = new LocalUser(this, p.id, p.name); + } else { + const ru = new RemoteUser(this, p.id, p.name) + this.local_user.add_initial_to_remote(ru) + ru.offer() + this.users.set(p.id, ru) + this.remote_users.set(p.id, ru) + } } else if (packet.client_leave) { const p = packet.client_leave; log("*", `${this.name} ${p.id} left`); @@ -52,13 +57,16 @@ export class Room { this.users.delete(p.id) this.remote_users.delete(p.id) return - } - if (packet.message) { + } else if (packet.message) { const p = packet.message; - const sender = this.remote_users.get(p.sender)! - if (p.message.ice_candidate) sender.add_ice_candidate(p.message.ice_candidate) - if (p.message.offer) sender.on_offer(p.message.offer) - if (p.message.answer) sender.on_answer(p.message.answer) + const sender = this.users.get(p.sender) + if (sender instanceof RemoteUser) { + if (p.message.ice_candidate) sender.add_ice_candidate(p.message.ice_candidate) + if (p.message.offer) sender.on_offer(p.message.offer) + if (p.message.answer) sender.on_answer(p.message.answer) + } else { + console.log("!", p, sender); + } } } websocket_close() { @@ -34,6 +34,26 @@ Booleans can be either `1`, `true`, `yes` or their opposites. | `screen_enabled` | boolean | false | Adds screen track on startup (wont work) | | `mic_gain` | number | 1 | Sets the microphone volume | +## Protocol + +The protocol packets are defined in [packets.d.ts](./common/packets.d.ts). Here is an (simplified) example of how the protocol is used. + +``` +S->C { init: { your_id: 5, version: "..." } } +---- # Your join packet will be the first one. +S->C { client_join: { id: 5, name: "bob" } } +S->C { client_join: { id: 3, name: "alice" } } +---- # Now publish your ICE candidates +C->S { relay: { message: { ice_candiate: <RTCIceCandidateInit> } } } +---- # Whenever you change your streams change: +---- # Send an offer to everybody +C->S { relay: { recipient: 3, offer: <RTCSessionDescriptionInit> } } +---- # Alice answers +S->C { message: { sender: 3, message: { offer: <RTCSessionDescriptionInit> } } } +---- # In case the server uses a reverse-proxy that disconnects inactive connections: Ping every 30s +C->S { ping: null } +``` + ## Licence See `LICENCE` file. diff --git a/server/Cargo.lock b/server/Cargo.lock index b1581bd..208e7f8 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -87,16 +87,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "chashmap" -version = "2.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45" -dependencies = [ - "owning_ref", - "parking_lot 0.4.8", -] - -[[package]] name = "cpufeatures" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -173,12 +163,6 @@ dependencies = [ ] [[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - -[[package]] name = "futures-channel" version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -455,12 +439,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - -[[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -506,7 +484,7 @@ dependencies = [ "mime", "mime_guess", "quick-error", - "rand 0.8.5", + "rand", "safemem", "tempfile", "twoway", @@ -535,44 +513,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] -name = "owning_ref" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" -dependencies = [ - "stable_deref_trait", -] - -[[package]] -name = "parking_lot" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" -dependencies = [ - "owning_ref", - "parking_lot_core 0.2.14", -] - -[[package]] name = "parking_lot" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.3", -] - -[[package]] -name = "parking_lot_core" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" -dependencies = [ - "libc", - "rand 0.4.6", - "smallvec 0.6.14", - "winapi", + "parking_lot_core", ] [[package]] @@ -584,7 +531,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "smallvec 1.9.0", + "smallvec", "windows-sys", ] @@ -658,26 +605,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" -dependencies = [ - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", - "rdrand", - "winapi", -] - -[[package]] -name = "rand" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core 0.6.3", + "rand_core", ] [[package]] @@ -687,26 +621,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.3", + "rand_core", ] [[package]] name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - -[[package]] -name = "rand_core" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" @@ -715,15 +634,6 @@ dependencies = [ ] [[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", -] - -[[package]] name = "redox_syscall" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -829,7 +739,6 @@ dependencies = [ name = "server" version = "0.1.0" dependencies = [ - "chashmap", "env_logger", "futures-util", "hyper", @@ -885,15 +794,6 @@ dependencies = [ [[package]] name = "smallvec" -version = "0.6.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" -dependencies = [ - "maybe-uninit", -] - -[[package]] -name = "smallvec" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" @@ -909,12 +809,6 @@ dependencies = [ ] [[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - -[[package]] name = "syn" version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -996,7 +890,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1112,7 +1006,7 @@ dependencies = [ "http", "httparse", "log", - "rand 0.8.5", + "rand", "sha-1", "thiserror", "url", diff --git a/server/Cargo.toml b/server/Cargo.toml index 06e52fb..7d32688 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,4 +13,3 @@ listenfd = "1.0.0" hyper = "0.14.20" serde = { version = "1.0.144", features = ["derive"] } serde_json = "*" -chashmap = "2.2.2" diff --git a/server/src/main.rs b/server/src/main.rs index 6a8f11d..268e2a4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,18 +1,19 @@ pub mod protocol; pub mod room; -use chashmap::CHashMap; use hyper::StatusCode; use listenfd::ListenFd; use log::error; use room::Room; +use std::collections::HashMap; use std::convert::Infallible; use std::sync::Arc; +use tokio::sync::RwLock; use warp::hyper::Server; use warp::ws::WebSocket; use warp::{Filter, Rejection, Reply}; -type Rooms = Arc<CHashMap<String, Arc<Room>>>; +type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>; fn main() { tokio::runtime::Builder::new_multi_thread() @@ -82,16 +83,20 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> { fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply { async fn inner(sock: WebSocket, rname: String, rooms: Rooms) { - let room = match rooms.get(&rname) { - Some(r) => r, + let guard = rooms.read().await; + let room = match guard.get(&rname) { + Some(r) => r.to_owned(), None => { - rooms.insert(rname.to_owned(), Default::default()); - rooms.get(&rname).unwrap() // TODO never expect this to always work!! + drop(guard); // make sure read-lock is dropped to avoid deadlock + let mut guard = rooms.write().await; + guard.insert(rname.to_owned(), Default::default()); + guard.get(&rname).unwrap().to_owned() // TODO never expect this to always work!! } }; + room.client_connect(sock).await; if room.should_remove().await { - rooms.remove(&rname); + rooms.write().await.remove(&rname); } } ws.on_upgrade(move |sock| inner(sock, rname, rooms)) diff --git a/server/src/room.rs b/server/src/room.rs index a47d2e5..db545a6 100644 --- a/server/src/room.rs +++ b/server/src/room.rs @@ -1,7 +1,7 @@ use crate::protocol::{ClientboundPacket, ServerboundPacket}; use futures_util::{SinkExt, StreamExt, TryFutureExt}; -use log::error; -use std::collections::HashMap; +use log::{debug, error}; +use std::{collections::HashMap, sync::atomic::AtomicUsize}; use tokio::sync::{mpsc, RwLock}; use warp::ws::{Message, WebSocket}; @@ -13,17 +13,36 @@ pub struct Client { #[derive(Debug, Default)] pub struct Room { + pub id_counter: AtomicUsize, pub clients: RwLock<HashMap<usize, Client>>, } impl Room { pub async fn client_connect(&self, ws: WebSocket) { + debug!("new client connected"); let (mut user_ws_tx, mut user_ws_rx) = ws.split(); let (tx, mut rx) = mpsc::unbounded_channel(); + let mut g = self.clients.write().await; + // ensure write guard to client exists when using id_counter + let id = self + .id_counter + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let name = format!("user no. {id}"); + g.insert( + id, + Client { + out: tx, + name: name.clone(), + }, + ); + drop(g); + debug!("assigned id={id}, init connection"); + tokio::task::spawn(async move { while let Some(packet) = rx.recv().await { + debug!("{id} -> {packet:?}"); user_ws_tx .send(Message::text(serde_json::to_string(&packet).unwrap())) .unwrap_or_else(|e| { @@ -33,20 +52,33 @@ impl Room { } }); - let mut g = self.clients.write().await; - let id = g.len(); - let name = format!("user no. {id}"); - g.insert( + self.send_to_client( id, - Client { - out: tx, - name: name.clone(), + ClientboundPacket::Init { + your_id: id, + version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")), }, - ); - drop(g); + ) + .await; - self.broadcast(id, ClientboundPacket::ClientJoin { id, name }) + // send join of this client to all clients + self.broadcast(None, ClientboundPacket::ClientJoin { id, name }) .await; + // send join of all other clients to this one + for (&cid, c) in self.clients.read().await.iter() { + // skip self + if cid != id { + self.send_to_client( + id, + ClientboundPacket::ClientJoin { + id: cid, + name: c.name.clone(), + }, + ) + .await; + } + } + debug!("client should be ready!"); while let Some(result) = user_ws_rx.next().await { let msg = match result { @@ -64,16 +96,18 @@ impl Room { break; } }; + debug!("{id} <- {p:?}"); self.client_message(id, p).await; - }; + } } - self.clients.write().await.remove(&id); + self.broadcast(Some(id), ClientboundPacket::ClientLeave { id }) + .await; } - pub async fn broadcast(&self, sender: usize, packet: ClientboundPacket) { + pub async fn broadcast(&self, sender: Option<usize>, packet: ClientboundPacket) { for (&id, tx) in self.clients.read().await.iter() { - if sender != id { + if sender != Some(id) { let _ = tx.out.send(packet.clone()); } } |