summaryrefslogtreecommitdiff
path: root/client-native-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-lib/src')
-rw-r--r--client-native-lib/src/lib.rs39
-rw-r--r--client-native-lib/src/peer.rs53
-rw-r--r--client-native-lib/src/protocol.rs10
-rw-r--r--client-native-lib/src/state.rs61
4 files changed, 124 insertions, 39 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index 6ffbee3..bb88b9f 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -7,6 +7,11 @@
#![feature(box_syntax)]
#![feature(async_fn_in_trait)]
+use std::{pin::Pin, sync::Arc};
+
+use futures_util::Future;
+use peer::Peer;
+use protocol::ProvideInfo;
use state::State;
use tokio::sync::RwLock;
use webrtc::{
@@ -27,25 +32,10 @@ pub use webrtc;
pub struct Config {
pub signaling_uri: String,
pub secret: String,
+ pub username: String,
}
-impl State {
- pub async fn new(config: Config) -> Self {
- let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await;
- let key = crypto::Key::derive(&config.secret);
-
- Self {
- api: build_api(),
- my_id: RwLock::new(None),
- peers: Default::default(),
- config,
- conn,
- key,
- }
- }
-}
-
-fn build_api() -> webrtc::api::API {
+pub(crate) fn build_api() -> webrtc::api::API {
let mut media_engine = MediaEngine::default();
media_engine.register_default_codecs().unwrap();
let mut registry = Registry::new();
@@ -55,3 +45,18 @@ fn build_api() -> webrtc::api::API {
.with_interceptor_registry(registry)
.build()
}
+
+pub trait LocalResource: Send + Sync + 'static {
+ fn info(&self) -> ProvideInfo;
+ fn on_request(&self, peer: Arc<Peer>) -> Box<dyn Future<Output = ()>>;
+}
+
+pub trait EventHandler: Send + Sync + 'static {
+ fn remote_resource_added(
+ &self,
+ peer: &Peer,
+ info: ProvideInfo,
+ ) -> Pin<Box<dyn Future<Output = ()>>>;
+ fn remote_resource_removed(&self, peer: &Peer, id: String)
+ -> Pin<Box<dyn Future<Output = ()>>>;
+}
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index d45d0e5..5e672e6 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -4,13 +4,17 @@
Copyright (C) 2022 metamuffin <metamuffin@disroot.org>
*/
use crate::{
- protocol::{self, RelayMessage, Sdp},
+ protocol::{self, ProvideInfo, RelayMessage, Sdp},
state::State,
};
use log::info;
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
use webrtc::{
- ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer},
+ ice_transport::{
+ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
+ ice_server::RTCIceServer,
+ },
peer_connection::{
configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
sdp::session_description::RTCSessionDescription, RTCPeerConnection,
@@ -20,6 +24,7 @@ use webrtc::{
pub struct Peer {
pub state: Arc<State>,
pub peer_connection: RTCPeerConnection,
+ pub resources_provided: RwLock<HashMap<String, ProvideInfo>>,
pub id: usize,
}
@@ -36,6 +41,7 @@ impl Peer {
let peer_connection = state.api.new_peer_connection(config).await.unwrap();
let peer = Arc::new(Self {
+ resources_provided: Default::default(),
state: state.clone(),
peer_connection,
id,
@@ -92,22 +98,53 @@ impl Peer {
pub async fn on_relay(&self, p: RelayMessage) {
match p {
- protocol::RelayMessage::Offer(o) => self.on_offer(o).await,
- protocol::RelayMessage::Answer(a) => self.on_answer(a).await,
- protocol::RelayMessage::IceCandidate(c) => {
- info!("received ICE candidate");
- self.peer_connection.add_ice_candidate(c).await.unwrap();
+ RelayMessage::Offer(o) => self.on_offer(o).await,
+ RelayMessage::Answer(a) => self.on_answer(a).await,
+ RelayMessage::IceCandidate(c) => self.on_remote_ice_candidate(c).await,
+ RelayMessage::Provide(info) => {
+ info!(
+ "remote resource provided: ({:?}) {:?} {:?}",
+ info.id, info.kind, info.label
+ );
+ self.resources_provided
+ .write()
+ .await
+ .insert(info.id.clone(), info.clone());
+ self.state
+ .event_handler
+ .remote_resource_added(&self, info)
+ .await;
+ }
+ RelayMessage::ProvideStop { id } => {
+ info!("remote resource removed: ({:?}) ", id);
+ self.resources_provided.write().await.remove(&id);
+ self.state
+ .event_handler
+ .remote_resource_removed(&self, id)
+ .await;
}
_ => (),
}
}
+ pub async fn on_leave(&self) {
+ info!("({}) peer left", self.id);
+ }
+
pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) {
+ info!("publishing local ICE candidate");
self.send_relay(RelayMessage::IceCandidate(
candidate.to_json().await.unwrap(),
))
.await;
}
+ pub async fn on_remote_ice_candidate(&self, candidate: RTCIceCandidateInit) {
+ info!("adding remote ICE candidate");
+ self.peer_connection
+ .add_ice_candidate(candidate)
+ .await
+ .unwrap();
+ }
pub async fn on_negotiation_needed(self: Arc<Self>) {
info!("({}) negotiation needed", self.id);
diff --git a/client-native-lib/src/protocol.rs b/client-native-lib/src/protocol.rs
index 78a5a29..c361f6b 100644
--- a/client-native-lib/src/protocol.rs
+++ b/client-native-lib/src/protocol.rs
@@ -65,9 +65,9 @@ pub enum TrackKind {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvideInfo {
- id: String,
- kind: String, // not an enum so we dont fail if we dont support it
- track_kind: Option<TrackKind>,
- label: Option<String>,
- size: Option<usize>,
+ pub id: String,
+ pub kind: String, // not an enum so we dont fail if we dont support it
+ pub track_kind: Option<TrackKind>,
+ pub label: Option<String>,
+ pub size: Option<usize>,
}
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
index 841c876..d129e32 100644
--- a/client-native-lib/src/state.rs
+++ b/client-native-lib/src/state.rs
@@ -4,11 +4,12 @@
Copyright (C) 2022 metamuffin <metamuffin@disroot.org>
*/
use crate::{
- crypto::Key,
+ build_api,
+ crypto::{self, Key},
peer::Peer,
protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket},
- signaling::SignalingConnection,
- Config,
+ signaling::{self, SignalingConnection},
+ Config, EventHandler, LocalResource,
};
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
@@ -17,14 +18,32 @@ use tokio::sync::RwLock;
use webrtc::api::API;
pub struct State {
+ pub event_handler: Box<dyn EventHandler>,
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
pub key: Key,
- pub my_id: RwLock<Option<usize>>,
+ local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
+ my_id: RwLock<Option<usize>>,
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
impl State {
+ pub async fn new(config: Config, event_handler: Box<dyn EventHandler>) -> Arc<Self> {
+ let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await;
+ let key = crypto::Key::derive(&config.secret);
+
+ Arc::new(Self {
+ event_handler,
+ api: build_api(),
+ my_id: RwLock::new(None),
+ peers: Default::default(),
+ local_resources: Default::default(),
+ config,
+ conn,
+ key,
+ })
+ }
+
pub async fn my_id(&self) -> usize {
self.my_id.read().await.expect("not initialized yet")
}
@@ -47,14 +66,22 @@ impl State {
if id == self.my_id().await {
// we joined - YAY!
} else {
- self.peers
- .write()
- .await
- .insert(id, Peer::create(self.clone(), id).await);
+ let peer = Peer::create(self.clone(), id).await;
+ self.peers.write().await.insert(id, peer.clone());
+ peer.send_relay(RelayMessage::Identify {
+ username: self.config.username.clone(),
+ })
+ .await;
}
}
protocol::ClientboundPacket::ClientLeave { id } => {
- self.peers.write().await.remove(&id);
+ self.peers
+ .write()
+ .await
+ .remove(&id)
+ .unwrap()
+ .on_leave()
+ .await;
}
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
@@ -90,4 +117,20 @@ impl State {
.await
.unwrap()
}
+
+ pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) {
+ for (pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::Provide(res.info()));
+ }
+ self.local_resources
+ .write()
+ .await
+ .insert(res.info().id, res);
+ }
+ pub async fn remove_local_resource(&self, id: String) {
+ self.local_resources.write().await.remove(&id);
+ for (pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() });
+ }
+ }
}