aboutsummaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-07 11:14:42 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-07 11:14:42 +0200
commit61950198e3bf06555f48e8f51c882a4c3cce5128 (patch)
treea7701a44804d4a2a634f3410d400545ea82d1c45 /server/src
parent832f48f29098cc6f840ade90db3b94efa67c6833 (diff)
downloadkeks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar
keks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar.bz2
keks-meet-61950198e3bf06555f48e8f51c882a4c3cce5128.tar.zst
REFACTOR! pt.1
Diffstat (limited to 'server/src')
-rw-r--r--server/src/main.rs118
1 files changed, 118 insertions, 0 deletions
diff --git a/server/src/main.rs b/server/src/main.rs
new file mode 100644
index 0000000..1128fb1
--- /dev/null
+++ b/server/src/main.rs
@@ -0,0 +1,118 @@
+use futures_util::{SinkExt, StreamExt, TryFutureExt};
+use listenfd::ListenFd;
+use log::error;
+use std::collections::HashMap;
+use std::convert::Infallible;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+use tokio::sync::{mpsc, RwLock};
+use warp::hyper::Server;
+use warp::ws::{Message, WebSocket};
+use warp::Filter;
+
+static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
+
+type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
+
+#[tokio::main]
+async fn main() {
+ env_logger::init_from_env("LOG");
+
+ let users = Users::default();
+ let users = warp::any().map(move || users.clone());
+
+ let signaling = warp::path("signaling")
+ .and(warp::ws())
+ .and(users)
+ .map(|ws: warp::ws::Ws, users| ws.on_upgrade(move |socket| user_connected(socket, users)));
+
+ let static_routes = {
+ let index = warp::get()
+ .and(warp::path!())
+ .and(warp::fs::file("../client-web/public/index.html"));
+ let assets = warp::path("_assets").and(warp::fs::dir("../client-web/public/assets"));
+
+ index
+ };
+
+ let routes = static_routes.or(signaling).or(four_oh_four_tm);
+
+ // if listender fd is passed from the outside world, use it.
+ let mut listenfd = ListenFd::from_env();
+ let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() {
+ Server::from_tcp(l).unwrap()
+ } else {
+ Server::bind(&([127, 0, 0, 1], 3030).into())
+ };
+ let service = warp::service(routes);
+ server
+ .serve(hyper::service::make_service_fn(|_| {
+ let service = service.clone();
+ async move { Ok::<_, Infallible>(service) }
+ }))
+ .await
+ .unwrap();
+}
+
+async fn user_connected(ws: WebSocket, users: Users) {
+ let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
+
+ eprintln!("new chat user: {}", my_id);
+
+ let (mut user_ws_tx, mut user_ws_rx) = ws.split();
+
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ tokio::task::spawn(async move {
+ while let Some(message) = rx.recv().await {
+ user_ws_tx
+ .send(message)
+ .unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ })
+ .await;
+ }
+ });
+
+ users.write().await.insert(my_id, tx);
+
+ while let Some(result) = user_ws_rx.next().await {
+ let msg = match result {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("websocket error(uid={my_id}): {e}");
+ break;
+ }
+ };
+ user_message(my_id, msg, &users).await;
+ }
+
+ users.write().await.remove(&my_id);
+}
+
+async fn user_message(my_id: usize, msg: Message, users: &Users) {
+ // Skip any non-Text messages...
+ let msg = if let Ok(s) = msg.to_str() {
+ s
+ } else {
+ return;
+ };
+
+ let new_msg = format!("<User#{}>: {}", my_id, msg);
+
+ for (&uid, tx) in users.read().await.iter() {
+ if my_id != uid {
+ if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
+ // The tx is disconnected, our `user_disconnected` code
+ // should be happening in another task, nothing more to
+ // do here.
+ }
+ }
+ }
+}
+
+async fn user_disconnected(my_id: usize, users: &Users) {
+ eprintln!("good bye user: {}", my_id);
+}