aboutsummaryrefslogtreecommitdiff
path: root/client-native-lib/src/peer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-lib/src/peer.rs')
-rw-r--r--client-native-lib/src/peer.rs41
1 files changed, 29 insertions, 12 deletions
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index b6ecf55..36b1754 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -7,11 +7,12 @@ use crate::{
instance::Instance,
protocol::{self, ProvideInfo, RelayMessage, Sdp},
};
-use log::info;
+use log::{info, warn};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use webrtc::{
data::data_channel::DataChannel,
+ data_channel::RTCDataChannel,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
ice_server::RTCIceServer,
@@ -41,10 +42,10 @@ pub struct Peer {
// Connected(Arc<TransportChannel>),
// AwaitDisconnect,
// }
-// pub enum TransportChannel {
-// Track(TrackRemote),
-// DataChannel(DataChannel),
-// }
+pub enum TransportChannel {
+ Track(Arc<TrackRemote>),
+ DataChannel(Arc<RTCDataChannel>),
+}
impl Peer {
pub async fn create(inst: Arc<Instance>, id: usize) -> Arc<Self> {
@@ -96,14 +97,30 @@ impl Peer {
}
{
+ let weak = Arc::<Peer>::downgrade(&peer);
peer.peer_connection
.on_data_channel(box move |dc| {
- info!("got a data channel");
+ let peer = weak.upgrade().unwrap();
Box::pin(async move {
- dc.on_message(box move |message| {
- Box::pin(async move { println!("{:?}", message.data) })
- })
- .await
+ if let Some(res) = peer
+ .resources_provided
+ .read()
+ .await
+ .get(&dc.label().to_string())
+ {
+ info!("data channel for ({:?}) '{:?}'", res.id, res.label);
+ peer.inst
+ .event_handler
+ .resource_connected(
+ peer.clone(),
+ res,
+ TransportChannel::DataChannel(dc),
+ )
+ .await;
+ } else {
+ warn!("got unassociated data channel; closed connection");
+ dc.close().await;
+ }
})
})
.await;
@@ -138,7 +155,7 @@ impl Peer {
.insert(info.id.clone(), info.clone());
self.inst
.event_handler
- .remote_resource_added(self.clone(), info)
+ .resource_added(self.clone(), info)
.await;
}
RelayMessage::ProvideStop { id } => {
@@ -146,7 +163,7 @@ impl Peer {
self.resources_provided.write().await.remove(&id);
self.inst
.event_handler
- .remote_resource_removed(self.clone(), id)
+ .resource_removed(self.clone(), id)
.await;
}
_ => (),