summaryrefslogtreecommitdiff
path: root/client-native-rift/src/port.rs
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-rift/src/port.rs')
-rw-r--r--client-native-rift/src/port.rs66
1 files changed, 63 insertions, 3 deletions
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs
index ae56f2c..857623e 100644
--- a/client-native-rift/src/port.rs
+++ b/client-native-rift/src/port.rs
@@ -1,9 +1,12 @@
+use crate::RequestHandler;
use bytes::Bytes;
-use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
+use libkeks::{
+ peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource,
+};
use log::{debug, error, info, warn};
-use std::{pin::Pin, sync::Arc};
+use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
- io::{AsyncRead, AsyncReadExt},
+ io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::RwLock,
};
@@ -97,3 +100,60 @@ impl LocalResource for PortExposer {
})
}
}
+
+pub struct ForwardHandler {
+ pub stream: Arc<RwLock<Option<TcpStream>>>,
+}
+impl RequestHandler for ForwardHandler {
+ fn on_connect(
+ &self,
+ _resource: ProvideInfo,
+ channel: Arc<RTCDataChannel>,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> {
+ let stream = self.stream.clone();
+ Box::pin(async move {
+ let stream = stream.write().await.take().unwrap();
+ let (mut read, write) = stream.into_split();
+ let write = Arc::new(RwLock::new(write));
+
+ let channel2 = channel.clone();
+ channel.on_open(Box::new(move || {
+ Box::pin(async move {
+ info!("channel open");
+ let channel = channel2.clone();
+ tokio::task::spawn(async move {
+ let mut buf = [0u8; 1 << 15];
+ loop {
+ let Ok(size) = read.read(&mut buf).await else {
+ break;
+ };
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ });
+ })
+ }));
+ channel.on_close(Box::new(move || {
+ Box::pin(async move {
+ info!("channel closed");
+ })
+ }));
+ channel.on_error(Box::new(move |err| {
+ Box::pin(async move { error!("channel error: {err}") })
+ }));
+ {
+ let write = write.clone();
+ channel.on_message(Box::new(move |message| {
+ let write = write.clone();
+ Box::pin(async move {
+ write.write().await.write_all(&message.data).await.unwrap();
+ })
+ }));
+ }
+
+ Ok(())
+ })
+ }
+}