diff options
Diffstat (limited to 'client-native-lib/src/signaling.rs')
-rw-r--r-- | client-native-lib/src/signaling.rs | 60 |
1 files changed, 37 insertions, 23 deletions
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index 2ac3edc..ef49692 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -1,7 +1,9 @@ +use std::time::Duration; + use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{SinkExt, StreamExt}; -use log::{debug, info}; +use log::{debug, error, info, warn}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::{self, Message}; @@ -19,40 +21,52 @@ pub async fn signaling_connect( .unwrap(); info!("connection established"); - let (mut tx, rx) = conn.split(); + let (mut tx, mut rx) = conn.split(); let (in_tx, in_rx) = unbounded_channel(); let (out_tx, mut out_rx) = unbounded_channel(); - tokio::spawn(async { - rx.for_each(move |mesg| { - info!("packet in"); - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - unreachable!(); - } - _ => (), - } - Box::pin(async { () }) - }) - .await; + let ping_out_tx = out_tx.clone(); + let ping_task = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + ping_out_tx.send(ServerboundPacket::Ping).unwrap() + } }); - tokio::spawn(async move { + + let send_task = tokio::spawn(async move { while let Some(p) = out_rx.recv().await { debug!(" -> {p:?}"); tx.send(Message::Text( serde_json::to_string::<ServerboundPacket>(&p).unwrap(), )) .await - .unwrap() + .unwrap(); + } + }); + let _receive_task = tokio::spawn(async move { + while let Some(mesg) = rx.next().await { + match mesg { + Ok(mesg) => match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(e) => { + error!("ws closed :( {e:?}"); + unreachable!(); + } + _ => (), + }, + Err(_) => { + send_task.abort(); + ping_task.abort(); + break; + } + } } + warn!("recv task stopped"); }); (out_tx, in_rx) |