diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
commit | 77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch) | |
tree | 5028a357c4cae08824d1d402c6561121be531329 /client-native-rift | |
parent | d081461dd7fe2a6db94b196324bc485c10a77c7a (diff) | |
download | keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2 keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst |
sending files works
Diffstat (limited to 'client-native-rift')
-rw-r--r-- | client-native-rift/src/main.rs | 123 |
1 files changed, 108 insertions, 15 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index a3fb4e4..48dac0b 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -5,19 +5,18 @@ */ #![feature(box_syntax)] -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clap::{Parser, Subcommand}; use client_native_lib::{ instance::Instance, peer::{Peer, TransportChannel}, - protocol::ProvideInfo, - webrtc::data_channel::RTCDataChannel, + protocol::{ProvideInfo, RelayMessage}, Config, DynFut, EventHandler, LocalResource, }; use humansize::DECIMAL; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use std::{ - future::Future, + os::unix::prelude::MetadataExt, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -25,7 +24,7 @@ use std::{ }, }; use tokio::{ - fs::File, + fs::{self, File}, io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -72,6 +71,27 @@ async fn run() { ) .await; + match &args.action { + Action::Send { filename } => { + inst.add_local_resource(box FileSender { + info: ProvideInfo { + id: "the-file".to_string(), // we only share a single file so its fine + kind: "file".to_string(), + track_kind: None, + label: Some(filename.clone().unwrap_or("stdin".to_string())), + size: if let Some(filename) = &filename { + Some(fs::metadata(filename).await.unwrap().size() as usize) + } else { + None + }, + }, + reader_factory: args.action, + }) + .await; + } + _ => (), + } + inst.spawn_ping().await; inst.receive_loop().await; @@ -89,7 +109,7 @@ impl EventHandler for Handler { Box::pin(async move {}) } - fn peer_leave(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { Box::pin(async move {}) } fn resource_added( @@ -101,22 +121,22 @@ impl EventHandler for Handler { let args = self.args.clone(); Box::pin(async move { match &args.action { - Action::Send { filename } => {} - Action::Receive { filename } => { + Action::Receive { .. } => { if info.kind == "file" { peer.request_resource(id).await; } } + _ => (), } }) } - fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()> { + fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { Box::pin(async {}) } fn resource_connected( &self, - peer: Arc<Peer>, + _peer: Arc<Peer>, resource: &ProvideInfo, channel: TransportChannel, ) -> client_native_lib::DynFut<()> { @@ -129,7 +149,7 @@ impl EventHandler for Handler { if resource.kind != "file" { return error!("we got a non-file resource for some reason…"); } - let mut pos = Arc::new(AtomicUsize::new(0)); + let pos = Arc::new(AtomicUsize::new(0)); let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = Arc::new(RwLock::new(None)); { @@ -163,7 +183,7 @@ impl EventHandler for Handler { Box::pin(async move { let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); info!( - "{:?} bytes of data ({} of {})", + "recv {:?} ({} of {})", mesg.data.len(), humansize::format_size(pos, DECIMAL), humansize::format_size(resource.size.unwrap_or(0), DECIMAL), @@ -228,15 +248,88 @@ impl Action { } struct FileSender { + reader_factory: Action, //TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>, info: ProvideInfo, } + impl LocalResource for FileSender { - fn info(&self) -> client_native_lib::protocol::ProvideInfo { + fn info(&self) -> ProvideInfo { self.info.clone() } fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { - Box::pin(async move {}) + let id = self.info().id.clone(); + let reader_factory = self.reader_factory.clone(); + Box::pin(async move { + let channel = peer + .peer_connection + .create_data_channel(&id, None) + .await + .unwrap(); + let pos = Arc::new(AtomicUsize::new(0)); + let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = + Arc::new(RwLock::new(None)); + { + let reader = reader.clone(); + let reader_factory = reader_factory.clone(); + channel + .on_open(box move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel open"); + *reader.write().await = Some(reader_factory.create_reader().await); + }) + }) + .await; + } + { + let reader = reader.clone(); + channel + .on_close(box move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel closed"); + *reader.write().await = None; + }) + }) + .await; + } + { + let reader = reader.clone(); + let channel2 = channel.clone(); + channel + .on_buffered_amount_low(box move || { + let reader = reader.clone(); + let channel = channel2.clone(); + Box::pin(async move { + debug!("buffered amount low"); + let mut buf = [0u8; 1 << 15]; + let size = reader + .write() + .await + .as_mut() + .unwrap() + .read(&mut buf) + .await + .unwrap(); + if size == 0 { + info!("reached EOF, closing channel"); + channel.close().await.unwrap(); + } else { + debug!("sending {size} bytes"); + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }) + }) + .await; + } + channel + .on_error(box move |err| Box::pin(async move { error!("channel error: {err}") })) + .await; + }) } } |