aboutsummaryrefslogtreecommitdiff
path: root/server/src/main.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-07 19:17:49 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-07 19:17:49 +0200
commit2d0761b8932f11b01e241e2db3a8f08250efe878 (patch)
tree29814137e2960286d36f3ae4f7c20299eb6d22b0 /server/src/main.rs
parent6f644481f397af070e2b91b69846e375caafdbda (diff)
downloadkeks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar
keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.bz2
keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.zst
new protocol
Diffstat (limited to 'server/src/main.rs')
-rw-r--r--server/src/main.rs52
1 files changed, 28 insertions, 24 deletions
diff --git a/server/src/main.rs b/server/src/main.rs
index 3d0af50..9ea0f94 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -6,26 +6,27 @@
#![feature(lazy_cell)]
pub mod assets;
pub mod config;
+pub mod logic;
pub mod protocol;
-pub mod room;
+pub mod idgen;
use assets::css;
use config::{AppearanceConfig, Config};
+use futures_util::{SinkExt, StreamExt, TryFutureExt};
use hyper::{header, StatusCode};
use listenfd::ListenFd;
use log::{debug, error};
-use room::Room;
-use std::collections::HashMap;
+use logic::State;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
-use tokio::sync::RwLock;
+use tokio::sync::mpsc;
use warp::hyper::Server;
use warp::ws::WebSocket;
-use warp::{reply, Filter, Rejection, Reply};
+use warp::{reply, ws::Message, Filter, Rejection, Reply};
-type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>;
+use crate::protocol::ClientboundPacket;
fn main() {
tokio::runtime::Builder::new_multi_thread()
@@ -43,11 +44,11 @@ async fn run() {
let client_config_json = serde_json::to_string(&config).unwrap();
let client_config_css = css_overrides(&config.appearance);
- let rooms: _ = Rooms::default();
- let rooms: _ = warp::any().map(move || rooms.clone());
+ let state: _ = Arc::new(State::default());
+ let state: _ = warp::any().map(move || state.clone());
- let signaling: _ = warp::path!("signaling" / String)
- .and(rooms)
+ let signaling: _ = warp::path!("signaling")
+ .and(state)
.and(warp::ws())
.map(signaling_connect);
@@ -135,22 +136,25 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
Ok(warp::reply::with_status(json, code))
}
-fn signaling_connect(rsecret: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
- async fn inner(sock: WebSocket, rsecret: String, rooms: Rooms) {
+fn signaling_connect(state: Arc<State>, ws: warp::ws::Ws) -> impl Reply {
+ async fn inner(sock: WebSocket, state: Arc<State>) {
debug!("ws upgrade");
- let mut guard = rooms.write().await;
- let room = guard
- .entry(rsecret.clone())
- .or_insert_with(|| Default::default())
- .to_owned();
- drop(guard);
-
- room.client_connect(sock).await;
- if room.should_remove().await {
- rooms.write().await.remove(&rsecret);
- }
+ let (mut user_ws_tx, user_ws_rx) = sock.split();
+ let (tx, mut rx) = mpsc::channel::<ClientboundPacket>(64);
+ tokio::task::spawn(async move {
+ while let Some(packet) = rx.recv().await {
+ debug!(" -> {packet:?}");
+ user_ws_tx
+ .send(Message::text(serde_json::to_string(&packet).unwrap()))
+ .unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ })
+ .await;
+ }
+ });
+ state.connect(user_ws_rx, tx).await;
}
- ws.on_upgrade(move |sock| inner(sock, rsecret, rooms))
+ ws.on_upgrade(move |sock| inner(sock, state))
}
fn css_overrides(