From 1d23f7b835ec6714cda248981e642c48fc551dce Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 7 Oct 2022 23:25:39 +0200 Subject: transport works. --- client-native-lib/src/instance.rs | 15 ++++++-------- client-native-lib/src/lib.rs | 23 +++++++++++++--------- client-native-lib/src/peer.rs | 41 +++++++++++++++++++++++++++------------ 3 files changed, 49 insertions(+), 30 deletions(-) (limited to 'client-native-lib/src') diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs index b3688c3..cd720f1 100644 --- a/client-native-lib/src/instance.rs +++ b/client-native-lib/src/instance.rs @@ -18,7 +18,7 @@ use tokio::sync::RwLock; use webrtc::api::API; pub struct Instance { - pub event_handler: Box, + pub event_handler: Arc, pub conn: SignalingConnection, pub config: Config, pub api: API, @@ -28,7 +28,7 @@ pub struct Instance { pub peers: RwLock>>, } impl Instance { - pub async fn new(config: Config, event_handler: Box) -> Arc { + pub async fn new(config: Config, event_handler: Arc) -> Arc { let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; let key = crypto::Key::derive(&config.secret); @@ -72,16 +72,13 @@ impl Instance { username: self.config.username.clone(), }) .await; + self.event_handler.peer_join(peer).await; } } protocol::ClientboundPacket::ClientLeave { id } => { - self.peers - .write() - .await - .remove(&id) - .unwrap() - .on_leave() - .await; + let peer = self.peers.write().await.remove(&id).unwrap(); + peer.on_leave().await; + self.event_handler.peer_leave(peer).await; } protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index 27c0595..7a0d0d0 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -10,9 +10,9 @@ use std::{pin::Pin, sync::Arc}; use futures_util::Future; -use peer::Peer; -use protocol::ProvideInfo; use instance::Instance; +use peer::{Peer, TransportChannel}; +use protocol::ProvideInfo; use tokio::sync::RwLock; use webrtc::{ api::{ @@ -22,10 +22,10 @@ use webrtc::{ }; pub mod crypto; +pub mod instance; pub mod peer; pub mod protocol; pub mod signaling; -pub mod instance; pub use webrtc; @@ -46,17 +46,22 @@ pub(crate) fn build_api() -> webrtc::api::API { .build() } +pub type DynFut = Pin + Send>>; + pub trait LocalResource: Send + Sync + 'static { fn info(&self) -> ProvideInfo; - fn on_request(&self, peer: Arc) -> Box>; + fn on_request(&self, peer: Arc) -> DynFut<()>; } pub trait EventHandler: Send + Sync + 'static { - fn remote_resource_added( + fn peer_join(&self, peer: Arc) -> DynFut<()>; + fn peer_leave(&self, peer: Arc) -> DynFut<()>; + fn resource_added(&self, peer: Arc, info: ProvideInfo) -> DynFut<()>; + fn resource_removed(&self, peer: Arc, id: String) -> DynFut<()>; + fn resource_connected( &self, peer: Arc, - info: ProvideInfo, - ) -> Pin>>; - fn remote_resource_removed(&self, peer: Arc, id: String) - -> Pin>>; + resource: &ProvideInfo, + channel: TransportChannel, + ) -> DynFut<()>; } diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index b6ecf55..36b1754 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -7,11 +7,12 @@ use crate::{ instance::Instance, protocol::{self, ProvideInfo, RelayMessage, Sdp}, }; -use log::info; +use log::{info, warn}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use webrtc::{ data::data_channel::DataChannel, + data_channel::RTCDataChannel, ice_transport::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, ice_server::RTCIceServer, @@ -41,10 +42,10 @@ pub struct Peer { // Connected(Arc), // AwaitDisconnect, // } -// pub enum TransportChannel { -// Track(TrackRemote), -// DataChannel(DataChannel), -// } +pub enum TransportChannel { + Track(Arc), + DataChannel(Arc), +} impl Peer { pub async fn create(inst: Arc, id: usize) -> Arc { @@ -96,14 +97,30 @@ impl Peer { } { + let weak = Arc::::downgrade(&peer); peer.peer_connection .on_data_channel(box move |dc| { - info!("got a data channel"); + let peer = weak.upgrade().unwrap(); Box::pin(async move { - dc.on_message(box move |message| { - Box::pin(async move { println!("{:?}", message.data) }) - }) - .await + if let Some(res) = peer + .resources_provided + .read() + .await + .get(&dc.label().to_string()) + { + info!("data channel for ({:?}) '{:?}'", res.id, res.label); + peer.inst + .event_handler + .resource_connected( + peer.clone(), + res, + TransportChannel::DataChannel(dc), + ) + .await; + } else { + warn!("got unassociated data channel; closed connection"); + dc.close().await; + } }) }) .await; @@ -138,7 +155,7 @@ impl Peer { .insert(info.id.clone(), info.clone()); self.inst .event_handler - .remote_resource_added(self.clone(), info) + .resource_added(self.clone(), info) .await; } RelayMessage::ProvideStop { id } => { @@ -146,7 +163,7 @@ impl Peer { self.resources_provided.write().await.remove(&id); self.inst .event_handler - .remote_resource_removed(self.clone(), id) + .resource_removed(self.clone(), id) .await; } _ => (), -- cgit v1.2.3-70-g09d2