diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-07 19:59:43 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-07 19:59:43 +0200 |
commit | 439428f5c3967f5dd76db5540e085cdd91e7b747 (patch) | |
tree | 35be38807c121c833439025721f8bf386b45a003 | |
parent | 8f07699fbe594e1ec007fbc908adced89fc41f7e (diff) | |
download | keks-meet-439428f5c3967f5dd76db5540e085cdd91e7b747.tar keks-meet-439428f5c3967f5dd76db5540e085cdd91e7b747.tar.bz2 keks-meet-439428f5c3967f5dd76db5540e085cdd91e7b747.tar.zst |
some work on native lib
-rw-r--r-- | client-native-lib/src/lib.rs | 39 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 53 | ||||
-rw-r--r-- | client-native-lib/src/protocol.rs | 10 | ||||
-rw-r--r-- | client-native-lib/src/state.rs | 61 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 41 | ||||
-rw-r--r-- | readme.md | 1 |
6 files changed, 161 insertions, 44 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index 6ffbee3..bb88b9f 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -7,6 +7,11 @@ #![feature(box_syntax)] #![feature(async_fn_in_trait)] +use std::{pin::Pin, sync::Arc}; + +use futures_util::Future; +use peer::Peer; +use protocol::ProvideInfo; use state::State; use tokio::sync::RwLock; use webrtc::{ @@ -27,25 +32,10 @@ pub use webrtc; pub struct Config { pub signaling_uri: String, pub secret: String, + pub username: String, } -impl State { - pub async fn new(config: Config) -> Self { - let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; - let key = crypto::Key::derive(&config.secret); - - Self { - api: build_api(), - my_id: RwLock::new(None), - peers: Default::default(), - config, - conn, - key, - } - } -} - -fn build_api() -> webrtc::api::API { +pub(crate) fn build_api() -> webrtc::api::API { let mut media_engine = MediaEngine::default(); media_engine.register_default_codecs().unwrap(); let mut registry = Registry::new(); @@ -55,3 +45,18 @@ fn build_api() -> webrtc::api::API { .with_interceptor_registry(registry) .build() } + +pub trait LocalResource: Send + Sync + 'static { + fn info(&self) -> ProvideInfo; + fn on_request(&self, peer: Arc<Peer>) -> Box<dyn Future<Output = ()>>; +} + +pub trait EventHandler: Send + Sync + 'static { + fn remote_resource_added( + &self, + peer: &Peer, + info: ProvideInfo, + ) -> Pin<Box<dyn Future<Output = ()>>>; + fn remote_resource_removed(&self, peer: &Peer, id: String) + -> Pin<Box<dyn Future<Output = ()>>>; +} diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index d45d0e5..5e672e6 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -4,13 +4,17 @@ Copyright (C) 2022 metamuffin <metamuffin@disroot.org> */ use crate::{ - protocol::{self, RelayMessage, Sdp}, + protocol::{self, ProvideInfo, RelayMessage, Sdp}, state::State, }; use log::info; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use webrtc::{ - ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, + ice_transport::{ + ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, + ice_server::RTCIceServer, + }, peer_connection::{ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, sdp::session_description::RTCSessionDescription, RTCPeerConnection, @@ -20,6 +24,7 @@ use webrtc::{ pub struct Peer { pub state: Arc<State>, pub peer_connection: RTCPeerConnection, + pub resources_provided: RwLock<HashMap<String, ProvideInfo>>, pub id: usize, } @@ -36,6 +41,7 @@ impl Peer { let peer_connection = state.api.new_peer_connection(config).await.unwrap(); let peer = Arc::new(Self { + resources_provided: Default::default(), state: state.clone(), peer_connection, id, @@ -92,22 +98,53 @@ impl Peer { pub async fn on_relay(&self, p: RelayMessage) { match p { - protocol::RelayMessage::Offer(o) => self.on_offer(o).await, - protocol::RelayMessage::Answer(a) => self.on_answer(a).await, - protocol::RelayMessage::IceCandidate(c) => { - info!("received ICE candidate"); - self.peer_connection.add_ice_candidate(c).await.unwrap(); + RelayMessage::Offer(o) => self.on_offer(o).await, + RelayMessage::Answer(a) => self.on_answer(a).await, + RelayMessage::IceCandidate(c) => self.on_remote_ice_candidate(c).await, + RelayMessage::Provide(info) => { + info!( + "remote resource provided: ({:?}) {:?} {:?}", + info.id, info.kind, info.label + ); + self.resources_provided + .write() + .await + .insert(info.id.clone(), info.clone()); + self.state + .event_handler + .remote_resource_added(&self, info) + .await; + } + RelayMessage::ProvideStop { id } => { + info!("remote resource removed: ({:?}) ", id); + self.resources_provided.write().await.remove(&id); + self.state + .event_handler + .remote_resource_removed(&self, id) + .await; } _ => (), } } + pub async fn on_leave(&self) { + info!("({}) peer left", self.id); + } + pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) { + info!("publishing local ICE candidate"); self.send_relay(RelayMessage::IceCandidate( candidate.to_json().await.unwrap(), )) .await; } + pub async fn on_remote_ice_candidate(&self, candidate: RTCIceCandidateInit) { + info!("adding remote ICE candidate"); + self.peer_connection + .add_ice_candidate(candidate) + .await + .unwrap(); + } pub async fn on_negotiation_needed(self: Arc<Self>) { info!("({}) negotiation needed", self.id); diff --git a/client-native-lib/src/protocol.rs b/client-native-lib/src/protocol.rs index 78a5a29..c361f6b 100644 --- a/client-native-lib/src/protocol.rs +++ b/client-native-lib/src/protocol.rs @@ -65,9 +65,9 @@ pub enum TrackKind { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProvideInfo { - id: String, - kind: String, // not an enum so we dont fail if we dont support it - track_kind: Option<TrackKind>, - label: Option<String>, - size: Option<usize>, + pub id: String, + pub kind: String, // not an enum so we dont fail if we dont support it + pub track_kind: Option<TrackKind>, + pub label: Option<String>, + pub size: Option<usize>, } diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs index 841c876..d129e32 100644 --- a/client-native-lib/src/state.rs +++ b/client-native-lib/src/state.rs @@ -4,11 +4,12 @@ Copyright (C) 2022 metamuffin <metamuffin@disroot.org> */ use crate::{ - crypto::Key, + build_api, + crypto::{self, Key}, peer::Peer, protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, - signaling::SignalingConnection, - Config, + signaling::{self, SignalingConnection}, + Config, EventHandler, LocalResource, }; use futures_util::{SinkExt, StreamExt}; use log::{debug, info, warn}; @@ -17,14 +18,32 @@ use tokio::sync::RwLock; use webrtc::api::API; pub struct State { + pub event_handler: Box<dyn EventHandler>, pub conn: SignalingConnection, pub config: Config, pub api: API, pub key: Key, - pub my_id: RwLock<Option<usize>>, + local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>, + my_id: RwLock<Option<usize>>, pub peers: RwLock<HashMap<usize, Arc<Peer>>>, } impl State { + pub async fn new(config: Config, event_handler: Box<dyn EventHandler>) -> Arc<Self> { + let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; + let key = crypto::Key::derive(&config.secret); + + Arc::new(Self { + event_handler, + api: build_api(), + my_id: RwLock::new(None), + peers: Default::default(), + local_resources: Default::default(), + config, + conn, + key, + }) + } + pub async fn my_id(&self) -> usize { self.my_id.read().await.expect("not initialized yet") } @@ -47,14 +66,22 @@ impl State { if id == self.my_id().await { // we joined - YAY! } else { - self.peers - .write() - .await - .insert(id, Peer::create(self.clone(), id).await); + let peer = Peer::create(self.clone(), id).await; + self.peers.write().await.insert(id, peer.clone()); + peer.send_relay(RelayMessage::Identify { + username: self.config.username.clone(), + }) + .await; } } protocol::ClientboundPacket::ClientLeave { id } => { - self.peers.write().await.remove(&id); + self.peers + .write() + .await + .remove(&id) + .unwrap() + .on_leave() + .await; } protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); @@ -90,4 +117,20 @@ impl State { .await .unwrap() } + + pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) { + for (pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::Provide(res.info())); + } + self.local_resources + .write() + .await + .insert(res.info().id, res); + } + pub async fn remove_local_resource(&self, id: String) { + self.local_resources.write().await.remove(&id); + for (pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); + } + } } diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 0e458c4..9d97b8e 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -7,7 +7,9 @@ use bytes::Bytes; use clap::{Parser, Subcommand}; -use client_native_lib::{connect, peer::Peer, webrtc::data_channel::RTCDataChannel, Config}; +use client_native_lib::{ + peer::Peer, state::State, webrtc::data_channel::RTCDataChannel, Config, EventHandler, +}; use log::{error, info}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ @@ -33,6 +35,9 @@ pub struct Args { /// keks-meet server used for establishing p2p connection #[clap(long, default_value = "wss://meet.metamuffin.org")] signaling_uri: String, + /// username for the `identify` packet + #[clap(short, long, default_value = "guest")] + username: String, /// pre-shared secret (aka. room name) #[clap(short, long)] secret: String, @@ -43,16 +48,42 @@ pub struct Args { async fn run() { let args = Args::parse(); - connect(Config { - secret: args.secret.clone(), - signaling_uri: args.signaling_uri.clone(), - }) + let state = State::new( + Config { + secret: args.secret.clone(), + signaling_uri: args.signaling_uri.clone(), + username: args.username.clone(), + }, + Box::new(Handler {}), + ) .await; + state.receive_loop().await; + tokio::signal::ctrl_c().await.unwrap(); error!("interrupt received, exiting"); } +struct Handler {} + +impl EventHandler for Handler { + fn remote_resource_added( + &self, + peer: &Peer, + info: client_native_lib::protocol::ProvideInfo, + ) -> Pin<Box<dyn Future<Output = ()>>> { + todo!() + } + + fn remote_resource_removed( + &self, + peer: &Peer, + id: String, + ) -> Pin<Box<dyn Future<Output = ()>>> { + todo!() + } +} + #[derive(Subcommand)] pub enum Action { /// Send a file @@ -103,6 +103,7 @@ system works as follows: - download files in a streaming manner. - workaround using service worker - service worker to implement manual updates +- open chat links in a new tab ## Parameters |