From d3e3cfdaa53ae354b1de4f540426af3e6af8096d Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 18 Mar 2024 09:51:38 +0100 Subject: reworking rift: part two --- client-native-rift/src/file.rs | 112 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 3 deletions(-) (limited to 'client-native-rift/src/file.rs') diff --git a/client-native-rift/src/file.rs b/client-native-rift/src/file.rs index bfe32fd..93dde62 100644 --- a/client-native-rift/src/file.rs +++ b/client-native-rift/src/file.rs @@ -1,8 +1,12 @@ +use crate::RequestHandler; use bytes::Bytes; use humansize::DECIMAL; -use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use libkeks::{ + peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource, +}; use log::{debug, error, info}; use std::{ + future::Future, path::PathBuf, pin::Pin, sync::{ @@ -11,8 +15,8 @@ use std::{ }, }; use tokio::{ - fs::File, - io::{AsyncRead, AsyncReadExt}, + fs::{File, OpenOptions}, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -107,3 +111,105 @@ impl LocalResource for FileSender { }) } } + +pub struct DownloadHandler { + pub path: Option, +} +impl RequestHandler for DownloadHandler { + fn on_connect( + &self, + resource: ProvideInfo, + channel: Arc, + ) -> Pin> + Send + Sync>> { + let path = self.path.clone().unwrap_or_else(|| { + resource + .label + .clone() + .unwrap_or("download".to_owned()) + .replace("/", "_") + .replace("..", "_") + .into() + }); + if path.exists() {} + Box::pin(async move { + let pos = Arc::new(AtomicUsize::new(0)); + let writer: Arc>>>> = + Arc::new(RwLock::new(None)); + { + let writer = writer.clone(); + let path = path.clone(); + let channel2 = channel.clone(); + channel.on_open(Box::new(move || { + let path = path.clone(); + let writer = writer.clone(); + Box::pin(async move { + info!("channel opened"); + match OpenOptions::new() + .write(true) + .read(false) + .create_new(true) + .open(path) + .await + { + Ok(file) => { + *writer.write().await = Some(Box::pin(file)); + } + Err(e) => { + error!("cannot write download: {e}"); + channel2.close().await.unwrap(); + } + } + }) + })); + } + { + let writer = writer.clone(); + channel.on_close(Box::new(move || { + let writer = writer.clone(); + Box::pin(async move { + info!("channel closed"); + *writer.write().await = None; + }) + })); + } + { + let writer = writer.clone(); + channel.on_message(Box::new(move |mesg| { + let writer = writer.clone(); + let pos = pos.clone(); + Box::pin(async move { + // TODO + if mesg.is_string { + let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); + if &s == "end" { + info!("transfer complete") + } + } else { + let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); + info!( + "recv {:?} ({} of {})", + mesg.data.len(), + humansize::format_size(pos, DECIMAL), + humansize::format_size(resource.size.unwrap_or(0), DECIMAL), + ); + writer + .write() + .await + .as_mut() + .unwrap() + .write_all(&mesg.data) + .await + .unwrap(); + } + }) + })) + } + channel.on_error(Box::new(move |err| { + Box::pin(async move { + error!("data channel errored: {err}"); + }) + })); + Ok(()) + }) + } +} -- cgit v1.2.3-70-g09d2