aboutsummaryrefslogtreecommitdiff
path: root/client-native-rift
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-03-18 10:07:53 +0100
committermetamuffin <metamuffin@disroot.org>2024-03-18 10:07:53 +0100
commite6e7a70b172c56677f5623899b03d1d555a95f89 (patch)
treefe322f7b1d19055383086fc10ea0874ceb644619 /client-native-rift
parentd3e3cfdaa53ae354b1de4f540426af3e6af8096d (diff)
downloadkeks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar
keks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar.bz2
keks-meet-e6e7a70b172c56677f5623899b03d1d555a95f89.tar.zst
rift: port forward poc works
Diffstat (limited to 'client-native-rift')
-rw-r--r--client-native-rift/src/port.rs85
1 files changed, 46 insertions, 39 deletions
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs
index 857623e..6e69c5b 100644
--- a/client-native-rift/src/port.rs
+++ b/client-native-rift/src/port.rs
@@ -6,7 +6,7 @@ use libkeks::{
use log::{debug, error, info, warn};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
- io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
+ io::{AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
sync::RwLock,
};
@@ -30,18 +30,36 @@ impl LocalResource for PortExposer {
.create_data_channel(&id, None)
.await
.unwrap();
- let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
+ let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
Arc::new(RwLock::new(None));
{
- let reader = reader.clone();
+ let writer = writer.clone();
let channel2 = channel.clone();
channel.on_open(Box::new(move || {
- let reader = reader.clone();
+ let writer = writer.clone();
Box::pin(async move {
info!("channel open");
match TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => {
- *reader.write().await = Some(Box::pin(stream));
+ let (mut read, write) = stream.into_split();
+ *writer.write().await = Some(Box::pin(write));
+ let channel = channel2.clone();
+ tokio::task::spawn(async move {
+ let mut buf = [0u8; 1 << 15];
+ loop {
+ let Ok(size) = read.read(&mut buf).await else {
+ break;
+ };
+ if size == 0 {
+ break;
+ }
+ debug!("send {size}");
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ });
}
Err(e) => {
warn!("upstream connect failed: {e}");
@@ -52,47 +70,31 @@ impl LocalResource for PortExposer {
}))
}
{
- let reader = reader.clone();
+ let writer = writer.clone();
channel.on_close(Box::new(move || {
- let reader = reader.clone();
+ let writer = writer.clone();
Box::pin(async move {
info!("channel closed");
- *reader.write().await = None;
+ *writer.write().await = None;
})
}))
}
{
- let reader = reader.clone();
- let channel2 = channel.clone();
- channel
- .on_buffered_amount_low(Box::new(move || {
- let reader = reader.clone();
- let channel = channel2.clone();
- Box::pin(async move {
- debug!("buffered amount low");
- let mut buf = [0u8; 1 << 15];
- let size = reader
- .write()
- .await
- .as_mut()
- .unwrap()
- .read(&mut buf)
- .await
- .unwrap();
- if size == 0 {
- info!("reached EOF, closing channel");
- let _ = channel.send_text("end").await;
- channel.close().await.unwrap();
- } else {
- channel
- .send(&Bytes::copy_from_slice(&buf[..size]))
- .await
- .unwrap();
- }
- })
- }))
- .await;
- channel.set_buffered_amount_low_threshold(1).await;
+ let writer = writer.clone();
+ channel.on_message(Box::new(move |message| {
+ let writer = writer.clone();
+ Box::pin(async move {
+ debug!("recv {}", message.data.len());
+ writer
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .write_all(&message.data)
+ .await
+ .unwrap();
+ })
+ }));
}
channel.on_error(Box::new(move |err| {
Box::pin(async move { error!("channel error: {err}") })
@@ -127,6 +129,10 @@ impl RequestHandler for ForwardHandler {
let Ok(size) = read.read(&mut buf).await else {
break;
};
+ if size == 0 {
+ break;
+ }
+ debug!("send {size}");
channel
.send(&Bytes::copy_from_slice(&buf[..size]))
.await
@@ -148,6 +154,7 @@ impl RequestHandler for ForwardHandler {
channel.on_message(Box::new(move |message| {
let write = write.clone();
Box::pin(async move {
+ debug!("recv {}", message.data.len());
write.write().await.write_all(&message.data).await.unwrap();
})
}));