summaryrefslogtreecommitdiff
path: root/client-native-rift/src/main.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
committermetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
commit77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch)
tree5028a357c4cae08824d1d402c6561121be531329 /client-native-rift/src/main.rs
parentd081461dd7fe2a6db94b196324bc485c10a77c7a (diff)
downloadkeks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst
sending files works
Diffstat (limited to 'client-native-rift/src/main.rs')
-rw-r--r--client-native-rift/src/main.rs123
1 files changed, 108 insertions, 15 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index a3fb4e4..48dac0b 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -5,19 +5,18 @@
*/
#![feature(box_syntax)]
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use clap::{Parser, Subcommand};
use client_native_lib::{
instance::Instance,
peer::{Peer, TransportChannel},
- protocol::ProvideInfo,
- webrtc::data_channel::RTCDataChannel,
+ protocol::{ProvideInfo, RelayMessage},
Config, DynFut, EventHandler, LocalResource,
};
use humansize::DECIMAL;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
use std::{
- future::Future,
+ os::unix::prelude::MetadataExt,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
@@ -25,7 +24,7 @@ use std::{
},
};
use tokio::{
- fs::File,
+ fs::{self, File},
io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
@@ -72,6 +71,27 @@ async fn run() {
)
.await;
+ match &args.action {
+ Action::Send { filename } => {
+ inst.add_local_resource(box FileSender {
+ info: ProvideInfo {
+ id: "the-file".to_string(), // we only share a single file so its fine
+ kind: "file".to_string(),
+ track_kind: None,
+ label: Some(filename.clone().unwrap_or("stdin".to_string())),
+ size: if let Some(filename) = &filename {
+ Some(fs::metadata(filename).await.unwrap().size() as usize)
+ } else {
+ None
+ },
+ },
+ reader_factory: args.action,
+ })
+ .await;
+ }
+ _ => (),
+ }
+
inst.spawn_ping().await;
inst.receive_loop().await;
@@ -89,7 +109,7 @@ impl EventHandler for Handler {
Box::pin(async move {})
}
- fn peer_leave(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
Box::pin(async move {})
}
fn resource_added(
@@ -101,22 +121,22 @@ impl EventHandler for Handler {
let args = self.args.clone();
Box::pin(async move {
match &args.action {
- Action::Send { filename } => {}
- Action::Receive { filename } => {
+ Action::Receive { .. } => {
if info.kind == "file" {
peer.request_resource(id).await;
}
}
+ _ => (),
}
})
}
- fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()> {
+ fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> {
Box::pin(async {})
}
fn resource_connected(
&self,
- peer: Arc<Peer>,
+ _peer: Arc<Peer>,
resource: &ProvideInfo,
channel: TransportChannel,
) -> client_native_lib::DynFut<()> {
@@ -129,7 +149,7 @@ impl EventHandler for Handler {
if resource.kind != "file" {
return error!("we got a non-file resource for some reason…");
}
- let mut pos = Arc::new(AtomicUsize::new(0));
+ let pos = Arc::new(AtomicUsize::new(0));
let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
Arc::new(RwLock::new(None));
{
@@ -163,7 +183,7 @@ impl EventHandler for Handler {
Box::pin(async move {
let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
info!(
- "{:?} bytes of data ({} of {})",
+ "recv {:?} ({} of {})",
mesg.data.len(),
humansize::format_size(pos, DECIMAL),
humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
@@ -228,15 +248,88 @@ impl Action {
}
struct FileSender {
+ reader_factory: Action, //TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>,
info: ProvideInfo,
}
+
impl LocalResource for FileSender {
- fn info(&self) -> client_native_lib::protocol::ProvideInfo {
+ fn info(&self) -> ProvideInfo {
self.info.clone()
}
fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
- Box::pin(async move {})
+ let id = self.info().id.clone();
+ let reader_factory = self.reader_factory.clone();
+ Box::pin(async move {
+ let channel = peer
+ .peer_connection
+ .create_data_channel(&id, None)
+ .await
+ .unwrap();
+ let pos = Arc::new(AtomicUsize::new(0));
+ let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
+ Arc::new(RwLock::new(None));
+ {
+ let reader = reader.clone();
+ let reader_factory = reader_factory.clone();
+ channel
+ .on_open(box move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel open");
+ *reader.write().await = Some(reader_factory.create_reader().await);
+ })
+ })
+ .await;
+ }
+ {
+ let reader = reader.clone();
+ channel
+ .on_close(box move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *reader.write().await = None;
+ })
+ })
+ .await;
+ }
+ {
+ let reader = reader.clone();
+ let channel2 = channel.clone();
+ channel
+ .on_buffered_amount_low(box move || {
+ let reader = reader.clone();
+ let channel = channel2.clone();
+ Box::pin(async move {
+ debug!("buffered amount low");
+ let mut buf = [0u8; 1 << 15];
+ let size = reader
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .read(&mut buf)
+ .await
+ .unwrap();
+ if size == 0 {
+ info!("reached EOF, closing channel");
+ channel.close().await.unwrap();
+ } else {
+ debug!("sending {size} bytes");
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ })
+ })
+ .await;
+ }
+ channel
+ .on_error(box move |err| Box::pin(async move { error!("channel error: {err}") }))
+ .await;
+ })
}
}