From dee67a82e8fb773ceb03c1b38dc0bce7239e1177 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 7 Oct 2022 20:49:11 +0200 Subject: more work on lib: first negotiation --- client-native-lib/src/instance.rs | 136 ++++++++++++++++++++++++++++++++++++++ client-native-lib/src/lib.rs | 8 +-- client-native-lib/src/peer.rs | 48 +++++++++++--- client-native-lib/src/state.rs | 136 -------------------------------------- 4 files changed, 177 insertions(+), 151 deletions(-) create mode 100644 client-native-lib/src/instance.rs delete mode 100644 client-native-lib/src/state.rs (limited to 'client-native-lib/src') diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs new file mode 100644 index 0000000..b3688c3 --- /dev/null +++ b/client-native-lib/src/instance.rs @@ -0,0 +1,136 @@ +/* + This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2022 metamuffin +*/ +use crate::{ + build_api, + crypto::{self, Key}, + peer::Peer, + protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, + signaling::{self, SignalingConnection}, + Config, EventHandler, LocalResource, +}; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, info, warn}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; +use webrtc::api::API; + +pub struct Instance { + pub event_handler: Box, + pub conn: SignalingConnection, + pub config: Config, + pub api: API, + pub key: Key, + local_resources: RwLock>>, + my_id: RwLock>, + pub peers: RwLock>>, +} +impl Instance { + pub async fn new(config: Config, event_handler: Box) -> Arc { + let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; + let key = crypto::Key::derive(&config.secret); + + Arc::new(Self { + event_handler, + api: build_api(), + my_id: RwLock::new(None), + peers: Default::default(), + local_resources: Default::default(), + config, + conn, + key, + }) + } + + pub async fn my_id(&self) -> usize { + self.my_id.read().await.expect("not initialized yet") + } + + pub async fn receive_loop(self: Arc) { + while let Some(packet) = self.conn.recv.write().await.next().await { + debug!("{packet:?}"); + let inst = self.clone(); + inst.on_message(packet).await + } + } + + pub async fn on_message(self: Arc, packet: ClientboundPacket) { + match packet { + protocol::ClientboundPacket::Init { your_id, version } => { + info!("server is running {version:?}"); + *self.my_id.write().await = Some(your_id); + } + protocol::ClientboundPacket::ClientJoin { id } => { + if id == self.my_id().await { + // we joined - YAY! + } else { + let peer = Peer::create(self.clone(), id).await; + self.peers.write().await.insert(id, peer.clone()); + peer.send_relay(RelayMessage::Identify { + username: self.config.username.clone(), + }) + .await; + } + } + protocol::ClientboundPacket::ClientLeave { id } => { + self.peers + .write() + .await + .remove(&id) + .unwrap() + .on_leave() + .await; + } + protocol::ClientboundPacket::Message { sender, message } => { + let message = self.key.decrypt(&message); + let p = serde_json::from_str::(&message).unwrap(); + self.on_relay(sender, p.inner).await; + } + } + } + + pub async fn on_relay(&self, sender: usize, p: RelayMessage) { + if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + peer.on_relay(p).await + } else { + warn!("got a packet from a non-existent peer") + } + } + + pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + self.conn + .send + .write() + .await + .send(ServerboundPacket::Relay { + recipient: Some(recipient), + message: self.key.encrypt( + &serde_json::to_string(&RelayMessageWrapper { + sender: self.my_id.read().await.expect("not ready to relay yet.."), + inner, + }) + .unwrap(), + ), + }) + .await + .unwrap() + } + + pub async fn add_local_resource(&self, res: Box) { + for (pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::Provide(res.info())); + } + self.local_resources + .write() + .await + .insert(res.info().id, res); + } + pub async fn remove_local_resource(&self, id: String) { + self.local_resources.write().await.remove(&id); + for (pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); + } + } +} diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index bb88b9f..27c0595 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -12,7 +12,7 @@ use std::{pin::Pin, sync::Arc}; use futures_util::Future; use peer::Peer; use protocol::ProvideInfo; -use state::State; +use instance::Instance; use tokio::sync::RwLock; use webrtc::{ api::{ @@ -25,7 +25,7 @@ pub mod crypto; pub mod peer; pub mod protocol; pub mod signaling; -pub mod state; +pub mod instance; pub use webrtc; @@ -54,9 +54,9 @@ pub trait LocalResource: Send + Sync + 'static { pub trait EventHandler: Send + Sync + 'static { fn remote_resource_added( &self, - peer: &Peer, + peer: Arc, info: ProvideInfo, ) -> Pin>>; - fn remote_resource_removed(&self, peer: &Peer, id: String) + fn remote_resource_removed(&self, peer: Arc, id: String) -> Pin>>; } diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 5e672e6..b6ecf55 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -4,13 +4,14 @@ Copyright (C) 2022 metamuffin */ use crate::{ + instance::Instance, protocol::{self, ProvideInfo, RelayMessage, Sdp}, - state::State, }; use log::info; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use webrtc::{ + data::data_channel::DataChannel, ice_transport::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, ice_server::RTCIceServer, @@ -19,17 +20,34 @@ use webrtc::{ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, sdp::session_description::RTCSessionDescription, RTCPeerConnection, }, + track::track_remote::TrackRemote, }; pub struct Peer { - pub state: Arc, + pub inst: Arc, pub peer_connection: RTCPeerConnection, pub resources_provided: RwLock>, pub id: usize, } +// pub struct RemoteResource { +// info: ProvideInfo, +// state: RemoteResourceInner, +// } +// // (Box) -> Pin>>>) +// pub enum RemoteResourceInner { +// Disconnected, +// AwaitConnect, +// Connected(Arc), +// AwaitDisconnect, +// } +// pub enum TransportChannel { +// Track(TrackRemote), +// DataChannel(DataChannel), +// } + impl Peer { - pub async fn create(state: Arc, id: usize) -> Arc { + pub async fn create(inst: Arc, id: usize) -> Arc { info!("({id}) peer joined"); let config = RTCConfiguration { ice_servers: vec![RTCIceServer { @@ -39,10 +57,10 @@ impl Peer { ..Default::default() }; - let peer_connection = state.api.new_peer_connection(config).await.unwrap(); + let peer_connection = inst.api.new_peer_connection(config).await.unwrap(); let peer = Arc::new(Self { resources_provided: Default::default(), - state: state.clone(), + inst: inst.clone(), peer_connection, id, }); @@ -80,6 +98,7 @@ impl Peer { { peer.peer_connection .on_data_channel(box move |dc| { + info!("got a data channel"); Box::pin(async move { dc.on_message(box move |message| { Box::pin(async move { println!("{:?}", message.data) }) @@ -92,11 +111,18 @@ impl Peer { peer } + pub async fn request_resource(&self, id: String) { + self.send_relay(RelayMessage::Request { id }).await; + } + pub async fn request_stop_resource(&self, id: String) { + self.send_relay(RelayMessage::RequestStop { id }).await; + } + pub async fn send_relay(&self, inner: RelayMessage) { - self.state.send_relay(self.id, inner).await + self.inst.send_relay(self.id, inner).await } - pub async fn on_relay(&self, p: RelayMessage) { + pub async fn on_relay(self: Arc, p: RelayMessage) { match p { RelayMessage::Offer(o) => self.on_offer(o).await, RelayMessage::Answer(a) => self.on_answer(a).await, @@ -110,17 +136,17 @@ impl Peer { .write() .await .insert(info.id.clone(), info.clone()); - self.state + self.inst .event_handler - .remote_resource_added(&self, info) + .remote_resource_added(self.clone(), info) .await; } RelayMessage::ProvideStop { id } => { info!("remote resource removed: ({:?}) ", id); self.resources_provided.write().await.remove(&id); - self.state + self.inst .event_handler - .remote_resource_removed(&self, id) + .remote_resource_removed(self.clone(), id) .await; } _ => (), diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs deleted file mode 100644 index d129e32..0000000 --- a/client-native-lib/src/state.rs +++ /dev/null @@ -1,136 +0,0 @@ -/* - This file is part of keks-meet (https://codeberg.org/metamuffin/keks-meet) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2022 metamuffin -*/ -use crate::{ - build_api, - crypto::{self, Key}, - peer::Peer, - protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, - signaling::{self, SignalingConnection}, - Config, EventHandler, LocalResource, -}; -use futures_util::{SinkExt, StreamExt}; -use log::{debug, info, warn}; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock; -use webrtc::api::API; - -pub struct State { - pub event_handler: Box, - pub conn: SignalingConnection, - pub config: Config, - pub api: API, - pub key: Key, - local_resources: RwLock>>, - my_id: RwLock>, - pub peers: RwLock>>, -} -impl State { - pub async fn new(config: Config, event_handler: Box) -> Arc { - let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; - let key = crypto::Key::derive(&config.secret); - - Arc::new(Self { - event_handler, - api: build_api(), - my_id: RwLock::new(None), - peers: Default::default(), - local_resources: Default::default(), - config, - conn, - key, - }) - } - - pub async fn my_id(&self) -> usize { - self.my_id.read().await.expect("not initialized yet") - } - - pub async fn receive_loop(self: Arc) { - while let Some(packet) = self.conn.recv.write().await.next().await { - debug!("{packet:?}"); - let state = self.clone(); - state.on_message(packet).await - } - } - - pub async fn on_message(self: Arc, packet: ClientboundPacket) { - match packet { - protocol::ClientboundPacket::Init { your_id, version } => { - info!("server is running {version:?}"); - *self.my_id.write().await = Some(your_id); - } - protocol::ClientboundPacket::ClientJoin { id } => { - if id == self.my_id().await { - // we joined - YAY! - } else { - let peer = Peer::create(self.clone(), id).await; - self.peers.write().await.insert(id, peer.clone()); - peer.send_relay(RelayMessage::Identify { - username: self.config.username.clone(), - }) - .await; - } - } - protocol::ClientboundPacket::ClientLeave { id } => { - self.peers - .write() - .await - .remove(&id) - .unwrap() - .on_leave() - .await; - } - protocol::ClientboundPacket::Message { sender, message } => { - let message = self.key.decrypt(&message); - let p = serde_json::from_str::(&message).unwrap(); - self.on_relay(sender, p.inner).await; - } - } - } - - pub async fn on_relay(&self, sender: usize, p: RelayMessage) { - if let Some(peer) = self.peers.read().await.get(&sender).cloned() { - peer.on_relay(p).await - } else { - warn!("got a packet from a non-existent peer") - } - } - - pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { - self.conn - .send - .write() - .await - .send(ServerboundPacket::Relay { - recipient: Some(recipient), - message: self.key.encrypt( - &serde_json::to_string(&RelayMessageWrapper { - sender: self.my_id.read().await.expect("not ready to relay yet.."), - inner, - }) - .unwrap(), - ), - }) - .await - .unwrap() - } - - pub async fn add_local_resource(&self, res: Box) { - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::Provide(res.info())); - } - self.local_resources - .write() - .await - .insert(res.info().id, res); - } - pub async fn remove_local_resource(&self, id: String) { - self.local_resources.write().await.remove(&id); - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); - } - } -} -- cgit v1.2.3-70-g09d2