diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-07 16:50:05 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-07 16:50:05 +0200 |
commit | 76bc7e5e8bee9b3994855b071408a1de582d64f3 (patch) | |
tree | 1cbbb8f008410216caa9499f43e03d08c269dec3 /client-native-lib | |
parent | c8406ff6fe2479acb2cf5a80bb18d0c79e51fb9c (diff) | |
download | keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar.bz2 keks-meet-76bc7e5e8bee9b3994855b071408a1de582d64f3.tar.zst |
lib cleanup
Diffstat (limited to 'client-native-lib')
-rw-r--r-- | client-native-lib/src/lib.rs | 64 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 13 | ||||
-rw-r--r-- | client-native-lib/src/signaling.rs | 112 | ||||
-rw-r--r-- | client-native-lib/src/state.rs | 42 |
4 files changed, 100 insertions, 131 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index 25d6e57..6ffbee3 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -7,19 +7,13 @@ #![feature(box_syntax)] #![feature(async_fn_in_trait)] -use log::debug; -use protocol::ProvideInfo; -use signaling::signaling_connect; use state::State; -use std::sync::Arc; -use tokio::sync::{mpsc::unbounded_channel, RwLock}; +use tokio::sync::RwLock; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, }, - data_channel::RTCDataChannel, interceptor::registry::Registry, - track::{track_local::TrackLocal, track_remote::TrackRemote}, }; pub mod crypto; @@ -35,51 +29,29 @@ pub struct Config { pub secret: String, } -pub async fn connect(config: Config) -> Arc<State> { - let (sender, mut recv) = signaling_connect(&config.signaling_uri, &config.secret).await; +impl State { + pub async fn new(config: Config) -> Self { + let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await; + let key = crypto::Key::derive(&config.secret); - let key = crypto::Key::derive(&config.secret); + Self { + api: build_api(), + my_id: RwLock::new(None), + peers: Default::default(), + config, + conn, + key, + } + } +} +fn build_api() -> webrtc::api::API { let mut media_engine = MediaEngine::default(); media_engine.register_default_codecs().unwrap(); let mut registry = Registry::new(); registry = register_default_interceptors(registry, &mut media_engine).unwrap(); - let api = APIBuilder::new() + APIBuilder::new() .with_media_engine(media_engine) .with_interceptor_registry(registry) - .build(); - - let (relay_tx, mut relay_rx) = unbounded_channel(); - let state = Arc::new(State { - peers: Default::default(), - key, - api, - my_id: RwLock::new(None), - sender, - config, - relay_tx, - }); - - { - let state = state.clone(); - tokio::spawn(async move { - debug!("receiving packets now"); - while let Some((r, p)) = relay_rx.recv().await { - let state = state.clone(); - state.send_relay(r, p).await - } - }); - } - { - let state = state.clone(); - tokio::spawn(async move { - debug!("receiving packets now"); - while let Some(packet) = recv.recv().await { - debug!("{packet:?}"); - let state = state.clone(); - state.on_message(packet).await - } - }); - } - state + .build() } diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index d2a6fbc..d45d0e5 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -9,7 +9,6 @@ use crate::{ }; use log::info; use std::sync::Arc; -use tokio::sync::mpsc::UnboundedSender; use webrtc::{ ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, peer_connection::{ @@ -19,17 +18,13 @@ use webrtc::{ }; pub struct Peer { - pub signal: UnboundedSender<(usize, RelayMessage)>, + pub state: Arc<State>, pub peer_connection: RTCPeerConnection, pub id: usize, } impl Peer { - pub async fn create( - state: Arc<State>, - signal: UnboundedSender<(usize, RelayMessage)>, - id: usize, - ) -> Arc<Self> { + pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> { info!("({id}) peer joined"); let config = RTCConfiguration { ice_servers: vec![RTCIceServer { @@ -41,7 +36,7 @@ impl Peer { let peer_connection = state.api.new_peer_connection(config).await.unwrap(); let peer = Arc::new(Self { - signal, + state: state.clone(), peer_connection, id, }); @@ -92,7 +87,7 @@ impl Peer { } pub async fn send_relay(&self, inner: RelayMessage) { - self.signal.send((self.id, inner)).unwrap() + self.state.send_relay(self.id, inner).await } pub async fn on_relay(&self, p: RelayMessage) { diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index cbbd063..8f21d85 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -3,76 +3,68 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2022 metamuffin <metamuffin@disroot.org> */ -use std::time::Duration; - use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{Sink, SinkExt, Stream, StreamExt}; use log::{debug, error, info, warn}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use std::pin::Pin; +use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::{self, Message}; -pub async fn signaling_connect( - signaling_server: &str, - secret: &str, -) -> ( - UnboundedSender<ServerboundPacket>, - UnboundedReceiver<ClientboundPacket>, -) { - let uri = format!("{signaling_server}/signaling/{}", hash(secret)); - info!("connecting to signaling server at {uri:?}"); - let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) - .await - .unwrap(); - info!("connection established"); - - let (mut tx, mut rx) = conn.split(); +pub struct SignalingConnection { + pub send: RwLock< + Pin< + Box< + dyn Sink<ServerboundPacket, Error = tokio_tungstenite::tungstenite::Error> + + Send + + Sync + + 'static, + >, + >, + >, + pub recv: RwLock<Pin<Box<dyn Stream<Item = ClientboundPacket> + Send + Sync + 'static>>>, +} - let (in_tx, in_rx) = unbounded_channel(); - let (out_tx, mut out_rx) = unbounded_channel(); +impl SignalingConnection { + pub async fn new(signaling_server: &str, secret: &str) -> Self { + let uri = format!("{signaling_server}/signaling/{}", hash(secret)); + info!("connecting to signaling server at {uri:?}"); + let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) + .await + .unwrap(); + info!("connection established"); - let ping_out_tx = out_tx.clone(); - let ping_task = tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(30)).await; - ping_out_tx.send(ServerboundPacket::Ping).unwrap() - } - }); + let (tx, rx): (_, _) = conn.split(); - let send_task = tokio::spawn(async move { - while let Some(p) = out_rx.recv().await { - debug!(" -> {p:?}"); - tx.send(Message::Text( - serde_json::to_string::<ServerboundPacket>(&p).unwrap(), + let tx = tx.with(async move |packet: ServerboundPacket| { + debug!(" -> {packet:?}"); + Ok::<_, _>(Message::Text( + serde_json::to_string::<ServerboundPacket>(&packet).unwrap(), )) - .await - .unwrap(); - } - }); - let _receive_task = tokio::spawn(async move { - while let Some(mesg) = rx.next().await { - match mesg { - Ok(mesg) => match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(e) => { - error!("ws closed :( {e:?}"); - unreachable!(); - } - _ => (), - }, - Err(_) => { - send_task.abort(); - ping_task.abort(); - break; + }); + + let rx = rx.filter_map(async move |mesg| match mesg { + Ok(mesg) => match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + Some(p) } + tungstenite::Message::Close(e) => { + error!("ws closed {e:?}"); + None + } + _ => None, + }, + Err(e) => { + warn!("websocket error: {e}"); + None } - } - warn!("recv task stopped"); - }); + }); - (out_tx, in_rx) + Self { + recv: RwLock::new(Box::pin(rx)), + send: RwLock::new(Box::pin(tx)), + } + } } diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs index 6182e66..841c876 100644 --- a/client-native-lib/src/state.rs +++ b/client-native-lib/src/state.rs @@ -3,48 +3,54 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2022 metamuffin <metamuffin@disroot.org> */ -use std::{collections::HashMap, sync::Arc}; -use log::warn; -use tokio::sync::{mpsc::UnboundedSender, RwLock}; -use webrtc::api::API; - use crate::{ crypto::Key, peer::Peer, protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, + signaling::SignalingConnection, Config, }; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, info, warn}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; +use webrtc::api::API; pub struct State { + pub conn: SignalingConnection, pub config: Config, pub api: API, pub key: Key, pub my_id: RwLock<Option<usize>>, - pub sender: UnboundedSender<ServerboundPacket>, pub peers: RwLock<HashMap<usize, Arc<Peer>>>, - pub relay_tx: UnboundedSender<(usize, RelayMessage)>, } impl State { pub async fn my_id(&self) -> usize { self.my_id.read().await.expect("not initialized yet") } + pub async fn receive_loop(self: Arc<Self>) { + while let Some(packet) = self.conn.recv.write().await.next().await { + debug!("{packet:?}"); + let state = self.clone(); + state.on_message(packet).await + } + } + pub async fn on_message(self: Arc<Self>, packet: ClientboundPacket) { match packet { - protocol::ClientboundPacket::Init { - your_id, - version: _, - } => { + protocol::ClientboundPacket::Init { your_id, version } => { + info!("server is running {version:?}"); *self.my_id.write().await = Some(your_id); } protocol::ClientboundPacket::ClientJoin { id } => { if id == self.my_id().await { // we joined - YAY! } else { - self.peers.write().await.insert( - id, - Peer::create(self.clone(), self.relay_tx.clone(), id).await, - ); + self.peers + .write() + .await + .insert(id, Peer::create(self.clone(), id).await); } } protocol::ClientboundPacket::ClientLeave { id } => { @@ -67,7 +73,10 @@ impl State { } pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { - self.sender + self.conn + .send + .write() + .await .send(ServerboundPacket::Relay { recipient: Some(recipient), message: self.key.encrypt( @@ -78,6 +87,7 @@ impl State { .unwrap(), ), }) + .await .unwrap() } } |