diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
commit | 1286385698c4d09876abf29cb5ed595f7cfe1a8f (patch) | |
tree | 3adbe53a1393be941c7459c802f7238c5d0e2d64 /client-native-lib/src | |
parent | 7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (diff) | |
download | keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.bz2 keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.zst |
rift works.
Diffstat (limited to 'client-native-lib/src')
-rw-r--r-- | client-native-lib/src/peer.rs | 16 | ||||
-rw-r--r-- | client-native-lib/src/signaling.rs | 60 | ||||
-rw-r--r-- | client-native-lib/src/state.rs | 4 |
3 files changed, 48 insertions, 32 deletions
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 280cc06..d6ca308 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -48,10 +48,10 @@ impl Peer { .await; { - let peer2 = peer.clone(); + let weak = Arc::<Peer>::downgrade(&peer); peer.peer_connection .on_ice_candidate(box move |c| { - let peer = peer2.clone(); + let peer = weak.upgrade().unwrap(); Box::pin(async move { if let Some(c) = c { peer.on_ice_candidate(c).await @@ -62,10 +62,10 @@ impl Peer { } { - let peer2 = peer.clone(); + let weak = Arc::<Peer>::downgrade(&peer); peer.peer_connection .on_negotiation_needed(box move || { - let peer = peer2.clone(); + let peer = weak.upgrade().unwrap(); Box::pin(async { peer.on_negotiation_needed().await }) }) .await; @@ -114,7 +114,7 @@ impl Peer { } pub async fn offer(&self) { - info!("sending offer"); + info!("({}) sending offer", self.id); let offer = self.peer_connection.create_offer(None).await.unwrap(); self.peer_connection .set_local_description(offer.clone()) @@ -127,7 +127,7 @@ impl Peer { .await } pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) { - info!("received offer"); + info!("({}) received offer", self.id); let offer = RTCSessionDescription::offer(offer.sdp).unwrap(); self.peer_connection .set_remote_description(offer) @@ -136,7 +136,7 @@ impl Peer { self.answer().await } pub async fn answer(&self) { - info!("sending answer"); + info!("({}) sending answer", self.id); let offer = self.peer_connection.create_answer(None).await.unwrap(); self.peer_connection .set_local_description(offer.clone()) @@ -149,7 +149,7 @@ impl Peer { .await } pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) { - info!("received answer"); + info!("({}) received answer", self.id); let offer = RTCSessionDescription::answer(answer.sdp).unwrap(); self.peer_connection .set_remote_description(offer) diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index 2ac3edc..ef49692 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -1,7 +1,9 @@ +use std::time::Duration; + use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{SinkExt, StreamExt}; -use log::{debug, info}; +use log::{debug, error, info, warn}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::{self, Message}; @@ -19,40 +21,52 @@ pub async fn signaling_connect( .unwrap(); info!("connection established"); - let (mut tx, rx) = conn.split(); + let (mut tx, mut rx) = conn.split(); let (in_tx, in_rx) = unbounded_channel(); let (out_tx, mut out_rx) = unbounded_channel(); - tokio::spawn(async { - rx.for_each(move |mesg| { - info!("packet in"); - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - unreachable!(); - } - _ => (), - } - Box::pin(async { () }) - }) - .await; + let ping_out_tx = out_tx.clone(); + let ping_task = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + ping_out_tx.send(ServerboundPacket::Ping).unwrap() + } }); - tokio::spawn(async move { + + let send_task = tokio::spawn(async move { while let Some(p) = out_rx.recv().await { debug!(" -> {p:?}"); tx.send(Message::Text( serde_json::to_string::<ServerboundPacket>(&p).unwrap(), )) .await - .unwrap() + .unwrap(); + } + }); + let _receive_task = tokio::spawn(async move { + while let Some(mesg) = rx.next().await { + match mesg { + Ok(mesg) => match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(e) => { + error!("ws closed :( {e:?}"); + unreachable!(); + } + _ => (), + }, + Err(_) => { + send_task.abort(); + ping_task.abort(); + break; + } + } } + warn!("recv task stopped"); }); (out_tx, in_rx) diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs index c5e9365..8501c3d 100644 --- a/client-native-lib/src/state.rs +++ b/client-native-lib/src/state.rs @@ -57,7 +57,9 @@ impl<P: HasPeer, I: PeerInit<P>> State<P, I> { ); } } - protocol::ClientboundPacket::ClientLeave { id: _ } => {} + protocol::ClientboundPacket::ClientLeave { id } => { + self.peers.write().await.remove(&id); + } protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); |