diff options
author | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-10-15 12:37:28 +0200 |
commit | 77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch) | |
tree | 5028a357c4cae08824d1d402c6561121be531329 | |
parent | d081461dd7fe2a6db94b196324bc485c10a77c7a (diff) | |
download | keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2 keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst |
sending files works
-rw-r--r-- | client-native-lib/src/instance.rs | 38 | ||||
-rw-r--r-- | client-native-lib/src/lib.rs | 7 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 40 | ||||
-rw-r--r-- | client-native-lib/src/signaling.rs | 18 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 123 |
5 files changed, 175 insertions, 51 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs index 162241d..6303ba7 100644 --- a/client-native-lib/src/instance.rs +++ b/client-native-lib/src/instance.rs @@ -13,7 +13,7 @@ use crate::{ }; use futures_util::{SinkExt, StreamExt}; use log::{debug, info, warn}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; use webrtc::api::API; @@ -22,8 +22,8 @@ pub struct Instance { pub conn: SignalingConnection, pub config: Config, pub api: API, - pub key: Key, - local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>, + key: Key, + pub local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>, my_id: RwLock<Option<usize>>, pub peers: RwLock<HashMap<usize, Arc<Peer>>>, } @@ -48,7 +48,9 @@ impl Instance { let blub = self.clone(); tokio::spawn(async move { loop { - blub.ping(); + blub.ping().await; + debug!("ping"); + tokio::time::sleep(Duration::from_secs(30)).await; } }); } @@ -59,7 +61,8 @@ impl Instance { .write() .await .send(ServerboundPacket::Ping) - .await; + .await + .unwrap(); } pub async fn my_id(&self) -> usize { @@ -68,7 +71,6 @@ impl Instance { pub async fn receive_loop(self: Arc<Self>) { while let Some(packet) = self.conn.recv.write().await.next().await { - debug!("{packet:?}"); let inst = self.clone(); inst.on_message(packet).await } @@ -86,10 +88,7 @@ impl Instance { } else { let peer = Peer::create(self.clone(), id).await; self.peers.write().await.insert(id, peer.clone()); - peer.send_relay(RelayMessage::Identify { - username: self.config.username.clone(), - }) - .await; + peer.init_remote().await; self.event_handler.peer_join(peer).await; } } @@ -101,13 +100,18 @@ impl Instance { protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); - self.on_relay(sender, p.inner).await; + if p.sender == sender { + self.on_relay(sender, p.inner).await; + } else { + warn!("dropping packet with inconsistent sender") + } } } } pub async fn on_relay(&self, sender: usize, p: RelayMessage) { - if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + debug!("(relay) <- ({sender}) {p:?}"); + if let Some(peer) = self.peers.read().await.get(&sender) { peer.on_relay(p).await } else { warn!("got a packet from a non-existent peer") @@ -115,6 +119,7 @@ impl Instance { } pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + debug!("(relay) -> ({recipient}) {inner:?}"); self.conn .send .write() @@ -134,8 +139,8 @@ impl Instance { } pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) { - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::Provide(res.info())); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::Provide(res.info())).await; } self.local_resources .write() @@ -144,8 +149,9 @@ impl Instance { } pub async fn remove_local_resource(&self, id: String) { self.local_resources.write().await.remove(&id); - for (pid, peer) in self.peers.read().await.iter() { - peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }); + for (_pid, peer) in self.peers.read().await.iter() { + peer.send_relay(RelayMessage::ProvideStop { id: id.clone() }) + .await; } } } diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs index a10a20a..ed434eb 100644 --- a/client-native-lib/src/lib.rs +++ b/client-native-lib/src/lib.rs @@ -5,15 +5,12 @@ */ #![feature(async_closure)] #![feature(box_syntax)] -#![feature(async_fn_in_trait)] - -use std::{pin::Pin, sync::Arc}; +// #![feature(async_fn_in_trait)] use futures_util::Future; -use instance::Instance; use peer::{Peer, TransportChannel}; use protocol::ProvideInfo; -use tokio::sync::RwLock; +use std::{pin::Pin, sync::Arc}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 36b1754..fa4de25 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -6,12 +6,12 @@ use crate::{ instance::Instance, protocol::{self, ProvideInfo, RelayMessage, Sdp}, + LocalResource, }; use log::{info, warn}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use webrtc::{ - data::data_channel::DataChannel, data_channel::RTCDataChannel, ice_transport::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, @@ -27,7 +27,7 @@ use webrtc::{ pub struct Peer { pub inst: Arc<Instance>, pub peer_connection: RTCPeerConnection, - pub resources_provided: RwLock<HashMap<String, ProvideInfo>>, + pub remote_provided: RwLock<HashMap<String, ProvideInfo>>, pub id: usize, } @@ -42,6 +42,7 @@ pub struct Peer { // Connected(Arc<TransportChannel>), // AwaitDisconnect, // } + pub enum TransportChannel { Track(Arc<TrackRemote>), DataChannel(Arc<RTCDataChannel>), @@ -60,7 +61,7 @@ impl Peer { let peer_connection = inst.api.new_peer_connection(config).await.unwrap(); let peer = Arc::new(Self { - resources_provided: Default::default(), + remote_provided: Default::default(), inst: inst.clone(), peer_connection, id, @@ -103,7 +104,7 @@ impl Peer { let peer = weak.upgrade().unwrap(); Box::pin(async move { if let Some(res) = peer - .resources_provided + .remote_provided .read() .await .get(&dc.label().to_string()) @@ -119,7 +120,7 @@ impl Peer { .await; } else { warn!("got unassociated data channel; closed connection"); - dc.close().await; + dc.close().await.unwrap(); } }) }) @@ -128,6 +129,16 @@ impl Peer { peer } + pub async fn init_remote(&self) { + self.send_relay(RelayMessage::Identify { + username: self.inst.config.username.clone(), + }) + .await; + for res in self.inst.local_resources.read().await.values() { + self.send_relay(RelayMessage::Provide(res.info())).await; + } + } + pub async fn request_resource(&self, id: String) { self.send_relay(RelayMessage::Request { id }).await; } @@ -139,7 +150,7 @@ impl Peer { self.inst.send_relay(self.id, inner).await } - pub async fn on_relay(self: Arc<Self>, p: RelayMessage) { + pub async fn on_relay(self: &Arc<Self>, p: RelayMessage) { match p { RelayMessage::Offer(o) => self.on_offer(o).await, RelayMessage::Answer(a) => self.on_answer(a).await, @@ -149,7 +160,7 @@ impl Peer { "remote resource provided: ({:?}) {:?} {:?}", info.id, info.kind, info.label ); - self.resources_provided + self.remote_provided .write() .await .insert(info.id.clone(), info.clone()); @@ -160,13 +171,24 @@ impl Peer { } RelayMessage::ProvideStop { id } => { info!("remote resource removed: ({:?}) ", id); - self.resources_provided.write().await.remove(&id); + self.remote_provided.write().await.remove(&id); self.inst .event_handler .resource_removed(self.clone(), id) .await; } - _ => (), + RelayMessage::Chat(_) => (), + RelayMessage::Identify { username } => { + info!("peer {} is known as {username:?}", self.id) + } + RelayMessage::Request { id } => { + if let Some(res) = self.inst.local_resources.read().await.get(&id) { + res.on_request(self.clone()).await; + } else { + warn!("({}) requested unknown local resource", self.id) + } + } + RelayMessage::RequestStop { id } => {} } } diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index 8f21d85..318ed7d 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -6,7 +6,7 @@ use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{Sink, SinkExt, Stream, StreamExt}; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace}; use std::pin::Pin; use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::{self, Message}; @@ -37,7 +37,10 @@ impl SignalingConnection { let (tx, rx): (_, _) = conn.split(); let tx = tx.with(async move |packet: ServerboundPacket| { - debug!(" -> {packet:?}"); + match packet { + ServerboundPacket::Relay { .. } => trace!(" -> {packet:?}"), + _ => debug!(" -> {packet:?}"), + } Ok::<_, _>(Message::Text( serde_json::to_string::<ServerboundPacket>(&packet).unwrap(), )) @@ -46,9 +49,12 @@ impl SignalingConnection { let rx = rx.filter_map(async move |mesg| match mesg { Ok(mesg) => match mesg { tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - Some(p) + let packet: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + match packet { + ClientboundPacket::Message { .. } => trace!(" <- {packet:?}"), + _ => debug!(" <- {packet:?}"), + } + Some(packet) } tungstenite::Message::Close(e) => { error!("ws closed {e:?}"); @@ -57,7 +63,7 @@ impl SignalingConnection { _ => None, }, Err(e) => { - warn!("websocket error: {e}"); + error!("websocket error: {e}"); None } }); diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index a3fb4e4..48dac0b 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -5,19 +5,18 @@ */ #![feature(box_syntax)] -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clap::{Parser, Subcommand}; use client_native_lib::{ instance::Instance, peer::{Peer, TransportChannel}, - protocol::ProvideInfo, - webrtc::data_channel::RTCDataChannel, + protocol::{ProvideInfo, RelayMessage}, Config, DynFut, EventHandler, LocalResource, }; use humansize::DECIMAL; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use std::{ - future::Future, + os::unix::prelude::MetadataExt, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -25,7 +24,7 @@ use std::{ }, }; use tokio::{ - fs::File, + fs::{self, File}, io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -72,6 +71,27 @@ async fn run() { ) .await; + match &args.action { + Action::Send { filename } => { + inst.add_local_resource(box FileSender { + info: ProvideInfo { + id: "the-file".to_string(), // we only share a single file so its fine + kind: "file".to_string(), + track_kind: None, + label: Some(filename.clone().unwrap_or("stdin".to_string())), + size: if let Some(filename) = &filename { + Some(fs::metadata(filename).await.unwrap().size() as usize) + } else { + None + }, + }, + reader_factory: args.action, + }) + .await; + } + _ => (), + } + inst.spawn_ping().await; inst.receive_loop().await; @@ -89,7 +109,7 @@ impl EventHandler for Handler { Box::pin(async move {}) } - fn peer_leave(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { Box::pin(async move {}) } fn resource_added( @@ -101,22 +121,22 @@ impl EventHandler for Handler { let args = self.args.clone(); Box::pin(async move { match &args.action { - Action::Send { filename } => {} - Action::Receive { filename } => { + Action::Receive { .. } => { if info.kind == "file" { peer.request_resource(id).await; } } + _ => (), } }) } - fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()> { + fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { Box::pin(async {}) } fn resource_connected( &self, - peer: Arc<Peer>, + _peer: Arc<Peer>, resource: &ProvideInfo, channel: TransportChannel, ) -> client_native_lib::DynFut<()> { @@ -129,7 +149,7 @@ impl EventHandler for Handler { if resource.kind != "file" { return error!("we got a non-file resource for some reason…"); } - let mut pos = Arc::new(AtomicUsize::new(0)); + let pos = Arc::new(AtomicUsize::new(0)); let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = Arc::new(RwLock::new(None)); { @@ -163,7 +183,7 @@ impl EventHandler for Handler { Box::pin(async move { let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); info!( - "{:?} bytes of data ({} of {})", + "recv {:?} ({} of {})", mesg.data.len(), humansize::format_size(pos, DECIMAL), humansize::format_size(resource.size.unwrap_or(0), DECIMAL), @@ -228,15 +248,88 @@ impl Action { } struct FileSender { + reader_factory: Action, //TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>, info: ProvideInfo, } + impl LocalResource for FileSender { - fn info(&self) -> client_native_lib::protocol::ProvideInfo { + fn info(&self) -> ProvideInfo { self.info.clone() } fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { - Box::pin(async move {}) + let id = self.info().id.clone(); + let reader_factory = self.reader_factory.clone(); + Box::pin(async move { + let channel = peer + .peer_connection + .create_data_channel(&id, None) + .await + .unwrap(); + let pos = Arc::new(AtomicUsize::new(0)); + let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = + Arc::new(RwLock::new(None)); + { + let reader = reader.clone(); + let reader_factory = reader_factory.clone(); + channel + .on_open(box move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel open"); + *reader.write().await = Some(reader_factory.create_reader().await); + }) + }) + .await; + } + { + let reader = reader.clone(); + channel + .on_close(box move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel closed"); + *reader.write().await = None; + }) + }) + .await; + } + { + let reader = reader.clone(); + let channel2 = channel.clone(); + channel + .on_buffered_amount_low(box move || { + let reader = reader.clone(); + let channel = channel2.clone(); + Box::pin(async move { + debug!("buffered amount low"); + let mut buf = [0u8; 1 << 15]; + let size = reader + .write() + .await + .as_mut() + .unwrap() + .read(&mut buf) + .await + .unwrap(); + if size == 0 { + info!("reached EOF, closing channel"); + channel.close().await.unwrap(); + } else { + debug!("sending {size} bytes"); + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }) + }) + .await; + } + channel + .on_error(box move |err| Box::pin(async move { error!("channel error: {err}") })) + .await; + }) } } |