diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-07 16:50:05 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-07 16:50:05 +0200 |
commit | 76bc7e5e8bee9b3994855b071408a1de582d64f3 (patch) | |
tree | 1cbbb8f008410216caa9499f43e03d08c269dec3 /client-native-lib/src/signaling.rs | |
parent | c8406ff6fe2479acb2cf5a80bb18d0c79e51fb9c (diff) | |
download | keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar.bz2 keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar.zst |
lib cleanup
Diffstat (limited to 'client-native-lib/src/signaling.rs')
-rw-r--r-- | client-native-lib/src/signaling.rs | 112 |
1 files changed, 52 insertions, 60 deletions
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index cbbd063..8f21d85 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -3,76 +3,68 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2022 metamuffin <metamuffin@disroot.org> */ -use std::time::Duration; - use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{Sink, SinkExt, Stream, StreamExt}; use log::{debug, error, info, warn}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use std::pin::Pin; +use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::{self, Message}; -pub async fn signaling_connect( - signaling_server: &str, - secret: &str, -) -> ( - UnboundedSender<ServerboundPacket>, - UnboundedReceiver<ClientboundPacket>, -) { - let uri = format!("{signaling_server}/signaling/{}", hash(secret)); - info!("connecting to signaling server at {uri:?}"); - let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) - .await - .unwrap(); - info!("connection established"); - - let (mut tx, mut rx) = conn.split(); +pub struct SignalingConnection { + pub send: RwLock< + Pin< + Box< + dyn Sink<ServerboundPacket, Error = tokio_tungstenite::tungstenite::Error> + + Send + + Sync + + 'static, + >, + >, + >, + pub recv: RwLock<Pin<Box<dyn Stream<Item = ClientboundPacket> + Send + Sync + 'static>>>, +} - let (in_tx, in_rx) = unbounded_channel(); - let (out_tx, mut out_rx) = unbounded_channel(); +impl SignalingConnection { + pub async fn new(signaling_server: &str, secret: &str) -> Self { + let uri = format!("{signaling_server}/signaling/{}", hash(secret)); + info!("connecting to signaling server at {uri:?}"); + let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) + .await + .unwrap(); + info!("connection established"); - let ping_out_tx = out_tx.clone(); - let ping_task = tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(30)).await; - ping_out_tx.send(ServerboundPacket::Ping).unwrap() - } - }); + let (tx, rx): (_, _) = conn.split(); - let send_task = tokio::spawn(async move { - while let Some(p) = out_rx.recv().await { - debug!(" -> {p:?}"); - tx.send(Message::Text( - serde_json::to_string::<ServerboundPacket>(&p).unwrap(), + let tx = tx.with(async move |packet: ServerboundPacket| { + debug!(" -> {packet:?}"); + Ok::<_, _>(Message::Text( + serde_json::to_string::<ServerboundPacket>(&packet).unwrap(), )) - .await - .unwrap(); - } - }); - let _receive_task = tokio::spawn(async move { - while let Some(mesg) = rx.next().await { - match mesg { - Ok(mesg) => match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(e) => { - error!("ws closed :( {e:?}"); - unreachable!(); - } - _ => (), - }, - Err(_) => { - send_task.abort(); - ping_task.abort(); - break; + }); + + let rx = rx.filter_map(async move |mesg| match mesg { + Ok(mesg) => match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + Some(p) } + tungstenite::Message::Close(e) => { + error!("ws closed {e:?}"); + None + } + _ => None, + }, + Err(e) => { + warn!("websocket error: {e}"); + None } - } - warn!("recv task stopped"); - }); + }); - (out_tx, in_rx) + Self { + recv: RwLock::new(Box::pin(rx)), + send: RwLock::new(Box::pin(tx)), + } + } } |