diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
commit | 1286385698c4d09876abf29cb5ed595f7cfe1a8f (patch) | |
tree | 3adbe53a1393be941c7459c802f7238c5d0e2d64 /client-native-rift | |
parent | 7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (diff) | |
download | keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.bz2 keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.zst |
rift works.
Diffstat (limited to 'client-native-rift')
-rw-r--r-- | client-native-rift/Cargo.toml | 2 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 176 |
2 files changed, 120 insertions, 58 deletions
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml index 1693422..08a08bb 100644 --- a/client-native-rift/Cargo.toml +++ b/client-native-rift/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "client-native-rift" +name = "rift" version = "0.1.0" edition = "2021" diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 299a161..7aa7afe 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -12,7 +12,8 @@ use client_native_lib::{ use log::{error, info}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ - io::{stdin, stdout, AsyncReadExt, AsyncWriteExt}, + fs::File, + io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -34,11 +35,6 @@ pub struct Args { #[clap(subcommand)] action: Action, } -#[derive(Subcommand)] -pub enum Action { - Send {}, - Receive {}, -} async fn run() { let args = Args::parse(); @@ -58,13 +54,45 @@ async fn run() { error!("interrupt received, exiting"); } +#[derive(Subcommand)] +pub enum Action { + Send { filename: Option<String> }, + Receive { filename: Option<String> }, +} + +impl Action { + pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { + match self { + Action::Receive { filename } => { + if let Some(filename) = filename { + Box::pin(File::create(filename).await.unwrap()) + } else { + Box::pin(stdout()) + } + } + _ => unreachable!(), + } + } + pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { + match self { + Action::Send { filename } => { + if let Some(filename) = filename { + Box::pin(File::open(filename).await.unwrap()) + } else { + Box::pin(stdin()) + } + } + _ => unreachable!(), + } + } +} + pub struct Conn { pub args: Arc<Args>, } pub struct PeerState { args: Arc<Args>, peer: Arc<Peer>, - channel: RwLock<Option<Arc<RTCDataChannel>>>, } impl PeerInit<PeerState> for Conn { @@ -74,11 +102,7 @@ impl PeerInit<PeerState> for Conn { ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> { let args = self.args.clone(); Box::pin(async move { - let p = Arc::new(PeerState { - peer, - args, - channel: Default::default(), - }); + let p = Arc::new(PeerState { peer, args }); p.clone().init().await; p }) @@ -93,47 +117,72 @@ impl PeerState { pub async fn init(self: Arc<Self>) { let s = self.clone(); match &self.args.action { - Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await), - Action::Receive {} => { + Action::Send { .. } => self.init_send_channel().await, + Action::Receive { .. } => { self.peer .peer_connection .on_data_channel(box move |ch| { let s = s.clone(); - Box::pin(async move { - *s.channel.write().await = Some(ch); - s.init_receive_channel().await - }) + Box::pin(async move { s.init_receive_channel(ch).await }) }) .await; } } } - pub async fn init_receive_channel(self: Arc<Self>) { + pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) { info!("got a data channel"); - let ch = self.channel.read().await.as_ref().unwrap().clone(); - ch.on_open(box move || { - info!("channel opened"); - Box::pin(async {}) - }) - .await; - ch.on_close(box move || { - info!("channel closed"); - Box::pin(async {}) - }) - .await; - ch.on_error(box move |err| { - info!("channel error: {err:?}"); - Box::pin(async {}) - }) - .await; - ch.on_message(box move |mesg| { - Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() }) - }) - .await; + let writer = Arc::new(RwLock::new(None)); + { + let writer = writer.clone(); + channel + .on_open(box move || { + info!("channel opened"); + Box::pin(async move { + *writer.write().await = Some(self.args.action.create_writer().await); + }) + }) + .await; + } + { + let writer = writer.clone(); + channel + .on_close(box move || { + info!("channel closed"); + let writer = writer.clone(); + Box::pin(async move { + *writer.write().await = None; // drop the writer, so it closes the file or whatever + }) + }) + .await; + } + { + let writer = writer.clone(); + channel + .on_message(box move |mesg| { + let writer = writer.clone(); + Box::pin(async move { + writer + .write() + .await + .as_mut() + .unwrap() + .write_all(&mesg.data) + .await + .unwrap(); + }) + }) + .await; + } + channel + .on_error(box move |err| { + info!("channel error: {err:?}"); + Box::pin(async {}) + }) + .await; } - pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> { + pub async fn init_send_channel(&self) { info!("creating data channel"); let data_channel = self .peer @@ -141,25 +190,38 @@ impl PeerState { .create_data_channel("file-transfer", None) .await .unwrap(); - { - let dc2 = data_channel.clone(); - data_channel - .on_open(box move || { - let data_channel = dc2.clone(); - Box::pin(async move { - loop { - let mut buf = [0u8; 1024]; - let size = stdin().read(&mut buf).await.unwrap(); - data_channel - .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e))) - .await - .unwrap(); + let weak = Arc::downgrade(&data_channel); + let args = self.args.clone(); + data_channel + .on_open(box move || { + let args = args.clone(); + let data_channel = weak.upgrade().unwrap(); + Box::pin(async move { + let mut reader = args.action.create_reader().await; + info!("starting transmission"); + loop { + let mut buf = [0u8; 4096]; + let size = reader.read(&mut buf).await.unwrap(); + if size == 0 { + break; } - }) + data_channel + .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e))) + .await + .unwrap(); + } + info!("transmission finished"); + drop(reader); + info!("now closing the channel again…"); + data_channel.close().await.unwrap(); }) - .await; - } - + }) + .await; + data_channel + .on_close(box || Box::pin(async move { info!("data channel closed") })) + .await; data_channel + .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") })) + .await; } } |