aboutsummaryrefslogtreecommitdiff
path: root/client-native-lib
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
committermetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
commit77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch)
tree5028a357c4cae08824d1d402c6561121be531329 /client-native-lib
parentd081461dd7fe2a6db94b196324bc485c10a77c7a (diff)
downloadkeks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst
sending files works
Diffstat (limited to 'client-native-lib')
-rw-r--r--client-native-lib/src/instance.rs38
-rw-r--r--client-native-lib/src/lib.rs7
-rw-r--r--client-native-lib/src/peer.rs40
-rw-r--r--client-native-lib/src/signaling.rs18
4 files changed, 67 insertions, 36 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs
index 162241d..6303ba7 100644
--- a/client-native-lib/src/instance.rs
+++ b/client-native-lib/src/instance.rs
@@ -13,7 +13,7 @@ use crate::{
};
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use webrtc::api::API;
@@ -22,8 +22,8 @@ pub struct Instance {
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
- pub key: Key,
- local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
+ key: Key,
+ pub local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
my_id: RwLock<Option<usize>>,
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
@@ -48,7 +48,9 @@ impl Instance {
let blub = self.clone();
tokio::spawn(async move {
loop {
- blub.ping();
+ blub.ping().await;
+ debug!("ping");
+ tokio::time::sleep(Duration::from_secs(30)).await;
}
});
}
@@ -59,7 +61,8 @@ impl Instance {
.write()
.await
.send(ServerboundPacket::Ping)
- .await;
+ .await
+ .unwrap();
}
pub async fn my_id(&self) -> usize {
@@ -68,7 +71,6 @@ impl Instance {
pub async fn receive_loop(self: Arc<Self>) {
while let Some(packet) = self.conn.recv.write().await.next().await {
- debug!("{packet:?}");
let inst = self.clone();
inst.on_message(packet).await
}
@@ -86,10 +88,7 @@ impl Instance {
} else {
let peer = Peer::create(self.clone(), id).await;
self.peers.write().await.insert(id, peer.clone());
- peer.send_relay(RelayMessage::Identify {
- username: self.config.username.clone(),
- })
- .await;
+ peer.init_remote().await;
self.event_handler.peer_join(peer).await;
}
}
@@ -101,13 +100,18 @@ impl Instance {
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
- self.on_relay(sender, p.inner).await;
+ if p.sender == sender {
+ self.on_relay(sender, p.inner).await;
+ } else {
+ warn!("dropping packet with inconsistent sender")
+ }
}
}
}
pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
- if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
+ debug!("(relay) <- ({sender}) {p:?}");
+ if let Some(peer) = self.peers.read().await.get(&sender) {
peer.on_relay(p).await
} else {
warn!("got a packet from a non-existent peer")
@@ -115,6 +119,7 @@ impl Instance {
}
pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) {
+ debug!("(relay) -> ({recipient}) {inner:?}");
self.conn
.send
.write()
@@ -134,8 +139,8 @@ impl Instance {
}
pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) {
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::Provide(res.info()));
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::Provide(res.info())).await;
}
self.local_resources
.write()
@@ -144,8 +149,9 @@ impl Instance {
}
pub async fn remove_local_resource(&self, id: String) {
self.local_resources.write().await.remove(&id);
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::ProvideStop { id: id.clone() });
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() })
+ .await;
}
}
}
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index a10a20a..ed434eb 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -5,15 +5,12 @@
*/
#![feature(async_closure)]
#![feature(box_syntax)]
-#![feature(async_fn_in_trait)]
-
-use std::{pin::Pin, sync::Arc};
+// #![feature(async_fn_in_trait)]
use futures_util::Future;
-use instance::Instance;
use peer::{Peer, TransportChannel};
use protocol::ProvideInfo;
-use tokio::sync::RwLock;
+use std::{pin::Pin, sync::Arc};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index 36b1754..fa4de25 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -6,12 +6,12 @@
use crate::{
instance::Instance,
protocol::{self, ProvideInfo, RelayMessage, Sdp},
+ LocalResource,
};
use log::{info, warn};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use webrtc::{
- data::data_channel::DataChannel,
data_channel::RTCDataChannel,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
@@ -27,7 +27,7 @@ use webrtc::{
pub struct Peer {
pub inst: Arc<Instance>,
pub peer_connection: RTCPeerConnection,
- pub resources_provided: RwLock<HashMap<String, ProvideInfo>>,
+ pub remote_provided: RwLock<HashMap<String, ProvideInfo>>,
pub id: usize,
}
@@ -42,6 +42,7 @@ pub struct Peer {
// Connected(Arc<TransportChannel>),
// AwaitDisconnect,
// }
+
pub enum TransportChannel {
Track(Arc<TrackRemote>),
DataChannel(Arc<RTCDataChannel>),
@@ -60,7 +61,7 @@ impl Peer {
let peer_connection = inst.api.new_peer_connection(config).await.unwrap();
let peer = Arc::new(Self {
- resources_provided: Default::default(),
+ remote_provided: Default::default(),
inst: inst.clone(),
peer_connection,
id,
@@ -103,7 +104,7 @@ impl Peer {
let peer = weak.upgrade().unwrap();
Box::pin(async move {
if let Some(res) = peer
- .resources_provided
+ .remote_provided
.read()
.await
.get(&dc.label().to_string())
@@ -119,7 +120,7 @@ impl Peer {
.await;
} else {
warn!("got unassociated data channel; closed connection");
- dc.close().await;
+ dc.close().await.unwrap();
}
})
})
@@ -128,6 +129,16 @@ impl Peer {
peer
}
+ pub async fn init_remote(&self) {
+ self.send_relay(RelayMessage::Identify {
+ username: self.inst.config.username.clone(),
+ })
+ .await;
+ for res in self.inst.local_resources.read().await.values() {
+ self.send_relay(RelayMessage::Provide(res.info())).await;
+ }
+ }
+
pub async fn request_resource(&self, id: String) {
self.send_relay(RelayMessage::Request { id }).await;
}
@@ -139,7 +150,7 @@ impl Peer {
self.inst.send_relay(self.id, inner).await
}
- pub async fn on_relay(self: Arc<Self>, p: RelayMessage) {
+ pub async fn on_relay(self: &Arc<Self>, p: RelayMessage) {
match p {
RelayMessage::Offer(o) => self.on_offer(o).await,
RelayMessage::Answer(a) => self.on_answer(a).await,
@@ -149,7 +160,7 @@ impl Peer {
"remote resource provided: ({:?}) {:?} {:?}",
info.id, info.kind, info.label
);
- self.resources_provided
+ self.remote_provided
.write()
.await
.insert(info.id.clone(), info.clone());
@@ -160,13 +171,24 @@ impl Peer {
}
RelayMessage::ProvideStop { id } => {
info!("remote resource removed: ({:?}) ", id);
- self.resources_provided.write().await.remove(&id);
+ self.remote_provided.write().await.remove(&id);
self.inst
.event_handler
.resource_removed(self.clone(), id)
.await;
}
- _ => (),
+ RelayMessage::Chat(_) => (),
+ RelayMessage::Identify { username } => {
+ info!("peer {} is known as {username:?}", self.id)
+ }
+ RelayMessage::Request { id } => {
+ if let Some(res) = self.inst.local_resources.read().await.get(&id) {
+ res.on_request(self.clone()).await;
+ } else {
+ warn!("({}) requested unknown local resource", self.id)
+ }
+ }
+ RelayMessage::RequestStop { id } => {}
}
}
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs
index 8f21d85..318ed7d 100644
--- a/client-native-lib/src/signaling.rs
+++ b/client-native-lib/src/signaling.rs
@@ -6,7 +6,7 @@
use crate::protocol::ClientboundPacket;
use crate::{crypto::hash, protocol::ServerboundPacket};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace};
use std::pin::Pin;
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::{self, Message};
@@ -37,7 +37,10 @@ impl SignalingConnection {
let (tx, rx): (_, _) = conn.split();
let tx = tx.with(async move |packet: ServerboundPacket| {
- debug!(" -> {packet:?}");
+ match packet {
+ ServerboundPacket::Relay { .. } => trace!(" -> {packet:?}"),
+ _ => debug!(" -> {packet:?}"),
+ }
Ok::<_, _>(Message::Text(
serde_json::to_string::<ServerboundPacket>(&packet).unwrap(),
))
@@ -46,9 +49,12 @@ impl SignalingConnection {
let rx = rx.filter_map(async move |mesg| match mesg {
Ok(mesg) => match mesg {
tungstenite::Message::Text(t) => {
- let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
- debug!("<- {p:?}");
- Some(p)
+ let packet: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
+ match packet {
+ ClientboundPacket::Message { .. } => trace!(" <- {packet:?}"),
+ _ => debug!(" <- {packet:?}"),
+ }
+ Some(packet)
}
tungstenite::Message::Close(e) => {
error!("ws closed {e:?}");
@@ -57,7 +63,7 @@ impl SignalingConnection {
_ => None,
},
Err(e) => {
- warn!("websocket error: {e}");
+ error!("websocket error: {e}");
None
}
});