aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client-native-lib/src/lib.rs11
-rw-r--r--client-native-lib/src/peer.rs6
-rw-r--r--client-native-lib/src/state.rs27
-rw-r--r--client-native-rift/src/main.rs291
4 files changed, 152 insertions, 183 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index d41fc58..70ec2bd 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -8,8 +8,8 @@
use log::debug;
use signaling::signaling_connect;
-use state::{HasPeer, PeerInit, State};
-use std::{marker::Sync, sync::Arc};
+use state::State;
+use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, RwLock};
use webrtc::{
api::{
@@ -31,11 +31,7 @@ pub struct Config {
pub secret: String,
}
-pub async fn connect<I, P>(config: Config, sup: Arc<I>) -> Arc<State<P, I>>
-where
- I: PeerInit<P> + Sync + std::marker::Send + 'static,
- P: HasPeer + Sync + std::marker::Send + 'static,
-{
+pub async fn connect(config: Config) -> Arc<State> {
let (sender, mut recv) = signaling_connect(&config.signaling_host, &config.secret).await;
let key = crypto::Key::derive(&config.secret);
@@ -58,7 +54,6 @@ where
sender,
config,
relay_tx,
- sup,
});
{
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index b682bb7..d2a6fbc 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -5,7 +5,7 @@
*/
use crate::{
protocol::{self, RelayMessage, Sdp},
- state::{HasPeer, PeerInit, State},
+ state::State,
};
use log::info;
use std::sync::Arc;
@@ -25,8 +25,8 @@ pub struct Peer {
}
impl Peer {
- pub async fn create<P: HasPeer, I: PeerInit<P>>(
- state: Arc<State<P, I>>,
+ pub async fn create(
+ state: Arc<State>,
signal: UnboundedSender<(usize, RelayMessage)>,
id: usize,
) -> Arc<Self> {
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
index af4a05b..6182e66 100644
--- a/client-native-lib/src/state.rs
+++ b/client-native-lib/src/state.rs
@@ -3,9 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2022 metamuffin <metamuffin@disroot.org>
*/
-use std::{collections::HashMap, pin::Pin, sync::Arc};
-
-use futures_util::Future;
+use std::{collections::HashMap, sync::Arc};
use log::warn;
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use webrtc::api::API;
@@ -17,27 +15,16 @@ use crate::{
Config,
};
-pub trait HasPeer {
- fn peer(&self) -> &Arc<Peer>;
-}
-pub trait PeerInit<P> {
- fn add_peer(
- &self,
- p: Arc<Peer>,
- ) -> Pin<Box<dyn Future<Output = Arc<P>> + 'static + Send + Sync>>;
-}
-
-pub struct State<P: HasPeer, I: PeerInit<P>> {
- pub sup: Arc<I>,
+pub struct State {
pub config: Config,
pub api: API,
pub key: Key,
pub my_id: RwLock<Option<usize>>,
pub sender: UnboundedSender<ServerboundPacket>,
- pub peers: RwLock<HashMap<usize, Arc<P>>>,
+ pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
pub relay_tx: UnboundedSender<(usize, RelayMessage)>,
}
-impl<P: HasPeer, I: PeerInit<P>> State<P, I> {
+impl State {
pub async fn my_id(&self) -> usize {
self.my_id.read().await.expect("not initialized yet")
}
@@ -56,9 +43,7 @@ impl<P: HasPeer, I: PeerInit<P>> State<P, I> {
} else {
self.peers.write().await.insert(
id,
- self.sup
- .add_peer(Peer::create(self.clone(), self.relay_tx.clone(), id).await)
- .await,
+ Peer::create(self.clone(), self.relay_tx.clone(), id).await,
);
}
}
@@ -75,7 +60,7 @@ impl<P: HasPeer, I: PeerInit<P>> State<P, I> {
pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
- peer.peer().on_relay(p).await
+ peer.on_relay(p).await
} else {
warn!("got a packet from a non-existent peer")
}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 2e54379..b5fcca9 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -7,13 +7,7 @@
use bytes::Bytes;
use clap::{Parser, Subcommand};
-use client_native_lib::{
- connect,
- peer::Peer,
- state::{HasPeer, PeerInit},
- webrtc::data_channel::RTCDataChannel,
- Config,
-};
+use client_native_lib::{ connect, peer::Peer, webrtc::data_channel::RTCDataChannel, Config};
use log::{error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
@@ -49,15 +43,10 @@ pub struct Args {
async fn run() {
let args = Args::parse();
- connect(
- Config {
- secret: args.secret.clone(),
- signaling_host: args.signaling_host.clone(),
- },
- Arc::new(Conn {
- args: Arc::new(args),
- }),
- )
+ connect(Config {
+ secret: args.secret.clone(),
+ signaling_host: args.signaling_host.clone(),
+ })
.await;
tokio::signal::ctrl_c().await.unwrap();
@@ -99,141 +88,141 @@ impl Action {
}
}
-pub struct Conn {
- pub args: Arc<Args>,
-}
-pub struct PeerState {
- args: Arc<Args>,
- peer: Arc<Peer>,
-}
+// pub struct Conn {
+// pub args: Arc<Args>,
+// }
+// pub struct PeerState {
+// args: Arc<Args>,
+// peer: Arc<Peer>,
+// }
-impl PeerInit<PeerState> for Conn {
- fn add_peer(
- &self,
- peer: Arc<Peer>,
- ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> {
- let args = self.args.clone();
- Box::pin(async move {
- let p = Arc::new(PeerState { peer, args });
- p.clone().init().await;
- p
- })
- }
-}
-impl HasPeer for PeerState {
- fn peer(&self) -> &Arc<Peer> {
- &self.peer
- }
-}
-impl PeerState {
- pub async fn init(self: Arc<Self>) {
- let s = self.clone();
- match &self.args.action {
- Action::Send { .. } => self.init_send_channel().await,
- Action::Receive { .. } => {
- self.peer
- .peer_connection
- .on_data_channel(box move |ch| {
- let s = s.clone();
- Box::pin(async move { s.init_receive_channel(ch).await })
- })
- .await;
- }
- }
- }
+// impl PeerInit<PeerState> for Conn {
+// fn add_peer(
+// &self,
+// peer: Arc<Peer>,
+// ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> {
+// let args = self.args.clone();
+// Box::pin(async move {
+// let p = Arc::new(PeerState { peer, args });
+// p.clone().init().await;
+// p
+// })
+// }
+// }
+// impl HasPeer for PeerState {
+// fn peer(&self) -> &Arc<Peer> {
+// &self.peer
+// }
+// }
+// impl PeerState {
+// pub async fn init(self: Arc<Self>) {
+// let s = self.clone();
+// match &self.args.action {
+// Action::Send { .. } => self.init_send_channel().await,
+// Action::Receive { .. } => {
+// self.peer
+// .peer_connection
+// .on_data_channel(box move |ch| {
+// let s = s.clone();
+// Box::pin(async move { s.init_receive_channel(ch).await })
+// })
+// .await;
+// }
+// }
+// }
- pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) {
- info!("got a data channel");
- let writer = Arc::new(RwLock::new(None));
- {
- let writer = writer.clone();
- channel
- .on_open(box move || {
- info!("channel opened");
- Box::pin(async move {
- *writer.write().await = Some(self.args.action.create_writer().await);
- })
- })
- .await;
- }
- {
- let writer = writer.clone();
- channel
- .on_close(box move || {
- info!("channel closed");
- let writer = writer.clone();
- Box::pin(async move {
- *writer.write().await = None; // drop the writer, so it closes the file or whatever
- })
- })
- .await;
- }
- {
- let writer = writer.clone();
- channel
- .on_message(box move |mesg| {
- let writer = writer.clone();
- Box::pin(async move {
- writer
- .write()
- .await
- .as_mut()
- .unwrap()
- .write_all(&mesg.data)
- .await
- .unwrap();
- })
- })
- .await;
- }
- channel
- .on_error(box move |err| {
- info!("channel error: {err:?}");
- Box::pin(async {})
- })
- .await;
- }
+// pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) {
+// info!("got a data channel");
+// let writer = Arc::new(RwLock::new(None));
+// {
+// let writer = writer.clone();
+// channel
+// .on_open(box move || {
+// info!("channel opened");
+// Box::pin(async move {
+// *writer.write().await = Some(self.args.action.create_writer().await);
+// })
+// })
+// .await;
+// }
+// {
+// let writer = writer.clone();
+// channel
+// .on_close(box move || {
+// info!("channel closed");
+// let writer = writer.clone();
+// Box::pin(async move {
+// *writer.write().await = None; // drop the writer, so it closes the file or whatever
+// })
+// })
+// .await;
+// }
+// {
+// let writer = writer.clone();
+// channel
+// .on_message(box move |mesg| {
+// let writer = writer.clone();
+// Box::pin(async move {
+// writer
+// .write()
+// .await
+// .as_mut()
+// .unwrap()
+// .write_all(&mesg.data)
+// .await
+// .unwrap();
+// })
+// })
+// .await;
+// }
+// channel
+// .on_error(box move |err| {
+// info!("channel error: {err:?}");
+// Box::pin(async {})
+// })
+// .await;
+// }
- pub async fn init_send_channel(&self) {
- info!("creating data channel");
- let data_channel = self
- .peer
- .peer_connection
- .create_data_channel("file-transfer", None)
- .await
- .unwrap();
- let weak = Arc::downgrade(&data_channel);
- let args = self.args.clone();
- data_channel
- .on_open(box move || {
- let args = args.clone();
- let data_channel = weak.upgrade().unwrap();
- Box::pin(async move {
- let mut reader = args.action.create_reader().await;
- info!("starting transmission");
- loop {
- let mut buf = [0u8; 4096];
- let size = reader.read(&mut buf).await.unwrap();
- if size == 0 {
- break;
- }
- data_channel
- .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
- .await
- .unwrap();
- }
- info!("transmission finished");
- drop(reader);
- info!("now closing the channel again…");
- data_channel.close().await.unwrap();
- })
- })
- .await;
- data_channel
- .on_close(box || Box::pin(async move { info!("data channel closed") }))
- .await;
- data_channel
- .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") }))
- .await;
- }
-}
+// pub async fn init_send_channel(&self) {
+// info!("creating data channel");
+// let data_channel = self
+// .peer
+// .peer_connection
+// .create_data_channel("file-transfer", None)
+// .await
+// .unwrap();
+// let weak = Arc::downgrade(&data_channel);
+// let args = self.args.clone();
+// data_channel
+// .on_open(box move || {
+// let args = args.clone();
+// let data_channel = weak.upgrade().unwrap();
+// Box::pin(async move {
+// let mut reader = args.action.create_reader().await;
+// info!("starting transmission");
+// loop {
+// let mut buf = [0u8; 4096];
+// let size = reader.read(&mut buf).await.unwrap();
+// if size == 0 {
+// break;
+// }
+// data_channel
+// .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
+// .await
+// .unwrap();
+// }
+// info!("transmission finished");
+// drop(reader);
+// info!("now closing the channel again…");
+// data_channel.close().await.unwrap();
+// })
+// })
+// .await;
+// data_channel
+// .on_close(box || Box::pin(async move { info!("data channel closed") }))
+// .await;
+// data_channel
+// .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") }))
+// .await;
+// }
+// }