diff options
author | metamuffin <metamuffin@disroot.org> | 2023-09-07 19:17:49 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-09-07 19:17:49 +0200 |
commit | 2d0761b8932f11b01e241e2db3a8f08250efe878 (patch) | |
tree | 29814137e2960286d36f3ae4f7c20299eb6d22b0 /server/src/main.rs | |
parent | 6f644481f397af070e2b91b69846e375caafdbda (diff) | |
download | keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.bz2 keks-meet-2d0761b8932f11b01e241e2db3a8f08250efe878.tar.zst |
new protocol
Diffstat (limited to 'server/src/main.rs')
-rw-r--r-- | server/src/main.rs | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/server/src/main.rs b/server/src/main.rs index 3d0af50..9ea0f94 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,26 +6,27 @@ #![feature(lazy_cell)] pub mod assets; pub mod config; +pub mod logic; pub mod protocol; -pub mod room; +pub mod idgen; use assets::css; use config::{AppearanceConfig, Config}; +use futures_util::{SinkExt, StreamExt, TryFutureExt}; use hyper::{header, StatusCode}; use listenfd::ListenFd; use log::{debug, error}; -use room::Room; -use std::collections::HashMap; +use logic::State; use std::convert::Infallible; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::mpsc; use warp::hyper::Server; use warp::ws::WebSocket; -use warp::{reply, Filter, Rejection, Reply}; +use warp::{reply, ws::Message, Filter, Rejection, Reply}; -type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>; +use crate::protocol::ClientboundPacket; fn main() { tokio::runtime::Builder::new_multi_thread() @@ -43,11 +44,11 @@ async fn run() { let client_config_json = serde_json::to_string(&config).unwrap(); let client_config_css = css_overrides(&config.appearance); - let rooms: _ = Rooms::default(); - let rooms: _ = warp::any().map(move || rooms.clone()); + let state: _ = Arc::new(State::default()); + let state: _ = warp::any().map(move || state.clone()); - let signaling: _ = warp::path!("signaling" / String) - .and(rooms) + let signaling: _ = warp::path!("signaling") + .and(state) .and(warp::ws()) .map(signaling_connect); @@ -135,22 +136,25 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> { Ok(warp::reply::with_status(json, code)) } -fn signaling_connect(rsecret: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply { - async fn inner(sock: WebSocket, rsecret: String, rooms: Rooms) { +fn signaling_connect(state: Arc<State>, ws: warp::ws::Ws) -> impl Reply { + async fn inner(sock: WebSocket, state: Arc<State>) { debug!("ws upgrade"); - let mut guard = rooms.write().await; - let room = guard - .entry(rsecret.clone()) - .or_insert_with(|| Default::default()) - .to_owned(); - drop(guard); - - room.client_connect(sock).await; - if room.should_remove().await { - rooms.write().await.remove(&rsecret); - } + let (mut user_ws_tx, user_ws_rx) = sock.split(); + let (tx, mut rx) = mpsc::channel::<ClientboundPacket>(64); + tokio::task::spawn(async move { + while let Some(packet) = rx.recv().await { + debug!(" -> {packet:?}"); + user_ws_tx + .send(Message::text(serde_json::to_string(&packet).unwrap())) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + state.connect(user_ws_rx, tx).await; } - ws.on_upgrade(move |sock| inner(sock, rsecret, rooms)) + ws.on_upgrade(move |sock| inner(sock, state)) } fn css_overrides( |