diff options
-rw-r--r-- | client-web/source/local_user.ts | 4 | ||||
-rw-r--r-- | client-web/source/remote_user.ts | 12 | ||||
-rw-r--r-- | client-web/source/room.ts | 58 | ||||
-rw-r--r-- | client-web/source/user.ts | 8 | ||||
-rw-r--r-- | common/packets.d.ts | 24 | ||||
-rw-r--r-- | readme.md | 18 | ||||
-rw-r--r-- | server/src/main.rs | 8 | ||||
-rw-r--r-- | server/src/protocol.rs | 43 | ||||
-rw-r--r-- | server/src/room.rs | 45 |
9 files changed, 145 insertions, 75 deletions
diff --git a/client-web/source/local_user.ts b/client-web/source/local_user.ts index ca91a19..bfd9459 100644 --- a/client-web/source/local_user.ts +++ b/client-web/source/local_user.ts @@ -13,8 +13,8 @@ export class LocalUser extends User { mic_gain?: GainNode default_gain: number = parameter_number("mic_gain", 1) - constructor(room: Room, name: string) { - super(room, name) + constructor(room: Room, id: number, name: string) { + super(room, id, name) this.el.classList.add("local") this.local = true this.create_controls() diff --git a/client-web/source/remote_user.ts b/client-web/source/remote_user.ts index a0fdeaf..6cc57a5 100644 --- a/client-web/source/remote_user.ts +++ b/client-web/source/remote_user.ts @@ -10,13 +10,13 @@ export class RemoteUser extends User { peer: RTCPeerConnection negotiation_busy = false - constructor(room: Room, name: string) { - super(room, name) - log("usermodel", `added remote user: ${name}`) + constructor(room: Room, id: number, name: string) { + super(room, id, name) + log("usermodel", `added remote user: ${id} ${JSON.stringify(name)}`) this.peer = new RTCPeerConnection(servers) this.peer.onicecandidate = ev => { if (!ev.candidate) return - room.websocket_send({ ice_candiate: ev.candidate.toJSON(), receiver: this.name }) + room.websocket_send({ relay: { recipient: this.id, message: { ice_candidate: ev.candidate.toJSON() } } }) } this.peer.ontrack = ev => { const t = ev.track @@ -38,7 +38,7 @@ export class RemoteUser extends User { await this.peer.setLocalDescription(offer_description) const offer = { type: offer_description.type, sdp: offer_description.sdp } log("webrtc", `sent offer: ${this.name}`, { a: offer }) - this.room.websocket_send({ receiver: this.name, offer }) + this.room.websocket_send({ relay: { recipient: this.id, message: { offer } } }) } async on_offer(offer: RTCSessionDescriptionInit) { this.negotiation_busy = true @@ -52,7 +52,7 @@ export class RemoteUser extends User { await this.peer.setLocalDescription(answer_description) const answer = { type: answer_description.type, sdp: answer_description.sdp } log("webrtc", `sent answer: ${this.name}`, { a: answer }) - this.room.websocket_send({ receiver: this.name, answer }) + this.room.websocket_send({ relay: { recipient: this.id, message: { answer } } }) this.negotiation_busy = false } async on_answer(answer: RTCSessionDescriptionInit) { diff --git a/client-web/source/room.ts b/client-web/source/room.ts index f22cff1..3ed7ddf 100644 --- a/client-web/source/room.ts +++ b/client-web/source/room.ts @@ -5,15 +5,15 @@ import { RemoteUser } from "./remote_user.ts"; import { User } from "./user.ts"; import { LocalUser } from "./local_user.ts"; import { hex_id, parameter_string } from "./helper.ts"; -import { PacketS, PacketC } from "../../common/packets.d.ts"; +import { ServerboundPacket, ClientboundPacket } from "../../common/packets.d.ts"; export class Room { el: HTMLElement name: string - users: Map<string, User> = new Map() - remote_users: Map<string, RemoteUser> = new Map() - local_user: LocalUser + users: Map<number, User> = new Map() + remote_users: Map<number, RemoteUser> = new Map() + local_user!: LocalUser websocket: WebSocket constructor(name: string) { @@ -26,37 +26,41 @@ export class Room { this.websocket.onmessage = (ev) => { this.websocket_message(JSON.parse(ev.data)) } - this.local_user = new LocalUser(this, parameter_string("username", `guest-${hex_id()}`)) } - websocket_send(data: PacketS) { - log("ws", `-> ${data.receiver ?? "*"}`, data) + websocket_send(data: ServerboundPacket) { + log("ws", `-> ${data.relay?.recipient ?? "*"}`, data) this.websocket.send(JSON.stringify(data)) } - websocket_message(packet: PacketC) { - if (packet.join) { - log("*", `${this.name} ${packet.sender} joined`); - const ru = new RemoteUser(this, packet.sender) + websocket_message(packet: ClientboundPacket) { + log("ws", `<- ${packet.message?.sender ?? "control packet"}: `, packet); + if (packet.init) { + this.local_user = new LocalUser(this, packet.init.your_id, "..."); + } + if (packet.client_join) { + const p = packet.client_join + log("*", `${this.name} ${p.id} joined`); + const ru = new RemoteUser(this, p.id, p.name) this.local_user.add_initial_to_remote(ru) - if (!packet.stable) ru.offer() - this.users.set(packet.sender, ru) - this.remote_users.set(packet.sender, ru) + ru.offer() + this.users.set(p.id, ru) + this.remote_users.set(p.id, ru) return - } - const sender = this.remote_users.get(packet.sender) - if (!sender) return console.warn(`unknown sender ${packet.sender}`) - if (packet.leave) { - log("*", `${this.name} ${packet.sender} left`); - sender.leave() - this.users.delete(packet.sender) - this.remote_users.delete(packet.sender) + } else if (packet.client_leave) { + const p = packet.client_leave; + log("*", `${this.name} ${p.id} left`); + this.remote_users.get(p.id)!.leave() + this.users.delete(p.id) + this.remote_users.delete(p.id) return } - if (!packet.data) return console.warn("dataless packet") - log("ws", `<- ${packet.sender}: `, packet.data); - if (packet.data.ice_candiate) sender.add_ice_candidate(packet.data.ice_candiate) - if (packet.data.offer) sender.on_offer(packet.data.offer) - if (packet.data.answer) sender.on_answer(packet.data.answer) + if (packet.message) { + const p = packet.message; + const sender = this.remote_users.get(p.sender)! + if (p.message.ice_candidate) sender.add_ice_candidate(p.message.ice_candidate) + if (p.message.offer) sender.on_offer(p.message.offer) + if (p.message.answer) sender.on_answer(p.message.answer) + } } websocket_close() { log("ws", "websocket closed"); diff --git a/client-web/source/user.ts b/client-web/source/user.ts index bda875f..dbf2862 100644 --- a/client-web/source/user.ts +++ b/client-web/source/user.ts @@ -6,18 +6,13 @@ import { TrackHandle } from "./track_handle.ts"; export abstract class User { - name: string - room: Room - el: HTMLElement local = false protected tracks: Set<TrackHandle> = new Set() - constructor(room: Room, name: string) { - this.name = name - this.room = room + constructor(public room: Room, public id: number, public name: string) { this.el = document.createElement("div") this.el.classList.add("user") this.room.el.append(this.el) @@ -60,7 +55,6 @@ export abstract class User { if (this.local) media_el.muted = true - const el = document.createElement("div") if (t.local) { const end_button = document.createElement("button") diff --git a/common/packets.d.ts b/common/packets.d.ts index 776f12f..abfb7dc 100644 --- a/common/packets.d.ts +++ b/common/packets.d.ts @@ -4,17 +4,19 @@ type F_RTCSdpType = "answer" | "offer" | "pranswer" | "rollback"; interface F_RTCSessionDescriptionInit { sdp?: string; type: F_RTCSdpType; } interface F_RTCIceCandidateInit { candidate?: string; sdpMLineIndex?: number | null; sdpMid?: string | null; usernameFragment?: string | null; } -export interface PacketC { - sender: string, - data?: PacketS, - join?: boolean, // user just joined - leave?: boolean, // user left - stable?: boolean // user "joined" because you joined aka. user was already there +export interface ClientboundPacket { + init?: { your_id: number, version: string }, + client_join?: { id: number, name: string }, + client_leave?: { id: number }, + message?: { sender: number, message: RelayMessage }, } -export interface PacketS { - receiver?: string - ice_candidate?: F_RTCIceCandidateInit - offer?: F_RTCSessionDescriptionInit - answer?: F_RTCSessionDescriptionInit + +export interface ServerboundPacket { + relay?: { recipient?: number, message: RelayMessage }, } +export interface RelayMessage { + offer?: F_RTCSessionDescriptionInit, + answer?: F_RTCSessionDescriptionInit, + ice_candidate?: F_RTCIceCandidateInit, +} @@ -1,6 +1,22 @@ # keks-meet -a web conference application +a web conferencing application + +## Features + +- Rooms +- Different stream types + - Camera + - Microphone + - Screen capture +- Multiple streams + +## Todo-List + +- Chat +- Optionally enable video streams +- Settings menu +- Native client ## Parameters diff --git a/server/src/main.rs b/server/src/main.rs index ff34ad0..b4121b9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,5 +1,3 @@ -#![feature(async_closure)] - pub mod protocol; pub mod room; @@ -11,6 +9,7 @@ use room::Room; use std::convert::Infallible; use std::sync::Arc; use warp::hyper::Server; +use warp::ws::WebSocket; use warp::{Filter, Rejection, Reply}; type Rooms = Arc<CHashMap<String, Room>>; @@ -73,7 +72,7 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> { } fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply { - ws.on_upgrade(async move |sock| { + async fn inner(sock: WebSocket, rname: String, rooms: Rooms) { let room = match rooms.get(&rname) { Some(r) => r, None => { @@ -85,5 +84,6 @@ fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Repl if room.should_remove().await { rooms.remove(&rname); } - }) + } + ws.on_upgrade(move |sock| inner(sock, rname, rooms)) } diff --git a/server/src/protocol.rs b/server/src/protocol.rs index d7e94d0..780ae4a 100644 --- a/server/src/protocol.rs +++ b/server/src/protocol.rs @@ -1,16 +1,43 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum ClientboundPacket {} +pub enum ClientboundPacket { + Init { + your_id: usize, + version: String, + }, + ClientJoin { + id: usize, + name: String, + }, + ClientLeave { + id: usize, + }, + Message { + sender: usize, + message: RelayMessage, + }, +} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ServerboundPacket { - Answer { receiver: usize }, + Relay { + recipient: Option<usize>, + message: RelayMessage, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RelayMessage { + Offer(RTCSessionDescriptionInit), + Answer(RTCSessionDescriptionInit), + IceCandidate(RTCIceCandidateInit), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum RTCSdpType { Answer, @@ -18,13 +45,15 @@ pub enum RTCSdpType { PRAnswer, Rollback, } -#[derive(Debug, Serialize, Deserialize)] + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RTCSessionDescriptionInit { sdp: String, #[serde(rename = "type")] ty: RTCSdpType, } -#[derive(Debug, Serialize, Deserialize)] + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RTCIceCandidateInit { candidate: Option<String>, #[serde(rename = "sdpMLineIndex")] diff --git a/server/src/room.rs b/server/src/room.rs index 14ff9b6..43cfa90 100644 --- a/server/src/room.rs +++ b/server/src/room.rs @@ -7,12 +7,13 @@ use warp::ws::{Message, WebSocket}; #[derive(Debug)] pub struct Client { - out: mpsc::UnboundedSender<ClientboundPacket>, + pub name: String, + pub out: mpsc::UnboundedSender<ClientboundPacket>, } #[derive(Debug, Default)] pub struct Room { - clients: RwLock<HashMap<usize, Client>>, + pub clients: RwLock<HashMap<usize, Client>>, } impl Room { @@ -32,12 +33,20 @@ impl Room { } }); - let id = { - let mut g = self.clients.write().await; - let id = g.len(); - g.insert(id, Client { out: tx }); - id - }; + let mut g = self.clients.write().await; + let id = g.len(); + let name = format!("user no. {id}"); + g.insert( + id, + Client { + out: tx, + name: name.clone(), + }, + ); + drop(g); + + self.broadcast(id, ClientboundPacket::ClientJoin { id, name }) + .await; while let Some(result) = user_ws_rx.next().await { let msg = match result { @@ -62,10 +71,26 @@ impl Room { self.clients.write().await.remove(&id); } - pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) { + pub async fn broadcast(&self, sender: usize, packet: ClientboundPacket) { for (&id, tx) in self.clients.read().await.iter() { if sender != id { - let _ = tx.out.send(todo!()); + let _ = tx.out.send(packet.clone()); + } + } + } + pub async fn send_to_client(&self, recipient: usize, packet: ClientboundPacket) { + if let Some(c) = self.clients.read().await.get(&recipient) { + let _ = c.out.send(packet); + } + } + + pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) { + match packet { + ServerboundPacket::Relay { recipient, message } => { + if let Some(recipient) = recipient { + self.send_to_client(recipient, ClientboundPacket::Message { sender, message }) + .await; + } } } } |