diff options
author | metamuffin <metamuffin@disroot.org> | 2024-03-18 10:07:53 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-03-18 10:07:53 +0100 |
commit | e6e7a70b172c56677f5623899b03d1d555a95f89 (patch) | |
tree | fe322f7b1d19055383086fc10ea0874ceb644619 /client-native-rift | |
parent | d3e3cfdaa53ae354b1de4f540426af3e6af8096d (diff) | |
download | keks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar keks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar.bz2 keks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar.zst |
rift: port forward poc works
Diffstat (limited to 'client-native-rift')
-rw-r--r-- | client-native-rift/src/port.rs | 85 |
1 files changed, 46 insertions, 39 deletions
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs index 857623e..6e69c5b 100644 --- a/client-native-rift/src/port.rs +++ b/client-native-rift/src/port.rs @@ -6,7 +6,7 @@ use libkeks::{ use log::{debug, error, info, warn}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::TcpStream, sync::RwLock, }; @@ -30,18 +30,36 @@ impl LocalResource for PortExposer { .create_data_channel(&id, None) .await .unwrap(); - let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = + let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = Arc::new(RwLock::new(None)); { - let reader = reader.clone(); + let writer = writer.clone(); let channel2 = channel.clone(); channel.on_open(Box::new(move || { - let reader = reader.clone(); + let writer = writer.clone(); Box::pin(async move { info!("channel open"); match TcpStream::connect(("127.0.0.1", port)).await { Ok(stream) => { - *reader.write().await = Some(Box::pin(stream)); + let (mut read, write) = stream.into_split(); + *writer.write().await = Some(Box::pin(write)); + let channel = channel2.clone(); + tokio::task::spawn(async move { + let mut buf = [0u8; 1 << 15]; + loop { + let Ok(size) = read.read(&mut buf).await else { + break; + }; + if size == 0 { + break; + } + debug!("send {size}"); + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }); } Err(e) => { warn!("upstream connect failed: {e}"); @@ -52,47 +70,31 @@ impl LocalResource for PortExposer { })) } { - let reader = reader.clone(); + let writer = writer.clone(); channel.on_close(Box::new(move || { - let reader = reader.clone(); + let writer = writer.clone(); Box::pin(async move { info!("channel closed"); - *reader.write().await = None; + *writer.write().await = None; }) })) } { - let reader = reader.clone(); - let channel2 = channel.clone(); - channel - .on_buffered_amount_low(Box::new(move || { - let reader = reader.clone(); - let channel = channel2.clone(); - Box::pin(async move { - debug!("buffered amount low"); - let mut buf = [0u8; 1 << 15]; - let size = reader - .write() - .await - .as_mut() - .unwrap() - .read(&mut buf) - .await - .unwrap(); - if size == 0 { - info!("reached EOF, closing channel"); - let _ = channel.send_text("end").await; - channel.close().await.unwrap(); - } else { - channel - .send(&Bytes::copy_from_slice(&buf[..size])) - .await - .unwrap(); - } - }) - })) - .await; - channel.set_buffered_amount_low_threshold(1).await; + let writer = writer.clone(); + channel.on_message(Box::new(move |message| { + let writer = writer.clone(); + Box::pin(async move { + debug!("recv {}", message.data.len()); + writer + .write() + .await + .as_mut() + .unwrap() + .write_all(&message.data) + .await + .unwrap(); + }) + })); } channel.on_error(Box::new(move |err| { Box::pin(async move { error!("channel error: {err}") }) @@ -127,6 +129,10 @@ impl RequestHandler for ForwardHandler { let Ok(size) = read.read(&mut buf).await else { break; }; + if size == 0 { + break; + } + debug!("send {size}"); channel .send(&Bytes::copy_from_slice(&buf[..size])) .await @@ -148,6 +154,7 @@ impl RequestHandler for ForwardHandler { channel.on_message(Box::new(move |message| { let write = write.clone(); Box::pin(async move { + debug!("recv {}", message.data.len()); write.write().await.write_all(&message.data).await.unwrap(); }) })); |