From 3ce9a53e272dc556222bca747461b3ec24796912 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 7 Sep 2022 14:46:09 +0200 Subject: start with the new server --- server/src/main.rs | 123 +++++++++++++++++++------------------------------ server/src/protocol.rs | 36 +++++++++++++++ server/src/room.rs | 76 ++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 76 deletions(-) create mode 100644 server/src/protocol.rs create mode 100644 server/src/room.rs (limited to 'server/src') diff --git a/server/src/main.rs b/server/src/main.rs index 1128fb1..ff34ad0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,50 +1,48 @@ -use futures_util::{SinkExt, StreamExt, TryFutureExt}; +#![feature(async_closure)] + +pub mod protocol; +pub mod room; + +use chashmap::CHashMap; +use hyper::StatusCode; use listenfd::ListenFd; use log::error; -use std::collections::HashMap; +use room::Room; use std::convert::Infallible; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use tokio::sync::{mpsc, RwLock}; +use std::sync::Arc; use warp::hyper::Server; -use warp::ws::{Message, WebSocket}; -use warp::Filter; - -static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); +use warp::{Filter, Rejection, Reply}; -type Users = Arc>>>; +type Rooms = Arc>; #[tokio::main] async fn main() { env_logger::init_from_env("LOG"); - let users = Users::default(); - let users = warp::any().map(move || users.clone()); + let rooms = Rooms::default(); + let rooms = warp::any().map(move || rooms.clone()); let signaling = warp::path("signaling") + .and(warp::path::param::()) + .and(rooms) .and(warp::ws()) - .and(users) - .map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users))); + .map(signaling_connect); let static_routes = { - let index = warp::get() - .and(warp::path!()) - .and(warp::fs::file("../client-web/public/index.html")); + let index = warp::path::end().and(warp::fs::file("../client-web/public/start.html")); let assets = warp::path("_assets").and(warp::fs::dir("../client-web/public/assets")); - index + warp::get().and(index.or(assets)) }; - let routes = static_routes.or(signaling).or(four_oh_four_tm); + let routes = static_routes.or(signaling).recover(handle_rejection); // if listender fd is passed from the outside world, use it. let mut listenfd = ListenFd::from_env(); let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() { Server::from_tcp(l).unwrap() } else { - Server::bind(&([127, 0, 0, 1], 3030).into()) + Server::bind(&([127, 0, 0, 1], 8080).into()) }; let service = warp::service(routes); server @@ -56,63 +54,36 @@ async fn main() { .unwrap(); } -async fn user_connected(ws: WebSocket, users: Users) { - let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); - - eprintln!("new chat user: {}", my_id); - - let (mut user_ws_tx, mut user_ws_rx) = ws.split(); - - let (tx, mut rx) = mpsc::unbounded_channel(); - - tokio::task::spawn(async move { - while let Some(message) = rx.recv().await { - user_ws_tx - .send(message) - .unwrap_or_else(|e| { - eprintln!("websocket send error: {}", e); - }) - .await; - } - }); - - users.write().await.insert(my_id, tx); - - while let Some(result) = user_ws_rx.next().await { - let msg = match result { - Ok(msg) => msg, - Err(e) => { - error!("websocket error(uid={my_id}): {e}"); - break; - } - }; - user_message(my_id, msg, &users).await; - } - - users.write().await.remove(&my_id); -} - -async fn user_message(my_id: usize, msg: Message, users: &Users) { - // Skip any non-Text messages... - let msg = if let Ok(s) = msg.to_str() { - s +async fn handle_rejection(err: Rejection) -> Result { + let code = if err.is_not_found() { + StatusCode::NOT_FOUND + } else if let Some(_) = err.find::() { + StatusCode::BAD_REQUEST + } else if let Some(_) = err.find::() { + StatusCode::METHOD_NOT_ALLOWED } else { - return; + error!("unhandled rejection: {:?}", err); + StatusCode::INTERNAL_SERVER_ERROR }; + let json = warp::reply::html(format!( + "
{}
", + code.canonical_reason().unwrap_or("!?") + )); + Ok(warp::reply::with_status(json, code)) +} - let new_msg = format!(": {}", my_id, msg); - - for (&uid, tx) in users.read().await.iter() { - if my_id != uid { - if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { - // The tx is disconnected, our `user_disconnected` code - // should be happening in another task, nothing more to - // do here. +fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply { + ws.on_upgrade(async move |sock| { + let room = match rooms.get(&rname) { + Some(r) => r, + None => { + rooms.insert(rname.to_owned(), Room::default()); + rooms.get(&rname).unwrap() // TODO never expect this to always work!! } + }; + room.client_connect(sock).await; + if room.should_remove().await { + rooms.remove(&rname); } - } -} - -async fn user_disconnected(my_id: usize, users: &Users) { - eprintln!("good bye user: {}", my_id); + }) } diff --git a/server/src/protocol.rs b/server/src/protocol.rs new file mode 100644 index 0000000..d7e94d0 --- /dev/null +++ b/server/src/protocol.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ClientboundPacket {} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ServerboundPacket { + Answer { receiver: usize }, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RTCSdpType { + Answer, + Offer, + PRAnswer, + Rollback, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct RTCSessionDescriptionInit { + sdp: String, + #[serde(rename = "type")] + ty: RTCSdpType, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct RTCIceCandidateInit { + candidate: Option, + #[serde(rename = "sdpMLineIndex")] + sdp_mline_index: Option, + #[serde(rename = "sdpMid")] + sdp_mid: Option, + #[serde(rename = "usernameFragment")] + username_fragment: Option, +} diff --git a/server/src/room.rs b/server/src/room.rs new file mode 100644 index 0000000..14ff9b6 --- /dev/null +++ b/server/src/room.rs @@ -0,0 +1,76 @@ +use crate::protocol::{ClientboundPacket, ServerboundPacket}; +use futures_util::{SinkExt, StreamExt, TryFutureExt}; +use log::error; +use std::collections::HashMap; +use tokio::sync::{mpsc, RwLock}; +use warp::ws::{Message, WebSocket}; + +#[derive(Debug)] +pub struct Client { + out: mpsc::UnboundedSender, +} + +#[derive(Debug, Default)] +pub struct Room { + clients: RwLock>, +} + +impl Room { + pub async fn client_connect(&self, ws: WebSocket) { + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::task::spawn(async move { + while let Some(packet) = rx.recv().await { + user_ws_tx + .send(Message::text(serde_json::to_string(&packet).unwrap())) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + + let id = { + let mut g = self.clients.write().await; + let id = g.len(); + g.insert(id, Client { out: tx }); + id + }; + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + error!("websocket error(id={id}): {e}"); + break; + } + }; + if let Ok(s) = msg.to_str() { + let p = match serde_json::from_str::(s) { + Ok(p) => p, + Err(e) => { + error!("client(id={id}) sent invalid packet: {e:?}"); + break; + } + }; + self.client_message(id, p).await; + }; + } + + self.clients.write().await.remove(&id); + } + + pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) { + for (&id, tx) in self.clients.read().await.iter() { + if sender != id { + let _ = tx.out.send(todo!()); + } + } + } + + pub async fn should_remove(&self) -> bool { + self.clients.read().await.len() == 0 + } +} -- cgit v1.2.3-70-g09d2