summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/Cargo.lock136
-rw-r--r--server/Cargo.toml3
-rw-r--r--server/src/main.rs123
-rw-r--r--server/src/protocol.rs36
-rw-r--r--server/src/room.rs76
5 files changed, 291 insertions, 83 deletions
diff --git a/server/Cargo.lock b/server/Cargo.lock
index 5798db1..b1581bd 100644
--- a/server/Cargo.lock
+++ b/server/Cargo.lock
@@ -87,6 +87,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
+name = "chashmap"
+version = "2.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45"
+dependencies = [
+ "owning_ref",
+ "parking_lot 0.4.8",
+]
+
+[[package]]
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -163,6 +173,12 @@ dependencies = [
]
[[package]]
+name = "fuchsia-cprng"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
+
+[[package]]
name = "futures-channel"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -439,6 +455,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
+name = "maybe-uninit"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
+
+[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -484,7 +506,7 @@ dependencies = [
"mime",
"mime_guess",
"quick-error",
- "rand",
+ "rand 0.8.5",
"safemem",
"tempfile",
"twoway",
@@ -513,13 +535,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
+name = "owning_ref"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
+dependencies = [
+ "stable_deref_trait",
+]
+
+[[package]]
+name = "parking_lot"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e"
+dependencies = [
+ "owning_ref",
+ "parking_lot_core 0.2.14",
+]
+
+[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
- "parking_lot_core",
+ "parking_lot_core 0.9.3",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa"
+dependencies = [
+ "libc",
+ "rand 0.4.6",
+ "smallvec 0.6.14",
+ "winapi",
]
[[package]]
@@ -531,7 +584,7 @@ dependencies = [
"cfg-if",
"libc",
"redox_syscall",
- "smallvec",
+ "smallvec 1.9.0",
"windows-sys",
]
@@ -605,13 +658,26 @@ dependencies = [
[[package]]
name = "rand"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
+dependencies = [
+ "fuchsia-cprng",
+ "libc",
+ "rand_core 0.3.1",
+ "rdrand",
+ "winapi",
+]
+
+[[package]]
+name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
- "rand_core",
+ "rand_core 0.6.3",
]
[[package]]
@@ -621,11 +687,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
- "rand_core",
+ "rand_core 0.6.3",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
+dependencies = [
+ "rand_core 0.4.2",
]
[[package]]
name = "rand_core"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
+
+[[package]]
+name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
@@ -634,6 +715,15 @@ dependencies = [
]
[[package]]
+name = "rdrand"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -697,6 +787,20 @@ name = "serde"
version = "1.0.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.144"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
[[package]]
name = "serde_json"
@@ -725,11 +829,14 @@ dependencies = [
name = "server"
version = "0.1.0"
dependencies = [
+ "chashmap",
"env_logger",
"futures-util",
"hyper",
"listenfd",
"log",
+ "serde",
+ "serde_json",
"tokio",
"warp",
]
@@ -778,6 +885,15 @@ dependencies = [
[[package]]
name = "smallvec"
+version = "0.6.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0"
+dependencies = [
+ "maybe-uninit",
+]
+
+[[package]]
+name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
@@ -793,6 +909,12 @@ dependencies = [
]
[[package]]
+name = "stable_deref_trait"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
+
+[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -874,7 +996,7 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
- "parking_lot",
+ "parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -990,7 +1112,7 @@ dependencies = [
"http",
"httparse",
"log",
- "rand",
+ "rand 0.8.5",
"sha-1",
"thiserror",
"url",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index f9b4cd2..06e52fb 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -11,3 +11,6 @@ env_logger = "0.8"
futures-util = "0.3.24"
listenfd = "1.0.0"
hyper = "0.14.20"
+serde = { version = "1.0.144", features = ["derive"] }
+serde_json = "*"
+chashmap = "2.2.2"
diff --git a/server/src/main.rs b/server/src/main.rs
index 1128fb1..ff34ad0 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,50 +1,48 @@
-use futures_util::{SinkExt, StreamExt, TryFutureExt};
+#![feature(async_closure)]
+
+pub mod protocol;
+pub mod room;
+
+use chashmap::CHashMap;
+use hyper::StatusCode;
use listenfd::ListenFd;
use log::error;
-use std::collections::HashMap;
+use room::Room;
use std::convert::Infallible;
-use std::sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
-};
-use tokio::sync::{mpsc, RwLock};
+use std::sync::Arc;
use warp::hyper::Server;
-use warp::ws::{Message, WebSocket};
-use warp::Filter;
-
-static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
+use warp::{Filter, Rejection, Reply};
-type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
+type Rooms = Arc<CHashMap<String, Room>>;
#[tokio::main]
async fn main() {
env_logger::init_from_env("LOG");
- let users = Users::default();
- let users = warp::any().map(move || users.clone());
+ let rooms = Rooms::default();
+ let rooms = warp::any().map(move || rooms.clone());
let signaling = warp::path("signaling")
+ .and(warp::path::param::<String>())
+ .and(rooms)
.and(warp::ws())
- .and(users)
- .map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users)));
+ .map(signaling_connect);
let static_routes = {
- let index = warp::get()
- .and(warp::path!())
- .and(warp::fs::file("../client-web/public/index.html"));
+ let index = warp::path::end().and(warp::fs::file("../client-web/public/start.html"));
let assets = warp::path("_assets").and(warp::fs::dir("../client-web/public/assets"));
- index
+ warp::get().and(index.or(assets))
};
- let routes = static_routes.or(signaling).or(four_oh_four_tm);
+ let routes = static_routes.or(signaling).recover(handle_rejection);
// if listender fd is passed from the outside world, use it.
let mut listenfd = ListenFd::from_env();
let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() {
Server::from_tcp(l).unwrap()
} else {
- Server::bind(&([127, 0, 0, 1], 3030).into())
+ Server::bind(&([127, 0, 0, 1], 8080).into())
};
let service = warp::service(routes);
server
@@ -56,63 +54,36 @@ async fn main() {
.unwrap();
}
-async fn user_connected(ws: WebSocket, users: Users) {
- let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
-
- eprintln!("new chat user: {}", my_id);
-
- let (mut user_ws_tx, mut user_ws_rx) = ws.split();
-
- let (tx, mut rx) = mpsc::unbounded_channel();
-
- tokio::task::spawn(async move {
- while let Some(message) = rx.recv().await {
- user_ws_tx
- .send(message)
- .unwrap_or_else(|e| {
- eprintln!("websocket send error: {}", e);
- })
- .await;
- }
- });
-
- users.write().await.insert(my_id, tx);
-
- while let Some(result) = user_ws_rx.next().await {
- let msg = match result {
- Ok(msg) => msg,
- Err(e) => {
- error!("websocket error(uid={my_id}): {e}");
- break;
- }
- };
- user_message(my_id, msg, &users).await;
- }
-
- users.write().await.remove(&my_id);
-}
-
-async fn user_message(my_id: usize, msg: Message, users: &Users) {
- // Skip any non-Text messages...
- let msg = if let Ok(s) = msg.to_str() {
- s
+async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
+ let code = if err.is_not_found() {
+ StatusCode::NOT_FOUND
+ } else if let Some(_) = err.find::<warp::filters::body::BodyDeserializeError>() {
+ StatusCode::BAD_REQUEST
+ } else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
+ StatusCode::METHOD_NOT_ALLOWED
} else {
- return;
+ error!("unhandled rejection: {:?}", err);
+ StatusCode::INTERNAL_SERVER_ERROR
};
+ let json = warp::reply::html(format!(
+ "<!DOCTYPE html><html><head></head><body><pre>{}</pre></body></html>",
+ code.canonical_reason().unwrap_or("!?")
+ ));
+ Ok(warp::reply::with_status(json, code))
+}
- let new_msg = format!("<User#{}>: {}", my_id, msg);
-
- for (&uid, tx) in users.read().await.iter() {
- if my_id != uid {
- if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
- // The tx is disconnected, our `user_disconnected` code
- // should be happening in another task, nothing more to
- // do here.
+fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
+ ws.on_upgrade(async move |sock| {
+ let room = match rooms.get(&rname) {
+ Some(r) => r,
+ None => {
+ rooms.insert(rname.to_owned(), Room::default());
+ rooms.get(&rname).unwrap() // TODO never expect this to always work!!
}
+ };
+ room.client_connect(sock).await;
+ if room.should_remove().await {
+ rooms.remove(&rname);
}
- }
-}
-
-async fn user_disconnected(my_id: usize, users: &Users) {
- eprintln!("good bye user: {}", my_id);
+ })
}
diff --git a/server/src/protocol.rs b/server/src/protocol.rs
new file mode 100644
index 0000000..d7e94d0
--- /dev/null
+++ b/server/src/protocol.rs
@@ -0,0 +1,36 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ClientboundPacket {}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ServerboundPacket {
+ Answer { receiver: usize },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum RTCSdpType {
+ Answer,
+ Offer,
+ PRAnswer,
+ Rollback,
+}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RTCSessionDescriptionInit {
+ sdp: String,
+ #[serde(rename = "type")]
+ ty: RTCSdpType,
+}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RTCIceCandidateInit {
+ candidate: Option<String>,
+ #[serde(rename = "sdpMLineIndex")]
+ sdp_mline_index: Option<i32>,
+ #[serde(rename = "sdpMid")]
+ sdp_mid: Option<String>,
+ #[serde(rename = "usernameFragment")]
+ username_fragment: Option<String>,
+}
diff --git a/server/src/room.rs b/server/src/room.rs
new file mode 100644
index 0000000..14ff9b6
--- /dev/null
+++ b/server/src/room.rs
@@ -0,0 +1,76 @@
+use crate::protocol::{ClientboundPacket, ServerboundPacket};
+use futures_util::{SinkExt, StreamExt, TryFutureExt};
+use log::error;
+use std::collections::HashMap;
+use tokio::sync::{mpsc, RwLock};
+use warp::ws::{Message, WebSocket};
+
+#[derive(Debug)]
+pub struct Client {
+ out: mpsc::UnboundedSender<ClientboundPacket>,
+}
+
+#[derive(Debug, Default)]
+pub struct Room {
+ clients: RwLock<HashMap<usize, Client>>,
+}
+
+impl Room {
+ pub async fn client_connect(&self, ws: WebSocket) {
+ let (mut user_ws_tx, mut user_ws_rx) = ws.split();
+
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ tokio::task::spawn(async move {
+ while let Some(packet) = rx.recv().await {
+ user_ws_tx
+ .send(Message::text(serde_json::to_string(&packet).unwrap()))
+ .unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ })
+ .await;
+ }
+ });
+
+ let id = {
+ let mut g = self.clients.write().await;
+ let id = g.len();
+ g.insert(id, Client { out: tx });
+ id
+ };
+
+ while let Some(result) = user_ws_rx.next().await {
+ let msg = match result {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("websocket error(id={id}): {e}");
+ break;
+ }
+ };
+ if let Ok(s) = msg.to_str() {
+ let p = match serde_json::from_str::<ServerboundPacket>(s) {
+ Ok(p) => p,
+ Err(e) => {
+ error!("client(id={id}) sent invalid packet: {e:?}");
+ break;
+ }
+ };
+ self.client_message(id, p).await;
+ };
+ }
+
+ self.clients.write().await.remove(&id);
+ }
+
+ pub async fn client_message(&self, sender: usize, packet: ServerboundPacket) {
+ for (&id, tx) in self.clients.read().await.iter() {
+ if sender != id {
+ let _ = tx.out.send(todo!());
+ }
+ }
+ }
+
+ pub async fn should_remove(&self) -> bool {
+ self.clients.read().await.len() == 0
+ }
+}