aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-14 23:03:52 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-14 23:03:52 +0200
commit7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (patch)
treecfcdd0819552f1ccd9afd16f7bb6a2e13b662188
parentac1494fab79444a2da487c2b2376782f6e45efe0 (diff)
downloadkeks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar
keks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar.bz2
keks-meet-7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30.tar.zst
rift works ™™
-rw-r--r--client-native-lib/src/lib.rs27
-rw-r--r--client-native-lib/src/peer.rs69
-rw-r--r--client-native-lib/src/state.rs36
-rw-r--r--client-native-rift/src/main.rs136
4 files changed, 200 insertions, 68 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index 118d962..68dcecb 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -3,9 +3,9 @@
use log::debug;
use signaling::signaling_connect;
-use state::State;
-use std::sync::Arc;
-use tokio::sync::RwLock;
+use state::{HasPeer, PeerInit, State};
+use std::{marker::Sync, sync::Arc};
+use tokio::sync::{mpsc::unbounded_channel, RwLock};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
@@ -19,12 +19,18 @@ pub mod protocol;
pub mod signaling;
pub mod state;
+pub use webrtc;
+
pub struct Config {
pub signaling_host: String,
pub secret: String,
}
-pub async fn connect(config: Config) -> Arc<State> {
+pub async fn connect<I, P>(config: Config, sup: Arc<I>) -> Arc<State<P, I>>
+where
+ I: PeerInit<P> + Sync + std::marker::Send + 'static,
+ P: HasPeer + Sync + std::marker::Send + 'static,
+{
let (sender, mut recv) = signaling_connect(&config.signaling_host, &config.secret).await;
let key = crypto::Key::derive(&config.secret);
@@ -38,6 +44,7 @@ pub async fn connect(config: Config) -> Arc<State> {
.with_interceptor_registry(registry)
.build();
+ let (relay_tx, mut relay_rx) = unbounded_channel();
let state = Arc::new(State {
peers: Default::default(),
key,
@@ -45,12 +52,24 @@ pub async fn connect(config: Config) -> Arc<State> {
my_id: RwLock::new(None),
sender,
config,
+ relay_tx,
+ sup,
});
{
let state = state.clone();
tokio::spawn(async move {
debug!("receiving packets now");
+ while let Some((r, p)) = relay_rx.recv().await {
+ let state = state.clone();
+ state.send_relay(r, p).await
+ }
+ });
+ }
+ {
+ let state = state.clone();
+ tokio::spawn(async move {
+ debug!("receiving packets now");
while let Some(packet) = recv.recv().await {
debug!("{packet:?}");
let state = state.clone();
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index 64eb641..280cc06 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -1,7 +1,11 @@
+use crate::{
+ protocol::{self, RTCSessionDescriptionInit, RelayMessage},
+ state::{HasPeer, PeerInit, State},
+};
use log::info;
use std::sync::Arc;
+use tokio::sync::mpsc::UnboundedSender;
use webrtc::{
- data_channel::data_channel_message::DataChannelMessage,
ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer},
peer_connection::{
configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
@@ -9,19 +13,18 @@ use webrtc::{
},
};
-use crate::{
- protocol::{self, RTCSessionDescriptionInit, RelayMessage},
- state::State,
-};
-
pub struct Peer {
- state: Arc<State>, // maybe use Weak later
- peer_connection: RTCPeerConnection,
- id: usize,
+ pub signal: UnboundedSender<(usize, RelayMessage)>,
+ pub peer_connection: RTCPeerConnection,
+ pub id: usize,
}
impl Peer {
- pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> {
+ pub async fn create<P: HasPeer, I: PeerInit<P>>(
+ state: Arc<State<P, I>>,
+ signal: UnboundedSender<(usize, RelayMessage)>,
+ id: usize,
+ ) -> Arc<Self> {
info!("({id}) peer joined");
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
@@ -32,15 +35,14 @@ impl Peer {
};
let peer_connection = state.api.new_peer_connection(config).await.unwrap();
-
let peer = Arc::new(Self {
+ signal,
peer_connection,
id,
- state: state.clone(),
});
peer.peer_connection
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
- println!("conn state: {s}");
+ info!("connection state changed: {s}");
Box::pin(async {})
}))
.await;
@@ -81,50 +83,11 @@ impl Peer {
})
.await;
}
-
- // if let Action::Send { .. } = &peer.state.args.action {
- // peer.start_transfer().await
- // }
-
peer
}
pub async fn send_relay(&self, inner: RelayMessage) {
- self.state.send_relay(self.id, inner).await
- }
-
- pub async fn start_transfer(&self) {
- info!("starting data channel");
- let data_channel = self
- .peer_connection
- .create_data_channel("file-transfer", None)
- .await
- .unwrap();
-
- data_channel
- .on_message(Box::new(move |msg: DataChannelMessage| {
- let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
- println!("message! '{}'", msg_str);
- Box::pin(async {})
- }))
- .await;
-
- {
- let dc2 = data_channel.clone();
- data_channel
- .on_open(box move || {
- let data_channel = dc2.clone();
- Box::pin(async move {
- loop {
- data_channel
- .send(&bytes::Bytes::from_static(b"test\n"))
- .await
- .unwrap();
- }
- })
- })
- .await;
- }
+ self.signal.send((self.id, inner)).unwrap()
}
pub async fn on_relay(&self, p: RelayMessage) {
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
index 57c4b29..c5e9365 100644
--- a/client-native-lib/src/state.rs
+++ b/client-native-lib/src/state.rs
@@ -1,5 +1,6 @@
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, pin::Pin, sync::Arc};
+use futures_util::Future;
use log::warn;
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use webrtc::api::API;
@@ -7,18 +8,31 @@ use webrtc::api::API;
use crate::{
crypto::Key,
peer::Peer,
- protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, Config,
+ protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket},
+ Config,
};
-pub struct State {
+pub trait HasPeer {
+ fn peer(&self) -> &Arc<Peer>;
+}
+pub trait PeerInit<P> {
+ fn add_peer(
+ &self,
+ p: Arc<Peer>,
+ ) -> Pin<Box<dyn Future<Output = Arc<P>> + 'static + Send + Sync>>;
+}
+
+pub struct State<P: HasPeer, I: PeerInit<P>> {
+ pub sup: Arc<I>,
pub config: Config,
pub api: API,
pub key: Key,
pub my_id: RwLock<Option<usize>>,
pub sender: UnboundedSender<ServerboundPacket>,
- pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
+ pub peers: RwLock<HashMap<usize, Arc<P>>>,
+ pub relay_tx: UnboundedSender<(usize, RelayMessage)>,
}
-impl State {
+impl<P: HasPeer, I: PeerInit<P>> State<P, I> {
pub async fn my_id(&self) -> usize {
self.my_id.read().await.expect("not initialized yet")
}
@@ -35,10 +49,12 @@ impl State {
if id == self.my_id().await {
// we joined - YAY!
} else {
- self.peers
- .write()
- .await
- .insert(id, Peer::create(self.clone(), id).await);
+ self.peers.write().await.insert(
+ id,
+ self.sup
+ .add_peer(Peer::create(self.clone(), self.relay_tx.clone(), id).await)
+ .await,
+ );
}
}
protocol::ClientboundPacket::ClientLeave { id: _ } => {}
@@ -52,7 +68,7 @@ impl State {
pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
- peer.on_relay(p).await
+ peer.peer().on_relay(p).await
} else {
warn!("got a packet from a non-existent peer")
}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index e5415d9..299a161 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -1,5 +1,20 @@
+#![feature(box_syntax)]
+
+use bytes::Bytes;
use clap::{Parser, Subcommand};
-use log::error;
+use client_native_lib::{
+ connect,
+ peer::Peer,
+ state::{HasPeer, PeerInit},
+ webrtc::data_channel::RTCDataChannel,
+ Config,
+};
+use log::{error, info};
+use std::{future::Future, pin::Pin, sync::Arc};
+use tokio::{
+ io::{stdin, stdout, AsyncReadExt, AsyncWriteExt},
+ sync::RwLock,
+};
fn main() {
env_logger::init_from_env("LOG");
@@ -26,6 +41,125 @@ pub enum Action {
}
async fn run() {
+ let args = Args::parse();
+
+ connect(
+ Config {
+ secret: args.secret.clone(),
+ signaling_host: args.signaling_host.clone(),
+ },
+ Arc::new(Conn {
+ args: Arc::new(args),
+ }),
+ )
+ .await;
+
tokio::signal::ctrl_c().await.unwrap();
error!("interrupt received, exiting");
}
+
+pub struct Conn {
+ pub args: Arc<Args>,
+}
+pub struct PeerState {
+ args: Arc<Args>,
+ peer: Arc<Peer>,
+ channel: RwLock<Option<Arc<RTCDataChannel>>>,
+}
+
+impl PeerInit<PeerState> for Conn {
+ fn add_peer(
+ &self,
+ peer: Arc<Peer>,
+ ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> {
+ let args = self.args.clone();
+ Box::pin(async move {
+ let p = Arc::new(PeerState {
+ peer,
+ args,
+ channel: Default::default(),
+ });
+ p.clone().init().await;
+ p
+ })
+ }
+}
+impl HasPeer for PeerState {
+ fn peer(&self) -> &Arc<Peer> {
+ &self.peer
+ }
+}
+impl PeerState {
+ pub async fn init(self: Arc<Self>) {
+ let s = self.clone();
+ match &self.args.action {
+ Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await),
+ Action::Receive {} => {
+ self.peer
+ .peer_connection
+ .on_data_channel(box move |ch| {
+ let s = s.clone();
+ Box::pin(async move {
+ *s.channel.write().await = Some(ch);
+ s.init_receive_channel().await
+ })
+ })
+ .await;
+ }
+ }
+ }
+
+ pub async fn init_receive_channel(self: Arc<Self>) {
+ info!("got a data channel");
+ let ch = self.channel.read().await.as_ref().unwrap().clone();
+ ch.on_open(box move || {
+ info!("channel opened");
+ Box::pin(async {})
+ })
+ .await;
+ ch.on_close(box move || {
+ info!("channel closed");
+ Box::pin(async {})
+ })
+ .await;
+ ch.on_error(box move |err| {
+ info!("channel error: {err:?}");
+ Box::pin(async {})
+ })
+ .await;
+ ch.on_message(box move |mesg| {
+ Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() })
+ })
+ .await;
+ }
+
+ pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> {
+ info!("creating data channel");
+ let data_channel = self
+ .peer
+ .peer_connection
+ .create_data_channel("file-transfer", None)
+ .await
+ .unwrap();
+ {
+ let dc2 = data_channel.clone();
+ data_channel
+ .on_open(box move || {
+ let data_channel = dc2.clone();
+ Box::pin(async move {
+ loop {
+ let mut buf = [0u8; 1024];
+ let size = stdin().read(&mut buf).await.unwrap();
+ data_channel
+ .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
+ .await
+ .unwrap();
+ }
+ })
+ })
+ .await;
+ }
+
+ data_channel
+ }
+}