summaryrefslogtreecommitdiff
path: root/server/src/main.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-07 14:46:09 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-07 14:46:09 +0200
commit3ce9a53e272dc556222bca747461b3ec24796912 (patch)
tree3f0f780801cbb49ef797c361cf67b4709bacac42 /server/src/main.rs
parent61950198e3bf06555f48e8f51c882a4c3cce5128 (diff)
downloadkeks-meet-3ce9a53e272dc556222bca747461b3ec24796912.tar
keks-meet-3ce9a53e272dc556222bca747461b3ec24796912.tar.bz2
keks-meet-3ce9a53e272dc556222bca747461b3ec24796912.tar.zst
start with the new server
Diffstat (limited to 'server/src/main.rs')
-rw-r--r--server/src/main.rs123
1 files changed, 47 insertions, 76 deletions
diff --git a/server/src/main.rs b/server/src/main.rs
index 1128fb1..ff34ad0 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,50 +1,48 @@
-use futures_util::{SinkExt, StreamExt, TryFutureExt};
+#![feature(async_closure)]
+
+pub mod protocol;
+pub mod room;
+
+use chashmap::CHashMap;
+use hyper::StatusCode;
use listenfd::ListenFd;
use log::error;
-use std::collections::HashMap;
+use room::Room;
use std::convert::Infallible;
-use std::sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
-};
-use tokio::sync::{mpsc, RwLock};
+use std::sync::Arc;
use warp::hyper::Server;
-use warp::ws::{Message, WebSocket};
-use warp::Filter;
-
-static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
+use warp::{Filter, Rejection, Reply};
-type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
+type Rooms = Arc<CHashMap<String, Room>>;
#[tokio::main]
async fn main() {
env_logger::init_from_env("LOG");
- let users = Users::default();
- let users = warp::any().map(move || users.clone());
+ let rooms = Rooms::default();
+ let rooms = warp::any().map(move || rooms.clone());
let signaling = warp::path("signaling")
+ .and(warp::path::param::<String>())
+ .and(rooms)
.and(warp::ws())
- .and(users)
- .map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users)));
+ .map(signaling_connect);
let static_routes = {
- let index = warp::get()
- .and(warp::path!())
- .and(warp::fs::file("../client-web/public/index.html"));
+ let index = warp::path::end().and(warp::fs::file("../client-web/public/start.html"));
let assets = warp::path("_assets").and(warp::fs::dir("../client-web/public/assets"));
- index
+ warp::get().and(index.or(assets))
};
- let routes = static_routes.or(signaling).or(four_oh_four_tm);
+ let routes = static_routes.or(signaling).recover(handle_rejection);
// if listender fd is passed from the outside world, use it.
let mut listenfd = ListenFd::from_env();
let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() {
Server::from_tcp(l).unwrap()
} else {
- Server::bind(&([127, 0, 0, 1], 3030).into())
+ Server::bind(&([127, 0, 0, 1], 8080).into())
};
let service = warp::service(routes);
server
@@ -56,63 +54,36 @@ async fn main() {
.unwrap();
}
-async fn user_connected(ws: WebSocket, users: Users) {
- let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
-
- eprintln!("new chat user: {}", my_id);
-
- let (mut user_ws_tx, mut user_ws_rx) = ws.split();
-
- let (tx, mut rx) = mpsc::unbounded_channel();
-
- tokio::task::spawn(async move {
- while let Some(message) = rx.recv().await {
- user_ws_tx
- .send(message)
- .unwrap_or_else(|e| {
- eprintln!("websocket send error: {}", e);
- })
- .await;
- }
- });
-
- users.write().await.insert(my_id, tx);
-
- while let Some(result) = user_ws_rx.next().await {
- let msg = match result {
- Ok(msg) => msg,
- Err(e) => {
- error!("websocket error(uid={my_id}): {e}");
- break;
- }
- };
- user_message(my_id, msg, &users).await;
- }
-
- users.write().await.remove(&my_id);
-}
-
-async fn user_message(my_id: usize, msg: Message, users: &Users) {
- // Skip any non-Text messages...
- let msg = if let Ok(s) = msg.to_str() {
- s
+async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
+ let code = if err.is_not_found() {
+ StatusCode::NOT_FOUND
+ } else if let Some(_) = err.find::<warp::filters::body::BodyDeserializeError>() {
+ StatusCode::BAD_REQUEST
+ } else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
+ StatusCode::METHOD_NOT_ALLOWED
} else {
- return;
+ error!("unhandled rejection: {:?}", err);
+ StatusCode::INTERNAL_SERVER_ERROR
};
+ let json = warp::reply::html(format!(
+ "<!DOCTYPE html><html><head></head><body><pre>{}</pre></body></html>",
+ code.canonical_reason().unwrap_or("!?")
+ ));
+ Ok(warp::reply::with_status(json, code))
+}
- let new_msg = format!("<User#{}>: {}", my_id, msg);
-
- for (&uid, tx) in users.read().await.iter() {
- if my_id != uid {
- if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
- // The tx is disconnected, our `user_disconnected` code
- // should be happening in another task, nothing more to
- // do here.
+fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
+ ws.on_upgrade(async move |sock| {
+ let room = match rooms.get(&rname) {
+ Some(r) => r,
+ None => {
+ rooms.insert(rname.to_owned(), Room::default());
+ rooms.get(&rname).unwrap() // TODO never expect this to always work!!
}
+ };
+ room.client_connect(sock).await;
+ if room.should_remove().await {
+ rooms.remove(&rname);
}
- }
-}
-
-async fn user_disconnected(my_id: usize, users: &Users) {
- eprintln!("good bye user: {}", my_id);
+ })
}