aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
committermetamuffin <metamuffin@disroot.org>2022-10-15 12:37:28 +0200
commit77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5 (patch)
tree5028a357c4cae08824d1d402c6561121be531329
parentd081461dd7fe2a6db94b196324bc485c10a77c7a (diff)
downloadkeks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.bz2
keks-meet-77f6b0d44c2d653cfcdb511a9f91f11aab81ddc5.tar.zst
sending files works
-rw-r--r--client-native-lib/src/instance.rs38
-rw-r--r--client-native-lib/src/lib.rs7
-rw-r--r--client-native-lib/src/peer.rs40
-rw-r--r--client-native-lib/src/signaling.rs18
-rw-r--r--client-native-rift/src/main.rs123
5 files changed, 175 insertions, 51 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs
index 162241d..6303ba7 100644
--- a/client-native-lib/src/instance.rs
+++ b/client-native-lib/src/instance.rs
@@ -13,7 +13,7 @@ use crate::{
};
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use webrtc::api::API;
@@ -22,8 +22,8 @@ pub struct Instance {
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
- pub key: Key,
- local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
+ key: Key,
+ pub local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
my_id: RwLock<Option<usize>>,
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
@@ -48,7 +48,9 @@ impl Instance {
let blub = self.clone();
tokio::spawn(async move {
loop {
- blub.ping();
+ blub.ping().await;
+ debug!("ping");
+ tokio::time::sleep(Duration::from_secs(30)).await;
}
});
}
@@ -59,7 +61,8 @@ impl Instance {
.write()
.await
.send(ServerboundPacket::Ping)
- .await;
+ .await
+ .unwrap();
}
pub async fn my_id(&self) -> usize {
@@ -68,7 +71,6 @@ impl Instance {
pub async fn receive_loop(self: Arc<Self>) {
while let Some(packet) = self.conn.recv.write().await.next().await {
- debug!("{packet:?}");
let inst = self.clone();
inst.on_message(packet).await
}
@@ -86,10 +88,7 @@ impl Instance {
} else {
let peer = Peer::create(self.clone(), id).await;
self.peers.write().await.insert(id, peer.clone());
- peer.send_relay(RelayMessage::Identify {
- username: self.config.username.clone(),
- })
- .await;
+ peer.init_remote().await;
self.event_handler.peer_join(peer).await;
}
}
@@ -101,13 +100,18 @@ impl Instance {
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
- self.on_relay(sender, p.inner).await;
+ if p.sender == sender {
+ self.on_relay(sender, p.inner).await;
+ } else {
+ warn!("dropping packet with inconsistent sender")
+ }
}
}
}
pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
- if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
+ debug!("(relay) <- ({sender}) {p:?}");
+ if let Some(peer) = self.peers.read().await.get(&sender) {
peer.on_relay(p).await
} else {
warn!("got a packet from a non-existent peer")
@@ -115,6 +119,7 @@ impl Instance {
}
pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) {
+ debug!("(relay) -> ({recipient}) {inner:?}");
self.conn
.send
.write()
@@ -134,8 +139,8 @@ impl Instance {
}
pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) {
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::Provide(res.info()));
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::Provide(res.info())).await;
}
self.local_resources
.write()
@@ -144,8 +149,9 @@ impl Instance {
}
pub async fn remove_local_resource(&self, id: String) {
self.local_resources.write().await.remove(&id);
- for (pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::ProvideStop { id: id.clone() });
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() })
+ .await;
}
}
}
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index a10a20a..ed434eb 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -5,15 +5,12 @@
*/
#![feature(async_closure)]
#![feature(box_syntax)]
-#![feature(async_fn_in_trait)]
-
-use std::{pin::Pin, sync::Arc};
+// #![feature(async_fn_in_trait)]
use futures_util::Future;
-use instance::Instance;
use peer::{Peer, TransportChannel};
use protocol::ProvideInfo;
-use tokio::sync::RwLock;
+use std::{pin::Pin, sync::Arc};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index 36b1754..fa4de25 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -6,12 +6,12 @@
use crate::{
instance::Instance,
protocol::{self, ProvideInfo, RelayMessage, Sdp},
+ LocalResource,
};
use log::{info, warn};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use webrtc::{
- data::data_channel::DataChannel,
data_channel::RTCDataChannel,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
@@ -27,7 +27,7 @@ use webrtc::{
pub struct Peer {
pub inst: Arc<Instance>,
pub peer_connection: RTCPeerConnection,
- pub resources_provided: RwLock<HashMap<String, ProvideInfo>>,
+ pub remote_provided: RwLock<HashMap<String, ProvideInfo>>,
pub id: usize,
}
@@ -42,6 +42,7 @@ pub struct Peer {
// Connected(Arc<TransportChannel>),
// AwaitDisconnect,
// }
+
pub enum TransportChannel {
Track(Arc<TrackRemote>),
DataChannel(Arc<RTCDataChannel>),
@@ -60,7 +61,7 @@ impl Peer {
let peer_connection = inst.api.new_peer_connection(config).await.unwrap();
let peer = Arc::new(Self {
- resources_provided: Default::default(),
+ remote_provided: Default::default(),
inst: inst.clone(),
peer_connection,
id,
@@ -103,7 +104,7 @@ impl Peer {
let peer = weak.upgrade().unwrap();
Box::pin(async move {
if let Some(res) = peer
- .resources_provided
+ .remote_provided
.read()
.await
.get(&dc.label().to_string())
@@ -119,7 +120,7 @@ impl Peer {
.await;
} else {
warn!("got unassociated data channel; closed connection");
- dc.close().await;
+ dc.close().await.unwrap();
}
})
})
@@ -128,6 +129,16 @@ impl Peer {
peer
}
+ pub async fn init_remote(&self) {
+ self.send_relay(RelayMessage::Identify {
+ username: self.inst.config.username.clone(),
+ })
+ .await;
+ for res in self.inst.local_resources.read().await.values() {
+ self.send_relay(RelayMessage::Provide(res.info())).await;
+ }
+ }
+
pub async fn request_resource(&self, id: String) {
self.send_relay(RelayMessage::Request { id }).await;
}
@@ -139,7 +150,7 @@ impl Peer {
self.inst.send_relay(self.id, inner).await
}
- pub async fn on_relay(self: Arc<Self>, p: RelayMessage) {
+ pub async fn on_relay(self: &Arc<Self>, p: RelayMessage) {
match p {
RelayMessage::Offer(o) => self.on_offer(o).await,
RelayMessage::Answer(a) => self.on_answer(a).await,
@@ -149,7 +160,7 @@ impl Peer {
"remote resource provided: ({:?}) {:?} {:?}",
info.id, info.kind, info.label
);
- self.resources_provided
+ self.remote_provided
.write()
.await
.insert(info.id.clone(), info.clone());
@@ -160,13 +171,24 @@ impl Peer {
}
RelayMessage::ProvideStop { id } => {
info!("remote resource removed: ({:?}) ", id);
- self.resources_provided.write().await.remove(&id);
+ self.remote_provided.write().await.remove(&id);
self.inst
.event_handler
.resource_removed(self.clone(), id)
.await;
}
- _ => (),
+ RelayMessage::Chat(_) => (),
+ RelayMessage::Identify { username } => {
+ info!("peer {} is known as {username:?}", self.id)
+ }
+ RelayMessage::Request { id } => {
+ if let Some(res) = self.inst.local_resources.read().await.get(&id) {
+ res.on_request(self.clone()).await;
+ } else {
+ warn!("({}) requested unknown local resource", self.id)
+ }
+ }
+ RelayMessage::RequestStop { id } => {}
}
}
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs
index 8f21d85..318ed7d 100644
--- a/client-native-lib/src/signaling.rs
+++ b/client-native-lib/src/signaling.rs
@@ -6,7 +6,7 @@
use crate::protocol::ClientboundPacket;
use crate::{crypto::hash, protocol::ServerboundPacket};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace};
use std::pin::Pin;
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::{self, Message};
@@ -37,7 +37,10 @@ impl SignalingConnection {
let (tx, rx): (_, _) = conn.split();
let tx = tx.with(async move |packet: ServerboundPacket| {
- debug!(" -> {packet:?}");
+ match packet {
+ ServerboundPacket::Relay { .. } => trace!(" -> {packet:?}"),
+ _ => debug!(" -> {packet:?}"),
+ }
Ok::<_, _>(Message::Text(
serde_json::to_string::<ServerboundPacket>(&packet).unwrap(),
))
@@ -46,9 +49,12 @@ impl SignalingConnection {
let rx = rx.filter_map(async move |mesg| match mesg {
Ok(mesg) => match mesg {
tungstenite::Message::Text(t) => {
- let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
- debug!("<- {p:?}");
- Some(p)
+ let packet: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
+ match packet {
+ ClientboundPacket::Message { .. } => trace!(" <- {packet:?}"),
+ _ => debug!(" <- {packet:?}"),
+ }
+ Some(packet)
}
tungstenite::Message::Close(e) => {
error!("ws closed {e:?}");
@@ -57,7 +63,7 @@ impl SignalingConnection {
_ => None,
},
Err(e) => {
- warn!("websocket error: {e}");
+ error!("websocket error: {e}");
None
}
});
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index a3fb4e4..48dac0b 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -5,19 +5,18 @@
*/
#![feature(box_syntax)]
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use clap::{Parser, Subcommand};
use client_native_lib::{
instance::Instance,
peer::{Peer, TransportChannel},
- protocol::ProvideInfo,
- webrtc::data_channel::RTCDataChannel,
+ protocol::{ProvideInfo, RelayMessage},
Config, DynFut, EventHandler, LocalResource,
};
use humansize::DECIMAL;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
use std::{
- future::Future,
+ os::unix::prelude::MetadataExt,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
@@ -25,7 +24,7 @@ use std::{
},
};
use tokio::{
- fs::File,
+ fs::{self, File},
io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
@@ -72,6 +71,27 @@ async fn run() {
)
.await;
+ match &args.action {
+ Action::Send { filename } => {
+ inst.add_local_resource(box FileSender {
+ info: ProvideInfo {
+ id: "the-file".to_string(), // we only share a single file so its fine
+ kind: "file".to_string(),
+ track_kind: None,
+ label: Some(filename.clone().unwrap_or("stdin".to_string())),
+ size: if let Some(filename) = &filename {
+ Some(fs::metadata(filename).await.unwrap().size() as usize)
+ } else {
+ None
+ },
+ },
+ reader_factory: args.action,
+ })
+ .await;
+ }
+ _ => (),
+ }
+
inst.spawn_ping().await;
inst.receive_loop().await;
@@ -89,7 +109,7 @@ impl EventHandler for Handler {
Box::pin(async move {})
}
- fn peer_leave(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
Box::pin(async move {})
}
fn resource_added(
@@ -101,22 +121,22 @@ impl EventHandler for Handler {
let args = self.args.clone();
Box::pin(async move {
match &args.action {
- Action::Send { filename } => {}
- Action::Receive { filename } => {
+ Action::Receive { .. } => {
if info.kind == "file" {
peer.request_resource(id).await;
}
}
+ _ => (),
}
})
}
- fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()> {
+ fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> {
Box::pin(async {})
}
fn resource_connected(
&self,
- peer: Arc<Peer>,
+ _peer: Arc<Peer>,
resource: &ProvideInfo,
channel: TransportChannel,
) -> client_native_lib::DynFut<()> {
@@ -129,7 +149,7 @@ impl EventHandler for Handler {
if resource.kind != "file" {
return error!("we got a non-file resource for some reason…");
}
- let mut pos = Arc::new(AtomicUsize::new(0));
+ let pos = Arc::new(AtomicUsize::new(0));
let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
Arc::new(RwLock::new(None));
{
@@ -163,7 +183,7 @@ impl EventHandler for Handler {
Box::pin(async move {
let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
info!(
- "{:?} bytes of data ({} of {})",
+ "recv {:?} ({} of {})",
mesg.data.len(),
humansize::format_size(pos, DECIMAL),
humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
@@ -228,15 +248,88 @@ impl Action {
}
struct FileSender {
+ reader_factory: Action, //TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>,
info: ProvideInfo,
}
+
impl LocalResource for FileSender {
- fn info(&self) -> client_native_lib::protocol::ProvideInfo {
+ fn info(&self) -> ProvideInfo {
self.info.clone()
}
fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
- Box::pin(async move {})
+ let id = self.info().id.clone();
+ let reader_factory = self.reader_factory.clone();
+ Box::pin(async move {
+ let channel = peer
+ .peer_connection
+ .create_data_channel(&id, None)
+ .await
+ .unwrap();
+ let pos = Arc::new(AtomicUsize::new(0));
+ let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
+ Arc::new(RwLock::new(None));
+ {
+ let reader = reader.clone();
+ let reader_factory = reader_factory.clone();
+ channel
+ .on_open(box move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel open");
+ *reader.write().await = Some(reader_factory.create_reader().await);
+ })
+ })
+ .await;
+ }
+ {
+ let reader = reader.clone();
+ channel
+ .on_close(box move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *reader.write().await = None;
+ })
+ })
+ .await;
+ }
+ {
+ let reader = reader.clone();
+ let channel2 = channel.clone();
+ channel
+ .on_buffered_amount_low(box move || {
+ let reader = reader.clone();
+ let channel = channel2.clone();
+ Box::pin(async move {
+ debug!("buffered amount low");
+ let mut buf = [0u8; 1 << 15];
+ let size = reader
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .read(&mut buf)
+ .await
+ .unwrap();
+ if size == 0 {
+ info!("reached EOF, closing channel");
+ channel.close().await.unwrap();
+ } else {
+ debug!("sending {size} bytes");
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ })
+ })
+ .await;
+ }
+ channel
+ .on_error(box move |err| Box::pin(async move { error!("channel error: {err}") }))
+ .await;
+ })
}
}