summaryrefslogtreecommitdiff
path: root/client-native-rift/src/file.rs
blob: bfe32fda80580796fe5d25222688d78b31477ce1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use bytes::Bytes;
use humansize::DECIMAL;
use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
use log::{debug, error, info};
use std::{
    path::PathBuf,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};
use tokio::{
    fs::File,
    io::{AsyncRead, AsyncReadExt},
    sync::RwLock,
};

pub struct FileSender {
    pub path: Arc<PathBuf>,
    pub info: ProvideInfo,
}

impl LocalResource for FileSender {
    fn info(&self) -> ProvideInfo {
        self.info.clone()
    }

    fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
        let id = self.info().id.clone();
        let total_size = self.info().size.unwrap_or(0);
        let path = self.path.clone();
        Box::pin(async move {
            let channel = peer
                .peer_connection
                .create_data_channel(&id, None)
                .await
                .unwrap();
            let pos = Arc::new(AtomicUsize::new(0));
            let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
                Arc::new(RwLock::new(None));
            {
                let reader = reader.clone();
                let path = path.clone();
                channel.on_open(Box::new(move || {
                    let reader = reader.clone();
                    Box::pin(async move {
                        info!("channel open");
                        *reader.write().await = Some(Box::pin(File::open(&*path).await.unwrap()));
                    })
                }))
            }
            {
                let reader = reader.clone();
                channel.on_close(Box::new(move || {
                    let reader = reader.clone();
                    Box::pin(async move {
                        info!("channel closed");
                        *reader.write().await = None;
                    })
                }))
            }
            {
                let reader = reader.clone();
                let pos = pos.clone();
                let channel2 = channel.clone();
                channel
                    .on_buffered_amount_low(Box::new(move || {
                        let pos = pos.clone();
                        let reader = reader.clone();
                        let channel = channel2.clone();
                        Box::pin(async move {
                            debug!("buffered amount low");
                            let mut buf = [0u8; 1 << 15];
                            let size = reader
                                .write()
                                .await
                                .as_mut()
                                .unwrap()
                                .read(&mut buf)
                                .await
                                .unwrap();
                            if size == 0 {
                                info!("reached EOF, closing channel");
                                let _ = channel.send_text("end").await;
                                channel.close().await.unwrap();
                            } else {
                                let progress_size = pos.fetch_add(size, Ordering::Relaxed);
                                info!(
                                    "sending {size} bytes ({} of {})",
                                    humansize::format_size(progress_size, DECIMAL),
                                    humansize::format_size(total_size, DECIMAL),
                                );
                                channel
                                    .send(&Bytes::copy_from_slice(&buf[..size]))
                                    .await
                                    .unwrap();
                            }
                        })
                    }))
                    .await;
                channel.set_buffered_amount_low_threshold(1).await;
            }
            channel.on_error(Box::new(move |err| {
                Box::pin(async move { error!("channel error: {err}") })
            }))
        })
    }
}