diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
commit | 77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch) | |
tree | 5028a357c4cae08824d1d402c6561121be531329 /client-native-lib | |
parent | d081461dd7fe2a6db94b196324bc485c10a77c7a (diff) | |
download | keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2 keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst |
sending files works
Diffstat (limited to 'client-native-lib')
-rw-r--r-- | client-native-lib/src/instance.rs | 38 | ||||
-rw-r--r-- | client-native-lib/src/lib.rs | 7 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 40 | ||||
-rw-r--r-- | client-native-lib/src/signaling.rs | 18 |
4 files changed, 67 insertions, 36 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs index 162241d..6303ba7 100644 --- a/client-native-lib/src/instance.rs +++ b/client-native-lib/src/instance.rs @@ -13,7 +13,7 @@ use crate::{ }; use futures_util::{SinkExt, StreamExt}; use log::{debug, info, warn}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; use webrtc::api::API; @@ -22,8 +22,8 @@ pub struct Instance { pub conn: SignalingConnection, pub config: Config, pub api: API, - pub key: Key, - local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>, + key: Key, + pub local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>, my_id: RwLock<Option<usize>>, pub peers: RwLock<HashMap<usize, Arc<Peer>>>, } @@ -48,7 +48,9 @@ impl Instance { let blub = self.clone(); tokio::spawn(async move { loop { - blub.ping(); + blub.ping().await; + debug!("ping"); + tokio::time::sleep(Duration::from_secs(30)).await; } }); } @@ -59,7 +61,8 @@ impl Instance { .write() .await .send(ServerboundPacket::Ping) - .await; + .await + .unwrap(); } pub async fn my_id(&self) -> usize { @@ -68,7 +71,6 @@ impl Instance { pub async fn receive_loop(self: Arc<Self>) { while let Some(packet) = self.conn.recv.write().await.next().await { - debug!("{packet:?}"); let inst = self.clone(); inst.on_message(packet).await } @@ -86,10 +88,7 @@ impl Instance { } else { let peer = Peer::create(self.clone(), id).await; self.peers.write().await.insert(id, peer.clone()); - peer.send_relay(RelayMessage::Identify { - username: self.config.username.clone(), - }) - .await; + peer.init_remote().await; self.event_handler.peer_join(peer).await; } } @@ -101,13 +100,18 @@ impl Instance { protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); - self.on_relay(sender, p.inner).await; + if p.sender == sender { + self.on_relay(sender, p.inner).await; + } else { + warn!("dropping packet with inconsistent sender") + } } } } pub async fn on_relay(&self, sender: usize, p: RelayMessage) { - if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + debug!("(relay) <- ({sender}) {p:?}"); + if let Some(peer) = self.peers.read().await.get(&sender) { peer.on_relay(p).await } else { warn!("got a packet from a non-existent peer") @@ -115,6 +119,7 @@ impl Instance { } pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + debug!("(relay) -> ({recipient}) {inner:?}"); self.conn .send .write() @@ -134,8 +139,8 @@ impl Instance { } pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) { - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::Provide(res.info())); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::Provide(res.info())).await; } self.local_resources .write() @@ -144,8 +149,9 @@ impl Instance { } pub async fn remove_local_resource(&self, id: String) { self.local_resources.write().await.remove(&id); - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }) + .await; } } } diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index a10a20a..ed434eb 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -5,15 +5,12 @@ */ #![feature(async_closure)] #![feature(box_syntax)] -#![feature(async_fn_in_trait)] - -use std::{pin::Pin, sync::Arc}; +// #![feature(async_fn_in_trait)] use futures_util::Future; -use instance::Instance; use peer::{Peer, TransportChannel}; use protocol::ProvideInfo; -use tokio::sync::RwLock; +use std::{pin::Pin, sync::Arc}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 36b1754..fa4de25 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -6,12 +6,12 @@ use crate::{ instance::Instance, protocol::{self, ProvideInfo, RelayMessage, Sdp}, + LocalResource, }; use log::{info, warn}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use webrtc::{ - data::data_channel::DataChannel, data_channel::RTCDataChannel, ice_transport::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, @@ -27,7 +27,7 @@ use webrtc::{ pub struct Peer { pub inst: Arc<Instance>, pub peer_connection: RTCPeerConnection, - pub resources_provided: RwLock<HashMap<String, ProvideInfo>>, + pub remote_provided: RwLock<HashMap<String, ProvideInfo>>, pub id: usize, } @@ -42,6 +42,7 @@ pub struct Peer { // Connected(Arc<TransportChannel>), // AwaitDisconnect, // } + pub enum TransportChannel { Track(Arc<TrackRemote>), DataChannel(Arc<RTCDataChannel>), @@ -60,7 +61,7 @@ impl Peer { let peer_connection = inst.api.new_peer_connection(config).await.unwrap(); let peer = Arc::new(Self { - resources_provided: Default::default(), + remote_provided: Default::default(), inst: inst.clone(), peer_connection, id, @@ -103,7 +104,7 @@ impl Peer { let peer = weak.upgrade().unwrap(); Box::pin(async move { if let Some(res) = peer - .resources_provided + .remote_provided .read() .await .get(&dc.label().to_string()) @@ -119,7 +120,7 @@ impl Peer { .await; } else { warn!("got unassociated data channel; closed connection"); - dc.close().await; + dc.close().await.unwrap(); } }) }) @@ -128,6 +129,16 @@ impl Peer { peer } + pub async fn init_remote(&self) { + self.send_relay(RelayMessage::Identify { + username: self.inst.config.username.clone(), + }) + .await; + for res in self.inst.local_resources.read().await.values() { + self.send_relay(RelayMessage::Provide(res.info())).await; + } + } + pub async fn request_resource(&self, id: String) { self.send_relay(RelayMessage::Request { id }).await; } @@ -139,7 +150,7 @@ impl Peer { self.inst.send_relay(self.id, inner).await } - pub async fn on_relay(self: Arc<Self>, p: RelayMessage) { + pub async fn on_relay(self: &Arc<Self>, p: RelayMessage) { match p { RelayMessage::Offer(o) => self.on_offer(o).await, RelayMessage::Answer(a) => self.on_answer(a).await, @@ -149,7 +160,7 @@ impl Peer { "remote resource provided: ({:?}) {:?} {:?}", info.id, info.kind, info.label ); - self.resources_provided + self.remote_provided .write() .await .insert(info.id.clone(), info.clone()); @@ -160,13 +171,24 @@ impl Peer { } RelayMessage::ProvideStop { id } => { info!("remote resource removed: ({:?}) ", id); - self.resources_provided.write().await.remove(&id); + self.remote_provided.write().await.remove(&id); self.inst .event_handler .resource_removed(self.clone(), id) .await; } - _ => (), + RelayMessage::Chat(_) => (), + RelayMessage::Identify { username } => { + info!("peer {} is known as {username:?}", self.id) + } + RelayMessage::Request { id } => { + if let Some(res) = self.inst.local_resources.read().await.get(&id) { + res.on_request(self.clone()).await; + } else { + warn!("({}) requested unknown local resource", self.id) + } + } + RelayMessage::RequestStop { id } => {} } } diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index 8f21d85..318ed7d 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -6,7 +6,7 @@ use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{Sink, SinkExt, Stream, StreamExt}; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace}; use std::pin::Pin; use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::{self, Message}; @@ -37,7 +37,10 @@ impl SignalingConnection { let (tx, rx): (_, _) = conn.split(); let tx = tx.with(async move |packet: ServerboundPacket| { - debug!(" -> {packet:?}"); + match packet { + ServerboundPacket::Relay { .. } => trace!(" -> {packet:?}"), + _ => debug!(" -> {packet:?}"), + } Ok::<_, _>(Message::Text( serde_json::to_string::<ServerboundPacket>(&packet).unwrap(), )) @@ -46,9 +49,12 @@ impl SignalingConnection { let rx = rx.filter_map(async move |mesg| match mesg { Ok(mesg) => match mesg { tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - Some(p) + let packet: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + match packet { + ClientboundPacket::Message { .. } => trace!(" <- {packet:?}"), + _ => debug!(" <- {packet:?}"), + } + Some(packet) } tungstenite::Message::Close(e) => { error!("ws closed {e:?}"); @@ -57,7 +63,7 @@ impl SignalingConnection { _ => None, }, Err(e) => { - warn!("websocket error: {e}"); + error!("websocket error: {e}"); None } }); |