diff options
author | metamuffin <metamuffin@disroot.org> | 2024-03-18 09:51:38 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-03-18 09:51:38 +0100 |
commit | d3e3cfdaa53ae354b1de4f540426af3e6af8096d (patch) | |
tree | 20fc2addda444373f4c50377a4179503883d3cee /client-native-rift/src/port.rs | |
parent | c75b423e461010c722503fb9792f84ebc415c7d8 (diff) | |
download | keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar.bz2 keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar.zst |
reworking rift: part two
Diffstat (limited to 'client-native-rift/src/port.rs')
-rw-r--r-- | client-native-rift/src/port.rs | 66 |
1 files changed, 63 insertions, 3 deletions
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs index ae56f2c..857623e 100644 --- a/client-native-rift/src/port.rs +++ b/client-native-rift/src/port.rs @@ -1,9 +1,12 @@ +use crate::RequestHandler; use bytes::Bytes; -use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use libkeks::{ + peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource, +}; use log::{debug, error, info, warn}; -use std::{pin::Pin, sync::Arc}; +use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ - io::{AsyncRead, AsyncReadExt}, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, net::TcpStream, sync::RwLock, }; @@ -97,3 +100,60 @@ impl LocalResource for PortExposer { }) } } + +pub struct ForwardHandler { + pub stream: Arc<RwLock<Option<TcpStream>>>, +} +impl RequestHandler for ForwardHandler { + fn on_connect( + &self, + _resource: ProvideInfo, + channel: Arc<RTCDataChannel>, + ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> { + let stream = self.stream.clone(); + Box::pin(async move { + let stream = stream.write().await.take().unwrap(); + let (mut read, write) = stream.into_split(); + let write = Arc::new(RwLock::new(write)); + + let channel2 = channel.clone(); + channel.on_open(Box::new(move || { + Box::pin(async move { + info!("channel open"); + let channel = channel2.clone(); + tokio::task::spawn(async move { + let mut buf = [0u8; 1 << 15]; + loop { + let Ok(size) = read.read(&mut buf).await else { + break; + }; + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }); + }) + })); + channel.on_close(Box::new(move || { + Box::pin(async move { + info!("channel closed"); + }) + })); + channel.on_error(Box::new(move |err| { + Box::pin(async move { error!("channel error: {err}") }) + })); + { + let write = write.clone(); + channel.on_message(Box::new(move |message| { + let write = write.clone(); + Box::pin(async move { + write.write().await.write_all(&message.data).await.unwrap(); + }) + })); + } + + Ok(()) + }) + } +} |