From 77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sat, 15 Oct 2022 12:37:28 +0200 Subject: sending files works --- client-native-lib/src/instance.rs | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) (limited to 'client-native-lib/src/instance.rs') diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs index 162241d..6303ba7 100644 --- a/client-native-lib/src/instance.rs +++ b/client-native-lib/src/instance.rs @@ -13,7 +13,7 @@ use crate::{ }; use futures_util::{SinkExt, StreamExt}; use log::{debug, info, warn}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; use webrtc::api::API; @@ -22,8 +22,8 @@ pub struct Instance { pub conn: SignalingConnection, pub config: Config, pub api: API, - pub key: Key, - local_resources: RwLock>>, + key: Key, + pub local_resources: RwLock>>, my_id: RwLock>, pub peers: RwLock>>, } @@ -48,7 +48,9 @@ impl Instance { let blub = self.clone(); tokio::spawn(async move { loop { - blub.ping(); + blub.ping().await; + debug!("ping"); + tokio::time::sleep(Duration::from_secs(30)).await; } }); } @@ -59,7 +61,8 @@ impl Instance { .write() .await .send(ServerboundPacket::Ping) - .await; + .await + .unwrap(); } pub async fn my_id(&self) -> usize { @@ -68,7 +71,6 @@ impl Instance { pub async fn receive_loop(self: Arc) { while let Some(packet) = self.conn.recv.write().await.next().await { - debug!("{packet:?}"); let inst = self.clone(); inst.on_message(packet).await } @@ -86,10 +88,7 @@ impl Instance { } else { let peer = Peer::create(self.clone(), id).await; self.peers.write().await.insert(id, peer.clone()); - peer.send_relay(RelayMessage::Identify { - username: self.config.username.clone(), - }) - .await; + peer.init_remote().await; self.event_handler.peer_join(peer).await; } } @@ -101,13 +100,18 @@ impl Instance { protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); let p = serde_json::from_str::(&message).unwrap(); - self.on_relay(sender, p.inner).await; + if p.sender == sender { + self.on_relay(sender, p.inner).await; + } else { + warn!("dropping packet with inconsistent sender") + } } } } pub async fn on_relay(&self, sender: usize, p: RelayMessage) { - if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + debug!("(relay) <- ({sender}) {p:?}"); + if let Some(peer) = self.peers.read().await.get(&sender) { peer.on_relay(p).await } else { warn!("got a packet from a non-existent peer") @@ -115,6 +119,7 @@ impl Instance { } pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + debug!("(relay) -> ({recipient}) {inner:?}"); self.conn .send .write() @@ -134,8 +139,8 @@ impl Instance { } pub async fn add_local_resource(&self, res: Box) { - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::Provide(res.info())); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::Provide(res.info())).await; } self.local_resources .write() @@ -144,8 +149,9 @@ impl Instance { } pub async fn remove_local_resource(&self, id: String) { self.local_resources.write().await.remove(&id); - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }) + .await; } } } -- cgit v1.2.3-70-g09d2