diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-07 11:14:42 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-07 11:14:42 +0200 |
commit | 61950198e3bf06555f48e8f51c882a4c3cce5128 (patch) | |
tree | a7701a44804d4a2a634f3410d400545ea82d1c45 /server/src | |
parent | 832f48f29098cc6f840ade90db3b94efa67c6833 (diff) | |
download | keks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar keks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar.bz2 keks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar.zst |
REFACTOR! pt.1
Diffstat (limited to 'server/src')
-rw-r--r-- | server/src/main.rs | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..1128fb1 --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,118 @@ +use futures_util::{SinkExt, StreamExt, TryFutureExt}; +use listenfd::ListenFd; +use log::error; +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use tokio::sync::{mpsc, RwLock}; +use warp::hyper::Server; +use warp::ws::{Message, WebSocket}; +use warp::Filter; + +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>; + +#[tokio::main] +async fn main() { + env_logger::init_from_env("LOG"); + + let users = Users::default(); + let users = warp::any().map(move || users.clone()); + + let signaling = warp::path("signaling") + .and(warp::ws()) + .and(users) + .map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users))); + + let static_routes = { + let index = warp::get() + .and(warp::path!()) + .and(warp::fs::file("../client-web/public/index.html")); + let assets = warp::path("_assets").and(warp::fs::dir("../client-web/public/assets")); + + index + }; + + let routes = static_routes.or(signaling).or(four_oh_four_tm); + + // if listender fd is passed from the outside world, use it. + let mut listenfd = ListenFd::from_env(); + let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() { + Server::from_tcp(l).unwrap() + } else { + Server::bind(&([127, 0, 0, 1], 3030).into()) + }; + let service = warp::service(routes); + server + .serve(hyper::service::make_service_fn(|_| { + let service = service.clone(); + async move { Ok::<_, Infallible>(service) } + })) + .await + .unwrap(); +} + +async fn user_connected(ws: WebSocket, users: Users) { + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("new chat user: {}", my_id); + + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::task::spawn(async move { + while let Some(message) = rx.recv().await { + user_ws_tx + .send(message) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + + users.write().await.insert(my_id, tx); + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + error!("websocket error(uid={my_id}): {e}"); + break; + } + }; + user_message(my_id, msg, &users).await; + } + + users.write().await.remove(&my_id); +} + +async fn user_message(my_id: usize, msg: Message, users: &Users) { + // Skip any non-Text messages... + let msg = if let Ok(s) = msg.to_str() { + s + } else { + return; + }; + + let new_msg = format!("<User#{}>: {}", my_id, msg); + + for (&uid, tx) in users.read().await.iter() { + if my_id != uid { + if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { + // The tx is disconnected, our `user_disconnected` code + // should be happening in another task, nothing more to + // do here. + } + } + } +} + +async fn user_disconnected(my_id: usize, users: &Users) { + eprintln!("good bye user: {}", my_id); +} |