summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-15 19:08:08 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-15 19:08:08 +0200
commit1286385698c4d09876abf29cb5ed595f7cfe1a8f (patch)
tree3adbe53a1393be941c7459c802f7238c5d0e2d64
parent7c68bdb983c2f52fb09e1a5418e5bc44d3b44b30 (diff)
downloadkeks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar
keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.bz2
keks-meet-1286385698c4d09876abf29cb5ed595f7cfe1a8f.tar.zst
rift works.
-rw-r--r--Cargo.lock24
-rw-r--r--client-native-lib/src/peer.rs16
-rw-r--r--client-native-lib/src/signaling.rs60
-rw-r--r--client-native-lib/src/state.rs4
-rw-r--r--client-native-rift/Cargo.toml2
-rw-r--r--client-native-rift/src/main.rs176
6 files changed, 180 insertions, 102 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f517c07..755b855 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -443,18 +443,6 @@ dependencies = [
]
[[package]]
-name = "client-native-rift"
-version = "0.1.0"
-dependencies = [
- "bytes",
- "clap",
- "client-native-lib",
- "env_logger",
- "log",
- "tokio",
-]
-
-[[package]]
name = "const-oid"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1762,6 +1750,18 @@ dependencies = [
]
[[package]]
+name = "rift"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "clap",
+ "client-native-lib",
+ "env_logger",
+ "log",
+ "tokio",
+]
+
+[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index 280cc06..d6ca308 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -48,10 +48,10 @@ impl Peer {
.await;
{
- let peer2 = peer.clone();
+ let weak = Arc::<Peer>::downgrade(&peer);
peer.peer_connection
.on_ice_candidate(box move |c| {
- let peer = peer2.clone();
+ let peer = weak.upgrade().unwrap();
Box::pin(async move {
if let Some(c) = c {
peer.on_ice_candidate(c).await
@@ -62,10 +62,10 @@ impl Peer {
}
{
- let peer2 = peer.clone();
+ let weak = Arc::<Peer>::downgrade(&peer);
peer.peer_connection
.on_negotiation_needed(box move || {
- let peer = peer2.clone();
+ let peer = weak.upgrade().unwrap();
Box::pin(async { peer.on_negotiation_needed().await })
})
.await;
@@ -114,7 +114,7 @@ impl Peer {
}
pub async fn offer(&self) {
- info!("sending offer");
+ info!("({}) sending offer", self.id);
let offer = self.peer_connection.create_offer(None).await.unwrap();
self.peer_connection
.set_local_description(offer.clone())
@@ -127,7 +127,7 @@ impl Peer {
.await
}
pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) {
- info!("received offer");
+ info!("({}) received offer", self.id);
let offer = RTCSessionDescription::offer(offer.sdp).unwrap();
self.peer_connection
.set_remote_description(offer)
@@ -136,7 +136,7 @@ impl Peer {
self.answer().await
}
pub async fn answer(&self) {
- info!("sending answer");
+ info!("({}) sending answer", self.id);
let offer = self.peer_connection.create_answer(None).await.unwrap();
self.peer_connection
.set_local_description(offer.clone())
@@ -149,7 +149,7 @@ impl Peer {
.await
}
pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) {
- info!("received answer");
+ info!("({}) received answer", self.id);
let offer = RTCSessionDescription::answer(answer.sdp).unwrap();
self.peer_connection
.set_remote_description(offer)
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs
index 2ac3edc..ef49692 100644
--- a/client-native-lib/src/signaling.rs
+++ b/client-native-lib/src/signaling.rs
@@ -1,7 +1,9 @@
+use std::time::Duration;
+
use crate::protocol::ClientboundPacket;
use crate::{crypto::hash, protocol::ServerboundPacket};
use futures_util::{SinkExt, StreamExt};
-use log::{debug, info};
+use log::{debug, error, info, warn};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_tungstenite::tungstenite::{self, Message};
@@ -19,40 +21,52 @@ pub async fn signaling_connect(
.unwrap();
info!("connection established");
- let (mut tx, rx) = conn.split();
+ let (mut tx, mut rx) = conn.split();
let (in_tx, in_rx) = unbounded_channel();
let (out_tx, mut out_rx) = unbounded_channel();
- tokio::spawn(async {
- rx.for_each(move |mesg| {
- info!("packet in");
- let mesg = mesg.unwrap();
- match mesg {
- tungstenite::Message::Text(t) => {
- let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
- debug!("<- {p:?}");
- in_tx.send(p).unwrap()
- }
- tungstenite::Message::Close(_) => {
- eprintln!("ws closed :(");
- unreachable!();
- }
- _ => (),
- }
- Box::pin(async { () })
- })
- .await;
+ let ping_out_tx = out_tx.clone();
+ let ping_task = tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ ping_out_tx.send(ServerboundPacket::Ping).unwrap()
+ }
});
- tokio::spawn(async move {
+
+ let send_task = tokio::spawn(async move {
while let Some(p) = out_rx.recv().await {
debug!(" -> {p:?}");
tx.send(Message::Text(
serde_json::to_string::<ServerboundPacket>(&p).unwrap(),
))
.await
- .unwrap()
+ .unwrap();
+ }
+ });
+ let _receive_task = tokio::spawn(async move {
+ while let Some(mesg) = rx.next().await {
+ match mesg {
+ Ok(mesg) => match mesg {
+ tungstenite::Message::Text(t) => {
+ let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
+ debug!("<- {p:?}");
+ in_tx.send(p).unwrap()
+ }
+ tungstenite::Message::Close(e) => {
+ error!("ws closed :( {e:?}");
+ unreachable!();
+ }
+ _ => (),
+ },
+ Err(_) => {
+ send_task.abort();
+ ping_task.abort();
+ break;
+ }
+ }
}
+ warn!("recv task stopped");
});
(out_tx, in_rx)
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
index c5e9365..8501c3d 100644
--- a/client-native-lib/src/state.rs
+++ b/client-native-lib/src/state.rs
@@ -57,7 +57,9 @@ impl<P: HasPeer, I: PeerInit<P>> State<P, I> {
);
}
}
- protocol::ClientboundPacket::ClientLeave { id: _ } => {}
+ protocol::ClientboundPacket::ClientLeave { id } => {
+ self.peers.write().await.remove(&id);
+ }
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index 1693422..08a08bb 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "client-native-rift"
+name = "rift"
version = "0.1.0"
edition = "2021"
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 299a161..7aa7afe 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -12,7 +12,8 @@ use client_native_lib::{
use log::{error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
- io::{stdin, stdout, AsyncReadExt, AsyncWriteExt},
+ fs::File,
+ io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
@@ -34,11 +35,6 @@ pub struct Args {
#[clap(subcommand)]
action: Action,
}
-#[derive(Subcommand)]
-pub enum Action {
- Send {},
- Receive {},
-}
async fn run() {
let args = Args::parse();
@@ -58,13 +54,45 @@ async fn run() {
error!("interrupt received, exiting");
}
+#[derive(Subcommand)]
+pub enum Action {
+ Send { filename: Option<String> },
+ Receive { filename: Option<String> },
+}
+
+impl Action {
+ pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> {
+ match self {
+ Action::Receive { filename } => {
+ if let Some(filename) = filename {
+ Box::pin(File::create(filename).await.unwrap())
+ } else {
+ Box::pin(stdout())
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+ pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
+ match self {
+ Action::Send { filename } => {
+ if let Some(filename) = filename {
+ Box::pin(File::open(filename).await.unwrap())
+ } else {
+ Box::pin(stdin())
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
pub struct Conn {
pub args: Arc<Args>,
}
pub struct PeerState {
args: Arc<Args>,
peer: Arc<Peer>,
- channel: RwLock<Option<Arc<RTCDataChannel>>>,
}
impl PeerInit<PeerState> for Conn {
@@ -74,11 +102,7 @@ impl PeerInit<PeerState> for Conn {
) -> Pin<Box<(dyn Future<Output = Arc<PeerState>> + Send + Sync + 'static)>> {
let args = self.args.clone();
Box::pin(async move {
- let p = Arc::new(PeerState {
- peer,
- args,
- channel: Default::default(),
- });
+ let p = Arc::new(PeerState { peer, args });
p.clone().init().await;
p
})
@@ -93,47 +117,72 @@ impl PeerState {
pub async fn init(self: Arc<Self>) {
let s = self.clone();
match &self.args.action {
- Action::Send {} => *s.channel.write().await = Some(self.init_send_channel().await),
- Action::Receive {} => {
+ Action::Send { .. } => self.init_send_channel().await,
+ Action::Receive { .. } => {
self.peer
.peer_connection
.on_data_channel(box move |ch| {
let s = s.clone();
- Box::pin(async move {
- *s.channel.write().await = Some(ch);
- s.init_receive_channel().await
- })
+ Box::pin(async move { s.init_receive_channel(ch).await })
})
.await;
}
}
}
- pub async fn init_receive_channel(self: Arc<Self>) {
+ pub async fn init_receive_channel(self: Arc<Self>, channel: Arc<RTCDataChannel>) {
info!("got a data channel");
- let ch = self.channel.read().await.as_ref().unwrap().clone();
- ch.on_open(box move || {
- info!("channel opened");
- Box::pin(async {})
- })
- .await;
- ch.on_close(box move || {
- info!("channel closed");
- Box::pin(async {})
- })
- .await;
- ch.on_error(box move |err| {
- info!("channel error: {err:?}");
- Box::pin(async {})
- })
- .await;
- ch.on_message(box move |mesg| {
- Box::pin(async move { stdout().write_all(&mesg.data).await.unwrap() })
- })
- .await;
+ let writer = Arc::new(RwLock::new(None));
+ {
+ let writer = writer.clone();
+ channel
+ .on_open(box move || {
+ info!("channel opened");
+ Box::pin(async move {
+ *writer.write().await = Some(self.args.action.create_writer().await);
+ })
+ })
+ .await;
+ }
+ {
+ let writer = writer.clone();
+ channel
+ .on_close(box move || {
+ info!("channel closed");
+ let writer = writer.clone();
+ Box::pin(async move {
+ *writer.write().await = None; // drop the writer, so it closes the file or whatever
+ })
+ })
+ .await;
+ }
+ {
+ let writer = writer.clone();
+ channel
+ .on_message(box move |mesg| {
+ let writer = writer.clone();
+ Box::pin(async move {
+ writer
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .write_all(&mesg.data)
+ .await
+ .unwrap();
+ })
+ })
+ .await;
+ }
+ channel
+ .on_error(box move |err| {
+ info!("channel error: {err:?}");
+ Box::pin(async {})
+ })
+ .await;
}
- pub async fn init_send_channel(&self) -> Arc<RTCDataChannel> {
+ pub async fn init_send_channel(&self) {
info!("creating data channel");
let data_channel = self
.peer
@@ -141,25 +190,38 @@ impl PeerState {
.create_data_channel("file-transfer", None)
.await
.unwrap();
- {
- let dc2 = data_channel.clone();
- data_channel
- .on_open(box move || {
- let data_channel = dc2.clone();
- Box::pin(async move {
- loop {
- let mut buf = [0u8; 1024];
- let size = stdin().read(&mut buf).await.unwrap();
- data_channel
- .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
- .await
- .unwrap();
+ let weak = Arc::downgrade(&data_channel);
+ let args = self.args.clone();
+ data_channel
+ .on_open(box move || {
+ let args = args.clone();
+ let data_channel = weak.upgrade().unwrap();
+ Box::pin(async move {
+ let mut reader = args.action.create_reader().await;
+ info!("starting transmission");
+ loop {
+ let mut buf = [0u8; 4096];
+ let size = reader.read(&mut buf).await.unwrap();
+ if size == 0 {
+ break;
}
- })
+ data_channel
+ .send(&Bytes::from_iter(buf[0..size].into_iter().map(|e| *e)))
+ .await
+ .unwrap();
+ }
+ info!("transmission finished");
+ drop(reader);
+ info!("now closing the channel again…");
+ data_channel.close().await.unwrap();
})
- .await;
- }
-
+ })
+ .await;
+ data_channel
+ .on_close(box || Box::pin(async move { info!("data channel closed") }))
+ .await;
data_channel
+ .on_error(box |err| Box::pin(async move { error!("data channel error: {err}") }))
+ .await;
}
}