summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-08 14:51:54 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-08 14:51:54 +0200
commit2cac71d01b23c2566b797160fd70f254c58550f3 (patch)
tree524150959b6e7cc0edf361cfa4fb77a1857a35dc
parentcd255848196da0d8732d31049cc0f98388205a30 (diff)
downloadkeks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar
keks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar.bz2
keks-meet-2cac71d01b23c2566b797160fd70f254c58550f3.tar.zst
server rewrite works
-rw-r--r--client-web/source/local_user.ts1
-rw-r--r--client-web/source/rnnoise.ts2
-rw-r--r--client-web/source/room.ts38
-rw-r--r--readme.md20
-rw-r--r--server/Cargo.lock120
-rw-r--r--server/Cargo.toml1
-rw-r--r--server/src/main.rs19
-rw-r--r--server/src/room.rs66
8 files changed, 113 insertions, 154 deletions
diff --git a/client-web/source/local_user.ts b/client-web/source/local_user.ts
index bfd9459..b3d5692 100644
--- a/client-web/source/local_user.ts
+++ b/client-web/source/local_user.ts
@@ -8,7 +8,6 @@ import { Room } from "./room.ts";
import { TrackHandle } from "./track_handle.ts";
import { User } from "./user.ts";
-
export class LocalUser extends User {
mic_gain?: GainNode
default_gain: number = parameter_number("mic_gain", 1)
diff --git a/client-web/source/rnnoise.ts b/client-web/source/rnnoise.ts
index 7867682..d6efb3b 100644
--- a/client-web/source/rnnoise.ts
+++ b/client-web/source/rnnoise.ts
@@ -23,7 +23,7 @@ export async function get_rnnoise_node(context: AudioContext): Promise<RNNoiseNo
if (!RNNoiseNode) {
log("rnnoise", "loading wasm...")
script = document.createElement("script")
- script.src = "/_rnnoise/rnnoise-runtime.js"
+ script.src = "/_assets/rnnoise/rnnoise-runtime.js"
script.defer = true
document.head.appendChild(script)
//@ts-ignore asdfsfad
diff --git a/client-web/source/room.ts b/client-web/source/room.ts
index af85044..c22a956 100644
--- a/client-web/source/room.ts
+++ b/client-web/source/room.ts
@@ -13,6 +13,7 @@ export class Room {
users: Map<number, User> = new Map()
remote_users: Map<number, RemoteUser> = new Map()
local_user!: LocalUser
+ my_id!: number
websocket: WebSocket
constructor(name: string) {
@@ -34,17 +35,21 @@ export class Room {
websocket_message(packet: ClientboundPacket) {
log("ws", `<- ${packet.message?.sender ?? "control packet"}: `, packet);
if (packet.init) {
- this.local_user = new LocalUser(this, packet.init.your_id, "...");
- }
- if (packet.client_join) {
+ this.my_id = packet.init.your_id
+ // no need to check compat for now because this is hosted in the same place
+ log("*", `server: ${packet.init.version}`)
+ } else if (packet.client_join) {
const p = packet.client_join
log("*", `${this.name} ${p.id} joined`);
- const ru = new RemoteUser(this, p.id, p.name)
- this.local_user.add_initial_to_remote(ru)
- ru.offer()
- this.users.set(p.id, ru)
- this.remote_users.set(p.id, ru)
- return
+ if (p.id == this.my_id) {
+ this.local_user = new LocalUser(this, p.id, p.name);
+ } else {
+ const ru = new RemoteUser(this, p.id, p.name)
+ this.local_user.add_initial_to_remote(ru)
+ ru.offer()
+ this.users.set(p.id, ru)
+ this.remote_users.set(p.id, ru)
+ }
} else if (packet.client_leave) {
const p = packet.client_leave;
log("*", `${this.name} ${p.id} left`);
@@ -52,13 +57,16 @@ export class Room {
this.users.delete(p.id)
this.remote_users.delete(p.id)
return
- }
- if (packet.message) {
+ } else if (packet.message) {
const p = packet.message;
- const sender = this.remote_users.get(p.sender)!
- if (p.message.ice_candidate) sender.add_ice_candidate(p.message.ice_candidate)
- if (p.message.offer) sender.on_offer(p.message.offer)
- if (p.message.answer) sender.on_answer(p.message.answer)
+ const sender = this.users.get(p.sender)
+ if (sender instanceof RemoteUser) {
+ if (p.message.ice_candidate) sender.add_ice_candidate(p.message.ice_candidate)
+ if (p.message.offer) sender.on_offer(p.message.offer)
+ if (p.message.answer) sender.on_answer(p.message.answer)
+ } else {
+ console.log("!", p, sender);
+ }
}
}
websocket_close() {
diff --git a/readme.md b/readme.md
index fd6ce47..28729cf 100644
--- a/readme.md
+++ b/readme.md
@@ -34,6 +34,26 @@ Booleans can be either `1`, `true`, `yes` or their opposites.
| `screen_enabled` | boolean | false | Adds screen track on startup (wont work) |
| `mic_gain` | number | 1 | Sets the microphone volume |
+## Protocol
+
+The protocol packets are defined in [packets.d.ts](./common/packets.d.ts). Here is an (simplified) example of how the protocol is used.
+
+```
+S->C { init: { your_id: 5, version: "..." } }
+---- # Your join packet will be the first one.
+S->C { client_join: { id: 5, name: "bob" } }
+S->C { client_join: { id: 3, name: "alice" } }
+---- # Now publish your ICE candidates
+C->S { relay: { message: { ice_candiate: <RTCIceCandidateInit> } } }
+---- # Whenever you change your streams change:
+---- # Send an offer to everybody
+C->S { relay: { recipient: 3, offer: <RTCSessionDescriptionInit> } }
+---- # Alice answers
+S->C { message: { sender: 3, message: { offer: <RTCSessionDescriptionInit> } } }
+---- # In case the server uses a reverse-proxy that disconnects inactive connections: Ping every 30s
+C->S { ping: null }
+```
+
## Licence
See `LICENCE` file.
diff --git a/server/Cargo.lock b/server/Cargo.lock
index b1581bd..208e7f8 100644
--- a/server/Cargo.lock
+++ b/server/Cargo.lock
@@ -87,16 +87,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
-name = "chashmap"
-version = "2.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45"
-dependencies = [
- "owning_ref",
- "parking_lot 0.4.8",
-]
-
-[[package]]
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -173,12 +163,6 @@ dependencies = [
]
[[package]]
-name = "fuchsia-cprng"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
-
-[[package]]
name = "futures-channel"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -455,12 +439,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
-name = "maybe-uninit"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
-
-[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -506,7 +484,7 @@ dependencies = [
"mime",
"mime_guess",
"quick-error",
- "rand 0.8.5",
+ "rand",
"safemem",
"tempfile",
"twoway",
@@ -535,44 +513,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
-name = "owning_ref"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
-dependencies = [
- "stable_deref_trait",
-]
-
-[[package]]
-name = "parking_lot"
-version = "0.4.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e"
-dependencies = [
- "owning_ref",
- "parking_lot_core 0.2.14",
-]
-
-[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
- "parking_lot_core 0.9.3",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.2.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa"
-dependencies = [
- "libc",
- "rand 0.4.6",
- "smallvec 0.6.14",
- "winapi",
+ "parking_lot_core",
]
[[package]]
@@ -584,7 +531,7 @@ dependencies = [
"cfg-if",
"libc",
"redox_syscall",
- "smallvec 1.9.0",
+ "smallvec",
"windows-sys",
]
@@ -658,26 +605,13 @@ dependencies = [
[[package]]
name = "rand"
-version = "0.4.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
-dependencies = [
- "fuchsia-cprng",
- "libc",
- "rand_core 0.3.1",
- "rdrand",
- "winapi",
-]
-
-[[package]]
-name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
- "rand_core 0.6.3",
+ "rand_core",
]
[[package]]
@@ -687,26 +621,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
- "rand_core 0.6.3",
+ "rand_core",
]
[[package]]
name = "rand_core"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
-dependencies = [
- "rand_core 0.4.2",
-]
-
-[[package]]
-name = "rand_core"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
-
-[[package]]
-name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
@@ -715,15 +634,6 @@ dependencies = [
]
[[package]]
-name = "rdrand"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
-dependencies = [
- "rand_core 0.3.1",
-]
-
-[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -829,7 +739,6 @@ dependencies = [
name = "server"
version = "0.1.0"
dependencies = [
- "chashmap",
"env_logger",
"futures-util",
"hyper",
@@ -885,15 +794,6 @@ dependencies = [
[[package]]
name = "smallvec"
-version = "0.6.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0"
-dependencies = [
- "maybe-uninit",
-]
-
-[[package]]
-name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
@@ -909,12 +809,6 @@ dependencies = [
]
[[package]]
-name = "stable_deref_trait"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
-
-[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -996,7 +890,7 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
- "parking_lot 0.12.1",
+ "parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -1112,7 +1006,7 @@ dependencies = [
"http",
"httparse",
"log",
- "rand 0.8.5",
+ "rand",
"sha-1",
"thiserror",
"url",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 06e52fb..7d32688 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -13,4 +13,3 @@ listenfd = "1.0.0"
hyper = "0.14.20"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "*"
-chashmap = "2.2.2"
diff --git a/server/src/main.rs b/server/src/main.rs
index 6a8f11d..268e2a4 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,18 +1,19 @@
pub mod protocol;
pub mod room;
-use chashmap::CHashMap;
use hyper::StatusCode;
use listenfd::ListenFd;
use log::error;
use room::Room;
+use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
+use tokio::sync::RwLock;
use warp::hyper::Server;
use warp::ws::WebSocket;
use warp::{Filter, Rejection, Reply};
-type Rooms = Arc<CHashMap<String, Arc<Room>>>;
+type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>;
fn main() {
tokio::runtime::Builder::new_multi_thread()
@@ -82,16 +83,20 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
async fn inner(sock: WebSocket, rname: String, rooms: Rooms) {
- let room = match rooms.get(&rname) {
- Some(r) => r,
+ let guard = rooms.read().await;
+ let room = match guard.get(&rname) {
+ Some(r) => r.to_owned(),
None => {
- rooms.insert(rname.to_owned(), Default::default());
- rooms.get(&rname).unwrap() // TODO never expect this to always work!!
+ drop(guard); // make sure read-lock is dropped to avoid deadlock
+ let mut guard = rooms.write().await;
+ guard.insert(rname.to_owned(), Default::default());
+ guard.get(&rname).unwrap().to_owned() // TODO never expect this to always work!!
}
};
+
room.client_connect(sock).await;
if room.should_remove().await {
- rooms.remove(&rname);
+ rooms.write().await.remove(&rname);
}
}
ws.on_upgrade(move |sock| inner(sock, rname, rooms))
diff --git a/server/src/room.rs b/server/src/room.rs
index a47d2e5..db545a6 100644
--- a/server/src/room.rs
+++ b/server/src/room.rs
@@ -1,7 +1,7 @@
use crate::protocol::{ClientboundPacket, ServerboundPacket};
use futures_util::{SinkExt, StreamExt, TryFutureExt};
-use log::error;
-use std::collections::HashMap;
+use log::{debug, error};
+use std::{collections::HashMap, sync::atomic::AtomicUsize};
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message, WebSocket};
@@ -13,17 +13,36 @@ pub struct Client {
#[derive(Debug, Default)]
pub struct Room {
+ pub id_counter: AtomicUsize,
pub clients: RwLock<HashMap<usize, Client>>,
}
impl Room {
pub async fn client_connect(&self, ws: WebSocket) {
+ debug!("new client connected");
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
let (tx, mut rx) = mpsc::unbounded_channel();
+ let mut g = self.clients.write().await;
+ // ensure write guard to client exists when using id_counter
+ let id = self
+ .id_counter
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let name = format!("user no. {id}");
+ g.insert(
+ id,
+ Client {
+ out: tx,
+ name: name.clone(),
+ },
+ );
+ drop(g);
+ debug!("assigned id={id}, init connection");
+
tokio::task::spawn(async move {
while let Some(packet) = rx.recv().await {
+ debug!("{id} -> {packet:?}");
user_ws_tx
.send(Message::text(serde_json::to_string(&packet).unwrap()))
.unwrap_or_else(|e| {
@@ -33,20 +52,33 @@ impl Room {
}
});
- let mut g = self.clients.write().await;
- let id = g.len();
- let name = format!("user no. {id}");
- g.insert(
+ self.send_to_client(
id,
- Client {
- out: tx,
- name: name.clone(),
+ ClientboundPacket::Init {
+ your_id: id,
+ version: format!("keks-meet {}", env!("CARGO_PKG_VERSION")),
},
- );
- drop(g);
+ )
+ .await;
- self.broadcast(id, ClientboundPacket::ClientJoin { id, name })
+ // send join of this client to all clients
+ self.broadcast(None, ClientboundPacket::ClientJoin { id, name })
.await;
+ // send join of all other clients to this one
+ for (&cid, c) in self.clients.read().await.iter() {
+ // skip self
+ if cid != id {
+ self.send_to_client(
+ id,
+ ClientboundPacket::ClientJoin {
+ id: cid,
+ name: c.name.clone(),
+ },
+ )
+ .await;
+ }
+ }
+ debug!("client should be ready!");
while let Some(result) = user_ws_rx.next().await {
let msg = match result {
@@ -64,16 +96,18 @@ impl Room {
break;
}
};
+ debug!("{id} <- {p:?}");
self.client_message(id, p).await;
- };
+ }
}
-
self.clients.write().await.remove(&id);
+ self.broadcast(Some(id), ClientboundPacket::ClientLeave { id })
+ .await;
}
- pub async fn broadcast(&self, sender: usize, packet: ClientboundPacket) {
+ pub async fn broadcast(&self, sender: Option<usize>, packet: ClientboundPacket) {
for (&id, tx) in self.clients.read().await.iter() {
- if sender != id {
+ if sender != Some(id) {
let _ = tx.out.send(packet.clone());
}
}