/* This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2022 metamuffin */ use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{Sink, SinkExt, Stream, StreamExt}; use log::{debug, error, info, warn}; use std::pin::Pin; use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::{self, Message}; pub struct SignalingConnection { pub send: RwLock< Pin< Box< dyn Sink + Send + Sync + 'static, >, >, >, pub recv: RwLock + Send + Sync + 'static>>>, } impl SignalingConnection { pub async fn new(signaling_server: &str, secret: &str) -> Self { let uri = format!("{signaling_server}/signaling/{}", hash(secret)); info!("connecting to signaling server at {uri:?}"); let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) .await .unwrap(); info!("connection established"); let (tx, rx): (_, _) = conn.split(); let tx = tx.with(async move |packet: ServerboundPacket| { debug!(" -> {packet:?}"); Ok::<_, _>(Message::Text( serde_json::to_string::(&packet).unwrap(), )) }); let rx = rx.filter_map(async move |mesg| match mesg { Ok(mesg) => match mesg { tungstenite::Message::Text(t) => { let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); debug!("<- {p:?}"); Some(p) } tungstenite::Message::Close(e) => { error!("ws closed {e:?}"); None } _ => None, }, Err(e) => { warn!("websocket error: {e}"); None } }); Self { recv: RwLock::new(Box::pin(rx)), send: RwLock::new(Box::pin(tx)), } } }