1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
use bytes::Bytes;
use humansize::DECIMAL;
use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
use log::{debug, error, info};
use std::{
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt},
sync::RwLock,
};
/// A local file offered to peers; its contents are streamed over a data
/// channel when a peer requests the resource.
pub struct FileSender {
/// Location of the file on disk; it is opened once the data channel opens.
pub path: Arc<PathBuf>,
/// Metadata returned by `info()`. Its `id` is passed to
/// `create_data_channel` and its `size` (0 when absent) is used as the total
/// in progress log messages.
pub info: ProvideInfo,
}
impl LocalResource for FileSender {
fn info(&self) -> ProvideInfo {
self.info.clone()
}
fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
let id = self.info().id.clone();
let total_size = self.info().size.unwrap_or(0);
let path = self.path.clone();
Box::pin(async move {
let channel = peer
.peer_connection
.create_data_channel(&id, None)
.await
.unwrap();
let pos = Arc::new(AtomicUsize::new(0));
let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
Arc::new(RwLock::new(None));
{
let reader = reader.clone();
let path = path.clone();
channel.on_open(Box::new(move || {
let reader = reader.clone();
Box::pin(async move {
info!("channel open");
*reader.write().await = Some(Box::pin(File::open(&*path).await.unwrap()));
})
}))
}
{
let reader = reader.clone();
channel.on_close(Box::new(move || {
let reader = reader.clone();
Box::pin(async move {
info!("channel closed");
*reader.write().await = None;
})
}))
}
{
let reader = reader.clone();
let pos = pos.clone();
let channel2 = channel.clone();
channel
.on_buffered_amount_low(Box::new(move || {
let pos = pos.clone();
let reader = reader.clone();
let channel = channel2.clone();
Box::pin(async move {
debug!("buffered amount low");
let mut buf = [0u8; 1 << 15];
let size = reader
.write()
.await
.as_mut()
.unwrap()
.read(&mut buf)
.await
.unwrap();
if size == 0 {
info!("reached EOF, closing channel");
let _ = channel.send_text("end").await;
channel.close().await.unwrap();
} else {
let progress_size = pos.fetch_add(size, Ordering::Relaxed);
info!(
"sending {size} bytes ({} of {})",
humansize::format_size(progress_size, DECIMAL),
humansize::format_size(total_size, DECIMAL),
);
channel
.send(&Bytes::copy_from_slice(&buf[..size]))
.await
.unwrap();
}
})
}))
.await;
channel.set_buffered_amount_low_threshold(1).await;
}
channel.on_error(Box::new(move |err| {
Box::pin(async move { error!("channel error: {err}") })
}))
})
}
}
|