diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-14 23:03:52 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-14 23:03:52 +0200 |
commit | 7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (patch) | |
tree | cfcdd0819552f1ccd9afd16f7bb6a2e13b662188 | |
parent | ac1494fab79444a2da487c2b2376782f6e45efe0 (diff) | |
download | keks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar keks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar.bz2 keks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar.zst |
rift works ™™
-rw-r--r-- | client-native-lib/src/lib.rs | 27 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 69 | ||||
-rw-r--r-- | client-native-lib/src/state.rs | 36 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 136 |
4 files changed, 200 insertions, 68 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index 118d962..68dcecb 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -3,9 +3,9 @@ use log::debug; use signaling::signaling_connect; -use state::State; -use std::sync::Arc; -use tokio::sync::RwLock; +use state::{HasPeer, PeerInit, State}; +use std::{marker::Sync, sync::Arc}; +use tokio::sync::{mpsc::unbounded_channel, RwLock}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, @@ -19,12 +19,18 @@ pub mod protocol; pub mod signaling; pub mod state; +pub use webrtc; + pub struct Config { pub signaling_host: String, pub secret: String, } -pub async fn connect(config: Config) -> Arc<State> { +pub async fn connect<I, P>(config: Config, sup: Arc<I>) -> Arc<State<P, I>> +where + I: PeerInit<P> + Sync + std::marker::Send + 'static, + P: HasPeer + Sync + std::marker::Send + 'static, +{ let (sender, mut recv) = signaling_connect(&config.signaling_host, &config.secret).await; let key = crypto::Key::derive(&config.secret); @@ -38,6 +44,7 @@ pub async fn connect(config: Config) -> Arc<State> { .with_interceptor_registry(registry) .build(); + let (relay_tx, mut relay_rx) = unbounded_channel(); let state = Arc::new(State { peers: Default::default(), key, @@ -45,12 +52,24 @@ pub async fn connect(config: Config) -> Arc<State> { my_id: RwLock::new(None), sender, config, + relay_tx, + sup, }); { let state = state.clone(); tokio::spawn(async move { debug!("receiving packets now"); + while let Some((r, p)) = relay_rx.recv().await { + let state = state.clone(); + state.send_relay(r, p).await + } + }); + } + { + let state = state.clone(); + tokio::spawn(async move { + debug!("receiving packets now"); while let Some(packet) = recv.recv().await { debug!("{packet:?}"); let state = state.clone(); diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 64eb641..280cc06 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -1,7 +1,11 @@ +use crate::{ + protocol::{self, RTCSessionDescriptionInit, RelayMessage}, + state::{HasPeer, PeerInit, State}, +}; use log::info; use std::sync::Arc; +use tokio::sync::mpsc::UnboundedSender; use webrtc::{ - data_channel::data_channel_message::DataChannelMessage, ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, peer_connection::{ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, @@ -9,19 +13,18 @@ use webrtc::{ }, }; -use crate::{ - protocol::{self, RTCSessionDescriptionInit, RelayMessage}, - state::State, -}; - pub struct Peer { - state: Arc<State>, // maybe use Weak later - peer_connection: RTCPeerConnection, - id: usize, + pub signal: UnboundedSender<(usize, RelayMessage)>, + pub peer_connection: RTCPeerConnection, + pub id: usize, } impl Peer { - pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> { + pub async fn create<P: HasPeer, I: PeerInit<P>>( + state: Arc<State<P, I>>, + signal: UnboundedSender<(usize, RelayMessage)>, + id: usize, + ) -> Arc<Self> { info!("({id}) peer joined"); let config = RTCConfiguration { ice_servers: vec![RTCIceServer { @@ -32,15 +35,14 @@ impl Peer { }; let peer_connection = state.api.new_peer_connection(config).await.unwrap(); - let peer = Arc::new(Self { + signal, peer_connection, id, - state: state.clone(), }); peer.peer_connection .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("conn state: {s}"); + info!("connection state changed: {s}"); Box::pin(async {}) })) .await; @@ -81,50 +83,11 @@ impl Peer { }) .await; } - - // if let Action::Send { .. } = &peer.state.args.action { - // peer.start_transfer().await - // } - peer } pub async fn send_relay(&self, inner: RelayMessage) { - self.state.send_relay(self.id, inner).await - } - - pub async fn start_transfer(&self) { - info!("starting data channel"); - let data_channel = self - .peer_connection - .create_data_channel("file-transfer", None) - .await - .unwrap(); - - data_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); - println!("message! '{}'", msg_str); - Box::pin(async {}) - })) - .await; - - { - let dc2 = data_channel.clone(); - data_channel - .on_open(box move || { - let data_channel = dc2.clone(); - Box::pin(async move { - loop { - data_channel - .send(&bytes::Bytes::from_static(b"test\n")) - .await - .unwrap(); - } - }) - }) - .await; - } + self.signal.send((self.id, inner)).unwrap() } pub async fn on_relay(&self, p: RelayMessage) { diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs index 57c4b29..c5e9365 100644 --- a/client-native-lib/src/state.rs +++ b/client-native-lib/src/state.rs @@ -1,5 +1,6 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use futures_util::Future; use log::warn; use tokio::sync::{mpsc::UnboundedSender, RwLock}; use webrtc::api::API; @@ -7,18 +8,31 @@ use webrtc::api::API; use crate::{ crypto::Key, peer::Peer, - protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, Config, + protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, + Config, }; -pub struct State { +pub trait HasPeer { + fn peer(&self) -> &Arc<Peer>; +} +pub trait PeerInit<P> { + fn add_peer( + &self, + p: Arc<Peer>, + ) -> Pin<Box<dyn Future<Output = Arc<P>> + 'static + Send + Sync>>; +} + +pub struct State<P: HasPeer, I: PeerInit<P>> { + pub sup: Arc<I>, pub config: Config, pub api: API, pub key: Key, pub my_id: RwLock<Option<usize>>, pub sender: UnboundedSender<ServerboundPacket>, - pub peers: RwLock<HashMap<usize, Arc<Peer>>>, + pub peers: RwLock<HashMap<usize, Arc<P>>>, + pub relay_tx: UnboundedSender<(usize, RelayMessage)>, } -impl State { +impl<P: HasPeer, I: PeerInit<P>> State<P, I> { pub async fn my_id(&self) -> usize { self.my_id.read().await.expect("not initialized yet") } @@ -35,10 +49,12 @@ impl State { if id == self.my_id().await { // we joined - YAY! } else { - self.peers - .write() - .await - .insert(id, Peer::create(self.clone(), id).await); + self.peers.write().await.insert( + id, + self.sup + .add_peer(Peer::create(self.clone(), self.relay_tx.clone(), id).await) + .await, + ); } } protocol::ClientboundPacket::ClientLeave { id: _ } => {} @@ -52,7 +68,7 @@ impl State { pub async fn on_relay(&self, sender: usize, p: RelayMessage) { if let Some(peer) = self.peers.read().await.get(&sender).cloned() { - peer.on_relay(p).await + peer.peer().on_relay(p).await } else { warn!("got a packet from a non-existent peer") } diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index e5415d9..299a161 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -1,5 +1,20 @@ +#![feature(box_syntax)] + +use bytes::Bytes; use clap::{Parser, Subcommand}; -use log::error; +use client_native_lib::{ + connect, + peer::Peer, + state::{HasPeer, PeerInit}, + webrtc::data_channel::RTCDataChannel, + Config, +}; +use log::{error, info}; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::{ + io::{stdin, stdout, AsyncReadExt, AsyncWriteExt}, + sync::RwLock, +}; fn main() { env_logger::init_from_env("LOG"); @@ -26,6 +41,125 @@ pub enum Action { } async fn run() { + let args = Args::parse(); + + connect( + Config { + secret: args.secret.clone(), + signaling_host: args.signaling_host.clone(), + }, + Arc::new(Conn { + args: Arc::new(args), + }), + ) + .await; + tokio::signal::ctrl_c().await.unwrap(); error!("interrupt received, exiting"); } + +pub struct Conn { + pub args: Arc<Args>, +} +pub struct PeerState { + args: Arc<Args>, + peer: Arc<Peer>, + channel: RwLock<Option<Arc<RTCDataChannel>>>, +} + +impl PeerInit<PeerState> for Conn { + fn add_peer( + &self, + peer: Arc<Peer>, + ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> { + let args = self.args.clone(); + Box::pin(async move { + let p = Arc::new(PeerState { + peer, + args, + channel: Default::default(), + }); + p.clone().init().await; + p + }) + } +} +impl HasPeer for PeerState { + fn peer(&self) -> &Arc<Peer> { + &self.peer + } +} +impl PeerState { + pub async fn init(self: Arc<Self>) { + let s = self.clone(); + match &self.args.action { + Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await), + Action::Receive {} => { + self.peer + .peer_connection + .on_data_channel(box move |ch| { + let s = s.clone(); + Box::pin(async move { + *s.channel.write().await = Some(ch); + s.init_receive_channel().await + }) + }) + .await; + } + } + } + + pub async fn init_receive_channel(self: Arc<Self>) { + info!("got a data channel"); + let ch = self.channel.read().await.as_ref().unwrap().clone(); + ch.on_open(box move || { + info!("channel opened"); + Box::pin(async {}) + }) + .await; + ch.on_close(box move || { + info!("channel closed"); + Box::pin(async {}) + }) + .await; + ch.on_error(box move |err| { + info!("channel error: {err:?}"); + Box::pin(async {}) + }) + .await; + ch.on_message(box move |mesg| { + Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() }) + }) + .await; + } + + pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> { + info!("creating data channel"); + let data_channel = self + .peer + .peer_connection + .create_data_channel("file-transfer", None) + .await + .unwrap(); + { + let dc2 = data_channel.clone(); + data_channel + .on_open(box move || { + let data_channel = dc2.clone(); + Box::pin(async move { + loop { + let mut buf = [0u8; 1024]; + let size = stdin().read(&mut buf).await.unwrap(); + data_channel + .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e))) + .await + .unwrap(); + } + }) + }) + .await; + } + + data_channel + } +} |