summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-14 19:27:49 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-14 19:27:49 +0200
commit970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a (patch)
tree470389d27d1a0149655e89b06eaf0aba02faf9b4
parent401ee1336f83a9172b0cc4231b382c6a099bb66c (diff)
downloadkeks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar
keks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar.bz2
keks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar.zst
data channel working
-rw-r--r--client-native-rift/src/main.rs182
-rw-r--r--client-native-rift/src/peer.rs168
-rw-r--r--client-native-rift/src/state.rs77
3 files changed, 255 insertions, 172 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index fbc8387..f2b79e2 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -1,36 +1,24 @@
#![feature(async_closure)]
#![feature(box_syntax)]
-use crate::protocol::{RTCSessionDescriptionInit, RelayMessageWrapper, ServerboundPacket};
use clap::{Parser, Subcommand};
-use crypto::Key;
-use futures_util::{SinkExt, StreamExt};
-use log::{debug, info};
-use protocol::{ClientboundPacket, RelayMessage};
+use log::{debug, error};
use signaling::signaling_connect;
-use std::{
- collections::HashMap,
- sync::{Arc, Weak},
- time::Duration,
-};
-use tokio::sync::{mpsc::UnboundedSender, Mutex, RwLock};
+use state::State;
+use std::sync::Arc;
+use tokio::sync::RwLock;
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
- API,
},
- data_channel::data_channel_message::DataChannelMessage,
- ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
- peer_connection::{
- configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
- RTCPeerConnection,
- },
};
pub mod crypto;
+pub mod peer;
pub mod protocol;
pub mod signaling;
+pub mod state;
fn main() {
env_logger::init_from_env("LOG");
@@ -42,7 +30,7 @@ fn main() {
}
#[derive(Parser)]
-struct Args {
+pub struct Args {
#[clap(long, default_value = "meet.metamuffin.org")]
signaling_host: String,
#[clap(short, long)]
@@ -51,7 +39,7 @@ struct Args {
action: Action,
}
#[derive(Subcommand)]
-enum Action {
+pub enum Action {
Send {},
Receive {},
}
@@ -92,156 +80,6 @@ async fn run() {
});
}
- tokio::time::sleep(Duration::from_secs(10000)).await;
-
- // // Wait for the answer to be pasted
- // let line = signal::must_read_stdin().unwrap();
- // let desc_data = signal::decode(line.as_str()).unwrap();
- // let answer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
-
- // // Apply the answer as the remote description
- // peer_connection.set_remote_description(answer).await?;
-}
-
-pub struct State {
- args: Args,
- api: API,
- key: Key,
- my_id: RwLock<Option<usize>>,
- sender: UnboundedSender<ServerboundPacket>,
- peers: RwLock<HashMap<usize, Arc<Peer>>>,
-}
-impl State {
- pub async fn my_id(&self) -> usize {
- self.my_id.read().await.expect("not initialized yet")
- }
-
- pub async fn on_message(&self, packet: ClientboundPacket) {
- match packet {
- protocol::ClientboundPacket::Init {
- your_id,
- version: _,
- } => {
- *self.my_id.write().await = Some(your_id);
- }
- protocol::ClientboundPacket::ClientJoin { id } => {
- if id == self.my_id().await {
- // we joined - YAY!
- } else {
- if let Action::Send { .. } = &self.args.action {}
- }
- }
- protocol::ClientboundPacket::ClientLeave { id: _ } => {}
- protocol::ClientboundPacket::Message { sender, message } => {
- let message = self.key.decrypt(&message);
- let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
- self.on_relay(sender, p.inner).await;
- }
- }
- }
-
- pub async fn on_relay(&self, sender: usize, p: RelayMessage) {}
-
- pub async fn send_relay(&self, receipient: usize, inner: RelayMessage) {
- self.sender
- .send(ServerboundPacket::Relay {
- recipient: Some(0),
- message: self.key.encrypt(
- &serde_json::to_string(&RelayMessageWrapper {
- sender: self.my_id.read().await.expect("not ready to relay yet.."),
- inner,
- })
- .unwrap(),
- ),
- })
- .unwrap()
- }
-}
-
-pub struct Peer {
- state: Arc<State>, // maybe use Weak later
- peer_connection: RTCPeerConnection,
- id: usize,
-}
-
-impl Peer {
- pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> {
- let config = RTCConfiguration {
- ice_servers: vec![RTCIceServer {
- urls: vec!["stun:metamuffin.org:16900".to_owned()],
- ..Default::default()
- }],
- ..Default::default()
- };
-
- let peer_connection = state.api.new_peer_connection(config).await.unwrap();
-
- let peer = Arc::new(Self {
- peer_connection,
- id,
- state: state.clone(),
- });
- peer.peer_connection
- .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
- println!("conn state: {s}");
- Box::pin(async {})
- }))
- .await;
-
- {
- let peer2 = peer.clone();
- peer.peer_connection
- .on_negotiation_needed(box move || {
- let peer = peer2.clone();
- Box::pin(async { peer.on_negotiation_needed().await })
- })
- .await;
- }
- peer
- }
-
- pub async fn send_relay(&self, inner: RelayMessage) {
- self.state.send_relay(self.id, inner).await
- }
-
- pub async fn start_transfer(&self) {
- info!("starting data channel");
- let data_channel = self
- .peer_connection
- .create_data_channel("file-transfer", None)
- .await
- .unwrap();
-
- data_channel
- .on_message(Box::new(move |msg: DataChannelMessage| {
- let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
- println!("message! '{}'", msg_str);
- Box::pin(async {})
- }))
- .await;
- }
-
- pub async fn on_relay(&self, p: RelayMessage) {
- match p {
- protocol::RelayMessage::Offer(o) => todo!(),
- protocol::RelayMessage::Answer(a) => todo!(),
- protocol::RelayMessage::IceCandidate(c) => {
- self.peer_connection.add_ice_candidate(c).await.unwrap();
- }
- }
- }
-
- pub async fn on_negotiation_needed(self: Arc<Self>) {
- info!("({}) negotiation needed", self.id);
- let offer = self.peer_connection.create_offer(None).await.unwrap();
- self.peer_connection
- .set_local_description(offer.clone())
- .await
- .unwrap();
- self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit {
- sdp: offer.sdp,
- ty: offer.sdp_type,
- }))
- .await
- }
+ tokio::signal::ctrl_c().await.unwrap();
+ error!("interrupt received, exiting");
}
diff --git a/client-native-rift/src/peer.rs b/client-native-rift/src/peer.rs
new file mode 100644
index 0000000..22acffd
--- /dev/null
+++ b/client-native-rift/src/peer.rs
@@ -0,0 +1,168 @@
+use std::sync::Arc;
+
+use log::info;
+use webrtc::{
+ data_channel::data_channel_message::DataChannelMessage,
+ ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer},
+ peer_connection::{
+ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
+ sdp::session_description::RTCSessionDescription, RTCPeerConnection,
+ },
+};
+
+use crate::{
+ protocol::{self, RTCSessionDescriptionInit, RelayMessage},
+ state::State,
+ Action,
+};
+
+pub struct Peer {
+ state: Arc<State>, // maybe use Weak later
+ peer_connection: RTCPeerConnection,
+ id: usize,
+}
+
+impl Peer {
+ pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> {
+ info!("({id}) peer joined");
+ let config = RTCConfiguration {
+ ice_servers: vec![RTCIceServer {
+ urls: vec!["stun:metamuffin.org:16900".to_owned()],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let peer_connection = state.api.new_peer_connection(config).await.unwrap();
+
+ let peer = Arc::new(Self {
+ peer_connection,
+ id,
+ state: state.clone(),
+ });
+ peer.peer_connection
+ .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
+ println!("conn state: {s}");
+ Box::pin(async {})
+ }))
+ .await;
+
+ {
+ let peer2 = peer.clone();
+ peer.peer_connection
+ .on_ice_candidate(box move |c| {
+ let peer = peer2.clone();
+ Box::pin(async move {
+ if let Some(c) = c {
+ peer.on_ice_candidate(c).await
+ }
+ })
+ })
+ .await;
+ }
+
+ {
+ let peer2 = peer.clone();
+ peer.peer_connection
+ .on_negotiation_needed(box move || {
+ let peer = peer2.clone();
+ Box::pin(async { peer.on_negotiation_needed().await })
+ })
+ .await;
+ }
+
+ if let Action::Send { .. } = &peer.state.args.action {
+ peer.start_transfer().await
+ }
+
+ peer
+ }
+
+ pub async fn send_relay(&self, inner: RelayMessage) {
+ self.state.send_relay(self.id, inner).await
+ }
+
+ pub async fn start_transfer(&self) {
+ info!("starting data channel");
+ let data_channel = self
+ .peer_connection
+ .create_data_channel("file-transfer", None)
+ .await
+ .unwrap();
+
+ data_channel
+ .on_message(Box::new(move |msg: DataChannelMessage| {
+ let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
+ println!("message! '{}'", msg_str);
+ Box::pin(async {})
+ }))
+ .await;
+ }
+
+ pub async fn on_relay(&self, p: RelayMessage) {
+ match p {
+ protocol::RelayMessage::Offer(o) => self.on_offer(o).await,
+ protocol::RelayMessage::Answer(a) => self.on_answer(a).await,
+ protocol::RelayMessage::IceCandidate(c) => {
+ info!("received ICE candidate");
+ self.peer_connection.add_ice_candidate(c).await.unwrap();
+ }
+ }
+ }
+
+ pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) {
+ self.send_relay(RelayMessage::IceCandidate(
+ candidate.to_json().await.unwrap(),
+ ))
+ .await;
+ }
+
+ pub async fn on_negotiation_needed(self: Arc<Self>) {
+ info!("({}) negotiation needed", self.id);
+ self.offer().await
+ }
+
+ pub async fn offer(&self) {
+ info!("sending offer");
+ let offer = self.peer_connection.create_offer(None).await.unwrap();
+ self.peer_connection
+ .set_local_description(offer.clone())
+ .await
+ .unwrap();
+ self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit {
+ sdp: offer.sdp,
+ ty: offer.sdp_type,
+ }))
+ .await
+ }
+ pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) {
+ info!("received offer");
+ let offer = RTCSessionDescription::offer(offer.sdp).unwrap();
+ self.peer_connection
+ .set_remote_description(offer)
+ .await
+ .unwrap();
+ self.answer().await
+ }
+ pub async fn answer(&self) {
+ info!("sending answer");
+ let offer = self.peer_connection.create_answer(None).await.unwrap();
+ self.peer_connection
+ .set_local_description(offer.clone())
+ .await
+ .unwrap();
+ self.send_relay(protocol::RelayMessage::Answer(RTCSessionDescriptionInit {
+ sdp: offer.sdp,
+ ty: offer.sdp_type,
+ }))
+ .await
+ }
+ pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) {
+ info!("received answer");
+ let offer = RTCSessionDescription::answer(answer.sdp).unwrap();
+ self.peer_connection
+ .set_remote_description(offer)
+ .await
+ .unwrap();
+ }
+}
diff --git a/client-native-rift/src/state.rs b/client-native-rift/src/state.rs
new file mode 100644
index 0000000..baea90a
--- /dev/null
+++ b/client-native-rift/src/state.rs
@@ -0,0 +1,77 @@
+use std::{collections::HashMap, sync::Arc};
+
+use log::warn;
+use tokio::sync::{mpsc::UnboundedSender, RwLock};
+use webrtc::api::API;
+
+use crate::{
+ crypto::Key,
+ peer::Peer,
+ protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket},
+ Action, Args,
+};
+
+pub struct State {
+ pub args: Args,
+ pub api: API,
+ pub key: Key,
+ pub my_id: RwLock<Option<usize>>,
+ pub sender: UnboundedSender<ServerboundPacket>,
+ pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
+}
+impl State {
+ pub async fn my_id(&self) -> usize {
+ self.my_id.read().await.expect("not initialized yet")
+ }
+
+ pub async fn on_message(self: Arc<Self>, packet: ClientboundPacket) {
+ match packet {
+ protocol::ClientboundPacket::Init {
+ your_id,
+ version: _,
+ } => {
+ *self.my_id.write().await = Some(your_id);
+ }
+ protocol::ClientboundPacket::ClientJoin { id } => {
+ if id == self.my_id().await {
+ // we joined - YAY!
+ } else {
+ self.peers
+ .write()
+ .await
+ .insert(id, Peer::create(self.clone(), id).await);
+ if let Action::Send { .. } = &self.args.action {}
+ }
+ }
+ protocol::ClientboundPacket::ClientLeave { id: _ } => {}
+ protocol::ClientboundPacket::Message { sender, message } => {
+ let message = self.key.decrypt(&message);
+ let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
+ self.on_relay(sender, p.inner).await;
+ }
+ }
+ }
+
+ pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
+ if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
+ peer.on_relay(p).await
+ } else {
+ warn!("got a packet from a non-existent peer")
+ }
+ }
+
+ pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) {
+ self.sender
+ .send(ServerboundPacket::Relay {
+ recipient: Some(recipient),
+ message: self.key.encrypt(
+ &serde_json::to_string(&RelayMessageWrapper {
+ sender: self.my_id.read().await.expect("not ready to relay yet.."),
+ inner,
+ })
+ .unwrap(),
+ ),
+ })
+ .unwrap()
+ }
+}