diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-15 19:08:08 +0200 |
commit | 1286385698c4d09876abf29cb5ed595f7cfe1a8f (patch) | |
tree | 3adbe53a1393be941c7459c802f7238c5d0e2d64 | |
parent | 7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (diff) | |
download | keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.bz2 keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.zst |
rift works.
-rw-r--r-- | Cargo.lock | 24 | ||||
-rw-r--r-- | client-native-lib/src/peer.rs | 16 | ||||
-rw-r--r-- | client-native-lib/src/signaling.rs | 60 | ||||
-rw-r--r-- | client-native-lib/src/state.rs | 4 | ||||
-rw-r--r-- | client-native-rift/Cargo.toml | 2 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 176 |
6 files changed, 180 insertions, 102 deletions
@@ -443,18 +443,6 @@ dependencies = [ ] [[package]] -name = "client-native-rift" -version = "0.1.0" -dependencies = [ - "bytes", - "clap", - "client-native-lib", - "env_logger", - "log", - "tokio", -] - -[[package]] name = "const-oid" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1762,6 +1750,18 @@ dependencies = [ ] [[package]] +name = "rift" +version = "0.1.0" +dependencies = [ + "bytes", + "clap", + "client-native-lib", + "env_logger", + "log", + "tokio", +] + +[[package]] name = "ring" version = "0.16.20" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs index 280cc06..d6ca308 100644 --- a/client-native-lib/src/peer.rs +++ b/client-native-lib/src/peer.rs @@ -48,10 +48,10 @@ impl Peer { .await; { - let peer2 = peer.clone(); + let weak = Arc::<Peer>::downgrade(&peer); peer.peer_connection .on_ice_candidate(box move |c| { - let peer = peer2.clone(); + let peer = weak.upgrade().unwrap(); Box::pin(async move { if let Some(c) = c { peer.on_ice_candidate(c).await @@ -62,10 +62,10 @@ impl Peer { } { - let peer2 = peer.clone(); + let weak = Arc::<Peer>::downgrade(&peer); peer.peer_connection .on_negotiation_needed(box move || { - let peer = peer2.clone(); + let peer = weak.upgrade().unwrap(); Box::pin(async { peer.on_negotiation_needed().await }) }) .await; @@ -114,7 +114,7 @@ impl Peer { } pub async fn offer(&self) { - info!("sending offer"); + info!("({}) sending offer", self.id); let offer = self.peer_connection.create_offer(None).await.unwrap(); self.peer_connection .set_local_description(offer.clone()) @@ -127,7 +127,7 @@ impl Peer { .await } pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) { - info!("received offer"); + info!("({}) received offer", self.id); let offer = RTCSessionDescription::offer(offer.sdp).unwrap(); self.peer_connection .set_remote_description(offer) @@ -136,7 +136,7 @@ impl Peer { self.answer().await } pub async fn answer(&self) { - info!("sending answer"); + info!("({}) sending answer", self.id); let offer = self.peer_connection.create_answer(None).await.unwrap(); self.peer_connection .set_local_description(offer.clone()) @@ -149,7 +149,7 @@ impl Peer { .await } pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) { - info!("received answer"); + info!("({}) received answer", self.id); let offer = RTCSessionDescription::answer(answer.sdp).unwrap(); self.peer_connection .set_remote_description(offer) diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs index 2ac3edc..ef49692 100644 --- a/client-native-lib/src/signaling.rs +++ b/client-native-lib/src/signaling.rs @@ -1,7 +1,9 @@ +use std::time::Duration; + use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{SinkExt, StreamExt}; -use log::{debug, info}; +use log::{debug, error, info, warn}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::{self, Message}; @@ -19,40 +21,52 @@ pub async fn signaling_connect( .unwrap(); info!("connection established"); - let (mut tx, rx) = conn.split(); + let (mut tx, mut rx) = conn.split(); let (in_tx, in_rx) = unbounded_channel(); let (out_tx, mut out_rx) = unbounded_channel(); - tokio::spawn(async { - rx.for_each(move |mesg| { - info!("packet in"); - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - unreachable!(); - } - _ => (), - } - Box::pin(async { () }) - }) - .await; + let ping_out_tx = out_tx.clone(); + let ping_task = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + ping_out_tx.send(ServerboundPacket::Ping).unwrap() + } }); - tokio::spawn(async move { + + let send_task = tokio::spawn(async move { while let Some(p) = out_rx.recv().await { debug!(" -> {p:?}"); tx.send(Message::Text( serde_json::to_string::<ServerboundPacket>(&p).unwrap(), )) .await - .unwrap() + .unwrap(); + } + }); + let _receive_task = tokio::spawn(async move { + while let Some(mesg) = rx.next().await { + match mesg { + Ok(mesg) => match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(e) => { + error!("ws closed :( {e:?}"); + unreachable!(); + } + _ => (), + }, + Err(_) => { + send_task.abort(); + ping_task.abort(); + break; + } + } } + warn!("recv task stopped"); }); (out_tx, in_rx) diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs index c5e9365..8501c3d 100644 --- a/client-native-lib/src/state.rs +++ b/client-native-lib/src/state.rs @@ -57,7 +57,9 @@ impl<P: HasPeer, I: PeerInit<P>> State<P, I> { ); } } - protocol::ClientboundPacket::ClientLeave { id: _ } => {} + protocol::ClientboundPacket::ClientLeave { id } => { + self.peers.write().await.remove(&id); + } protocol::ClientboundPacket::Message { sender, message } => { let message = self.key.decrypt(&message); let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml index 1693422..08a08bb 100644 --- a/client-native-rift/Cargo.toml +++ b/client-native-rift/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "client-native-rift" +name = "rift" version = "0.1.0" edition = "2021" diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 299a161..7aa7afe 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -12,7 +12,8 @@ use client_native_lib::{ use log::{error, info}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ - io::{stdin, stdout, AsyncReadExt, AsyncWriteExt}, + fs::File, + io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -34,11 +35,6 @@ pub struct Args { #[clap(subcommand)] action: Action, } -#[derive(Subcommand)] -pub enum Action { - Send {}, - Receive {}, -} async fn run() { let args = Args::parse(); @@ -58,13 +54,45 @@ async fn run() { error!("interrupt received, exiting"); } +#[derive(Subcommand)] +pub enum Action { + Send { filename: Option<String> }, + Receive { filename: Option<String> }, +} + +impl Action { + pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { + match self { + Action::Receive { filename } => { + if let Some(filename) = filename { + Box::pin(File::create(filename).await.unwrap()) + } else { + Box::pin(stdout()) + } + } + _ => unreachable!(), + } + } + pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { + match self { + Action::Send { filename } => { + if let Some(filename) = filename { + Box::pin(File::open(filename).await.unwrap()) + } else { + Box::pin(stdin()) + } + } + _ => unreachable!(), + } + } +} + pub struct Conn { pub args: Arc<Args>, } pub struct PeerState { args: Arc<Args>, peer: Arc<Peer>, - channel: RwLock<Option<Arc<RTCDataChannel>>>, } impl PeerInit<PeerState> for Conn { @@ -74,11 +102,7 @@ impl PeerInit<PeerState> for Conn { ) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> { let args = self.args.clone(); Box::pin(async move { - let p = Arc::new(PeerState { - peer, - args, - channel: Default::default(), - }); + let p = Arc::new(PeerState { peer, args }); p.clone().init().await; p }) @@ -93,47 +117,72 @@ impl PeerState { pub async fn init(self: Arc<Self>) { let s = self.clone(); match &self.args.action { - Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await), - Action::Receive {} => { + Action::Send { .. } => self.init_send_channel().await, + Action::Receive { .. } => { self.peer .peer_connection .on_data_channel(box move |ch| { let s = s.clone(); - Box::pin(async move { - *s.channel.write().await = Some(ch); - s.init_receive_channel().await - }) + Box::pin(async move { s.init_receive_channel(ch).await }) }) .await; } } } - pub async fn init_receive_channel(self: Arc<Self>) { + pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) { info!("got a data channel"); - let ch = self.channel.read().await.as_ref().unwrap().clone(); - ch.on_open(box move || { - info!("channel opened"); - Box::pin(async {}) - }) - .await; - ch.on_close(box move || { - info!("channel closed"); - Box::pin(async {}) - }) - .await; - ch.on_error(box move |err| { - info!("channel error: {err:?}"); - Box::pin(async {}) - }) - .await; - ch.on_message(box move |mesg| { - Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() }) - }) - .await; + let writer = Arc::new(RwLock::new(None)); + { + let writer = writer.clone(); + channel + .on_open(box move || { + info!("channel opened"); + Box::pin(async move { + *writer.write().await = Some(self.args.action.create_writer().await); + }) + }) + .await; + } + { + let writer = writer.clone(); + channel + .on_close(box move || { + info!("channel closed"); + let writer = writer.clone(); + Box::pin(async move { + *writer.write().await = None; // drop the writer, so it closes the file or whatever + }) + }) + .await; + } + { + let writer = writer.clone(); + channel + .on_message(box move |mesg| { + let writer = writer.clone(); + Box::pin(async move { + writer + .write() + .await + .as_mut() + .unwrap() + .write_all(&mesg.data) + .await + .unwrap(); + }) + }) + .await; + } + channel + .on_error(box move |err| { + info!("channel error: {err:?}"); + Box::pin(async {}) + }) + .await; } - pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> { + pub async fn init_send_channel(&self) { info!("creating data channel"); let data_channel = self .peer @@ -141,25 +190,38 @@ impl PeerState { .create_data_channel("file-transfer", None) .await .unwrap(); - { - let dc2 = data_channel.clone(); - data_channel - .on_open(box move || { - let data_channel = dc2.clone(); - Box::pin(async move { - loop { - let mut buf = [0u8; 1024]; - let size = stdin().read(&mut buf).await.unwrap(); - data_channel - .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e))) - .await - .unwrap(); + let weak = Arc::downgrade(&data_channel); + let args = self.args.clone(); + data_channel + .on_open(box move || { + let args = args.clone(); + let data_channel = weak.upgrade().unwrap(); + Box::pin(async move { + let mut reader = args.action.create_reader().await; + info!("starting transmission"); + loop { + let mut buf = [0u8; 4096]; + let size = reader.read(&mut buf).await.unwrap(); + if size == 0 { + break; } - }) + data_channel + .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e))) + .await + .unwrap(); + } + info!("transmission finished"); + drop(reader); + info!("now closing the channel again…"); + data_channel.close().await.unwrap(); }) - .await; - } - + }) + .await; + data_channel + .on_close(box || Box::pin(async move { info!("data channel closed") })) + .await; data_channel + .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") })) + .await; } } |