aboutsummaryrefslogtreecommitdiff
path: root/client-native-rift
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-rift')
-rw-r--r--client-native-rift/Cargo.toml2
-rw-r--r--client-native-rift/src/main.rs176
2 files changed, 120 insertions, 58 deletions
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index 1693422..08a08bb 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "client-native-rift"
+name = "rift"
version = "0.1.0"
edition = "2021"
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 299a161..7aa7afe 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -12,7 +12,8 @@ use client_native_lib::{
use log::{error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
- io::{stdin, stdout, AsyncReadExt, AsyncWriteExt},
+ fs::File,
+ io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
@@ -34,11 +35,6 @@ pub struct Args {
#[clap(subcommand)]
action: Action,
}
-#[derive(Subcommand)]
-pub enum Action {
- Send {},
- Receive {},
-}
async fn run() {
let args = Args::parse();
@@ -58,13 +54,45 @@ async fn run() {
error!("interrupt received, exiting");
}
+#[derive(Subcommand)]
+pub enum Action {
+ Send { filename: Option<String> },
+ Receive { filename: Option<String> },
+}
+
+impl Action {
+ pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> {
+ match self {
+ Action::Receive { filename } => {
+ if let Some(filename) = filename {
+ Box::pin(File::create(filename).await.unwrap())
+ } else {
+ Box::pin(stdout())
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+ pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
+ match self {
+ Action::Send { filename } => {
+ if let Some(filename) = filename {
+ Box::pin(File::open(filename).await.unwrap())
+ } else {
+ Box::pin(stdin())
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
pub struct Conn {
pub args: Arc<Args>,
}
pub struct PeerState {
args: Arc<Args>,
peer: Arc<Peer>,
- channel: RwLock<Option<Arc<RTCDataChannel>>>,
}
impl PeerInit<PeerState> for Conn {
@@ -74,11 +102,7 @@ impl PeerInit<PeerState> for Conn {
) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> {
let args = self.args.clone();
Box::pin(async move {
- let p = Arc::new(PeerState {
- peer,
- args,
- channel: Default::default(),
- });
+ let p = Arc::new(PeerState { peer, args });
p.clone().init().await;
p
})
@@ -93,47 +117,72 @@ impl PeerState {
pub async fn init(self: Arc<Self>) {
let s = self.clone();
match &self.args.action {
- Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await),
- Action::Receive {} => {
+ Action::Send { .. } => self.init_send_channel().await,
+ Action::Receive { .. } => {
self.peer
.peer_connection
.on_data_channel(box move |ch| {
let s = s.clone();
- Box::pin(async move {
- *s.channel.write().await = Some(ch);
- s.init_receive_channel().await
- })
+ Box::pin(async move { s.init_receive_channel(ch).await })
})
.await;
}
}
}
- pub async fn init_receive_channel(self: Arc<Self>) {
+ pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) {
info!("got a data channel");
- let ch = self.channel.read().await.as_ref().unwrap().clone();
- ch.on_open(box move || {
- info!("channel opened");
- Box::pin(async {})
- })
- .await;
- ch.on_close(box move || {
- info!("channel closed");
- Box::pin(async {})
- })
- .await;
- ch.on_error(box move |err| {
- info!("channel error: {err:?}");
- Box::pin(async {})
- })
- .await;
- ch.on_message(box move |mesg| {
- Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() })
- })
- .await;
+ let writer = Arc::new(RwLock::new(None));
+ {
+ let writer = writer.clone();
+ channel
+ .on_open(box move || {
+ info!("channel opened");
+ Box::pin(async move {
+ *writer.write().await = Some(self.args.action.create_writer().await);
+ })
+ })
+ .await;
+ }
+ {
+ let writer = writer.clone();
+ channel
+ .on_close(box move || {
+ info!("channel closed");
+ let writer = writer.clone();
+ Box::pin(async move {
+ *writer.write().await = None; // drop the writer, so it closes the file or whatever
+ })
+ })
+ .await;
+ }
+ {
+ let writer = writer.clone();
+ channel
+ .on_message(box move |mesg| {
+ let writer = writer.clone();
+ Box::pin(async move {
+ writer
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .write_all(&mesg.data)
+ .await
+ .unwrap();
+ })
+ })
+ .await;
+ }
+ channel
+ .on_error(box move |err| {
+ info!("channel error: {err:?}");
+ Box::pin(async {})
+ })
+ .await;
}
- pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> {
+ pub async fn init_send_channel(&self) {
info!("creating data channel");
let data_channel = self
.peer
@@ -141,25 +190,38 @@ impl PeerState {
.create_data_channel("file-transfer", None)
.await
.unwrap();
- {
- let dc2 = data_channel.clone();
- data_channel
- .on_open(box move || {
- let data_channel = dc2.clone();
- Box::pin(async move {
- loop {
- let mut buf = [0u8; 1024];
- let size = stdin().read(&mut buf).await.unwrap();
- data_channel
- .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
- .await
- .unwrap();
+ let weak = Arc::downgrade(&data_channel);
+ let args = self.args.clone();
+ data_channel
+ .on_open(box move || {
+ let args = args.clone();
+ let data_channel = weak.upgrade().unwrap();
+ Box::pin(async move {
+ let mut reader = args.action.create_reader().await;
+ info!("starting transmission");
+ loop {
+ let mut buf = [0u8; 4096];
+ let size = reader.read(&mut buf).await.unwrap();
+ if size == 0 {
+ break;
}
- })
+ data_channel
+ .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
+ .await
+ .unwrap();
+ }
+ info!("transmission finished");
+ drop(reader);
+ info!("now closing the channel again…");
+ data_channel.close().await.unwrap();
})
- .await;
- }
-
+ })
+ .await;
+ data_channel
+ .on_close(box || Box::pin(async move { info!("data channel closed") }))
+ .await;
data_channel
+ .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") }))
+ .await;
}
}