diff options
Diffstat (limited to 'client-native-rift/src/signaling.rs')
-rw-r--r-- | client-native-rift/src/signaling.rs | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/client-native-rift/src/signaling.rs b/client-native-rift/src/signaling.rs index c61b982..2ac3edc 100644 --- a/client-native-rift/src/signaling.rs +++ b/client-native-rift/src/signaling.rs @@ -1,40 +1,59 @@ use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{SinkExt, StreamExt}; +use log::{debug, info}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::{self, Message}; pub async fn signaling_connect( host: &str, secret: &str, ) -> ( - impl SinkExt<ServerboundPacket>, - impl StreamExt<Item = Option<ClientboundPacket>>, + UnboundedSender<ServerboundPacket>, + UnboundedReceiver<ClientboundPacket>, ) { - let (conn, _) = tokio_tungstenite::connect_async( - url::Url::parse(&format!("wss://{host}/signaling/{}", hash(secret))).unwrap(), - ) - .await - .unwrap(); + let uri = format!("wss://{host}/signaling/{}", hash(secret)); + info!("connecting to signaling server at {uri:?}"); + let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) + .await + .unwrap(); + info!("connection established"); - let (tx, rx) = conn.split(); - let prx = rx.map(|mesg| { - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - Some(p) - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - None + let (mut tx, rx) = conn.split(); + + let (in_tx, in_rx) = unbounded_channel(); + let (out_tx, mut out_rx) = unbounded_channel(); + + tokio::spawn(async { + rx.for_each(move |mesg| { + info!("packet in"); + let mesg = mesg.unwrap(); + match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(_) => { + eprintln!("ws closed :("); + unreachable!(); + } + _ => (), } - _ => None, - } + Box::pin(async { () }) + }) + .await; }); - let ptx = tx.with(async move |p| { - Ok::<_, tokio_tungstenite::tungstenite::error::Error>(Message::Text( - serde_json::to_string::<ServerboundPacket>(&p).unwrap(), - )) + tokio::spawn(async move { + while let Some(p) = out_rx.recv().await { + debug!(" -> {p:?}"); + tx.send(Message::Text( + serde_json::to_string::<ServerboundPacket>(&p).unwrap(), + )) + .await + .unwrap() + } }); - (ptx, prx) + + (out_tx, in_rx) } |