summaryrefslogtreecommitdiff
path: root/client-native-lib/src/instance.rs
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-lib/src/instance.rs')
-rw-r--r--client-native-lib/src/instance.rs38
1 files changed, 22 insertions, 16 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs
index 162241d..6303ba7 100644
--- a/client-native-lib/src/instance.rs
+++ b/client-native-lib/src/instance.rs
@@ -13,7 +13,7 @@ use crate::{
};
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use webrtc::api::API;
@@ -22,8 +22,8 @@ pub struct Instance {
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
- pub key: Key,
- local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
+ key: Key,
+ pub local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
my_id: RwLock<Option<usize>>,
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
@@ -48,7 +48,9 @@ impl Instance {
let blub = self.clone();
tokio::spawn(async move {
loop {
- blub.ping();
+ blub.ping().await;
+ debug!("ping");
+ tokio::time::sleep(Duration::from_secs(30)).await;
}
});
}
@@ -59,7 +61,8 @@ impl Instance {
.write()
.await
.send(ServerboundPacket::Ping)
- .await;
+ .await
+ .unwrap();
}
pub async fn my_id(&self) -> usize {
@@ -68,7 +71,6 @@ impl Instance {
pub async fn receive_loop(self: Arc<Self>) {
while let Some(packet) = self.conn.recv.write().await.next().await {
- debug!("{packet:?}");
let inst = self.clone();
inst.on_message(packet).await
}
@@ -86,10 +88,7 @@ impl Instance {
} else {
let peer = Peer::create(self.clone(), id).await;
self.peers.write().await.insert(id, peer.clone());
- peer.send_relay(RelayMessage::Identify {
- username: self.config.username.clone(),
- })
- .await;
+ peer.init_remote().await;
self.event_handler.peer_join(peer).await;
}
}
@@ -101,13 +100,18 @@ impl Instance {
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
- self.on_relay(sender, p.inner).await;
+ if p.sender == sender {
+ self.on_relay(sender, p.inner).await;
+ } else {
+ warn!("dropping packet with inconsistent sender")
+ }
}
}
}
pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
- if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
+ debug!("(relay) <- ({sender}) {p:?}");
+ if let Some(peer) = self.peers.read().await.get(&sender) {
peer.on_relay(p).await
} else {
warn!("got a packet from a non-existent peer")
@@ -115,6 +119,7 @@ impl Instance {
}
pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) {
+ debug!("(relay) -> ({recipient}) {inner:?}");
self.conn
.send
.write()
@@ -134,8 +139,8 @@ impl Instance {
}
pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) {
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::Provide(res.info()));
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::Provide(res.info())).await;
}
self.local_resources
.write()
@@ -144,8 +149,9 @@ impl Instance {
}
pub async fn remove_local_resource(&self, id: String) {
self.local_resources.write().await.remove(&id);
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::ProvideStop { id: id.clone() });
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() })
+ .await;
}
}
}