1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
use bytes::Bytes;
use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
use log::{debug, error, info, warn};
use std::{pin::Pin, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncReadExt},
net::TcpStream,
sync::RwLock,
};
pub struct PortExposer {
pub port: u16,
pub info: ProvideInfo,
}
impl LocalResource for PortExposer {
fn info(&self) -> ProvideInfo {
self.info.clone()
}
fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
let id = self.info().id.clone();
let port = self.port;
Box::pin(async move {
let channel = peer
.peer_connection
.create_data_channel(&id, None)
.await
.unwrap();
let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
Arc::new(RwLock::new(None));
{
let reader = reader.clone();
let channel2 = channel.clone();
channel.on_open(Box::new(move || {
let reader = reader.clone();
Box::pin(async move {
info!("channel open");
match TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => {
*reader.write().await = Some(Box::pin(stream));
}
Err(e) => {
warn!("upstream connect failed: {e}");
channel2.close().await.unwrap();
}
}
})
}))
}
{
let reader = reader.clone();
channel.on_close(Box::new(move || {
let reader = reader.clone();
Box::pin(async move {
info!("channel closed");
*reader.write().await = None;
})
}))
}
{
let reader = reader.clone();
let channel2 = channel.clone();
channel
.on_buffered_amount_low(Box::new(move || {
let reader = reader.clone();
let channel = channel2.clone();
Box::pin(async move {
debug!("buffered amount low");
let mut buf = [0u8; 1 << 15];
let size = reader
.write()
.await
.as_mut()
.unwrap()
.read(&mut buf)
.await
.unwrap();
if size == 0 {
info!("reached EOF, closing channel");
let _ = channel.send_text("end").await;
channel.close().await.unwrap();
} else {
channel
.send(&Bytes::copy_from_slice(&buf[..size]))
.await
.unwrap();
}
})
}))
.await;
channel.set_buffered_amount_low_threshold(1).await;
}
channel.on_error(Box::new(move |err| {
Box::pin(async move { error!("channel error: {err}") })
}))
})
}
}
|