diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-14 19:27:49 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-14 19:27:49 +0200 |
commit | 970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a (patch) | |
tree | 470389d27d1a0149655e89b06eaf0aba02faf9b4 | |
parent | 401ee1336f83a9172b0cc4231b382c6a099bb66c (diff) | |
download | keks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar keks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar.bz2 keks-meet-970a6cde6ac1d5431f9fc96e98944f0e7b4fd90a.tar.zst |
data channel working
-rw-r--r-- | client-native-rift/src/main.rs | 182 | ||||
-rw-r--r-- | client-native-rift/src/peer.rs | 168 | ||||
-rw-r--r-- | client-native-rift/src/state.rs | 77 |
3 files changed, 255 insertions, 172 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index fbc8387..f2b79e2 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -1,36 +1,24 @@ #![feature(async_closure)] #![feature(box_syntax)] -use crate::protocol::{RTCSessionDescriptionInit, RelayMessageWrapper, ServerboundPacket}; use clap::{Parser, Subcommand}; -use crypto::Key; -use futures_util::{SinkExt, StreamExt}; -use log::{debug, info}; -use protocol::{ClientboundPacket, RelayMessage}; +use log::{debug, error}; use signaling::signaling_connect; -use std::{ - collections::HashMap, - sync::{Arc, Weak}, - time::Duration, -}; -use tokio::sync::{mpsc::UnboundedSender, Mutex, RwLock}; +use state::State; +use std::sync::Arc; +use tokio::sync::RwLock; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, - API, }, - data_channel::data_channel_message::DataChannelMessage, - ice_transport::ice_server::RTCIceServer, interceptor::registry::Registry, - peer_connection::{ - configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, - RTCPeerConnection, - }, }; pub mod crypto; +pub mod peer; pub mod protocol; pub mod signaling; +pub mod state; fn main() { env_logger::init_from_env("LOG"); @@ -42,7 +30,7 @@ fn main() { } #[derive(Parser)] -struct Args { +pub struct Args { #[clap(long, default_value = "meet.metamuffin.org")] signaling_host: String, #[clap(short, long)] @@ -51,7 +39,7 @@ struct Args { action: Action, } #[derive(Subcommand)] -enum Action { +pub enum Action { Send {}, Receive {}, } @@ -92,156 +80,6 @@ async fn run() { }); } - tokio::time::sleep(Duration::from_secs(10000)).await; - - // // Wait for the answer to be pasted - // let line = signal::must_read_stdin().unwrap(); - // let desc_data = signal::decode(line.as_str()).unwrap(); - // let answer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?; - - // // Apply the answer as the remote description - // peer_connection.set_remote_description(answer).await?; -} - -pub struct State { - args: Args, - api: API, - key: Key, - my_id: RwLock<Option<usize>>, - sender: UnboundedSender<ServerboundPacket>, - peers: RwLock<HashMap<usize, Arc<Peer>>>, -} -impl State { - pub async fn my_id(&self) -> usize { - self.my_id.read().await.expect("not initialized yet") - } - - pub async fn on_message(&self, packet: ClientboundPacket) { - match packet { - protocol::ClientboundPacket::Init { - your_id, - version: _, - } => { - *self.my_id.write().await = Some(your_id); - } - protocol::ClientboundPacket::ClientJoin { id } => { - if id == self.my_id().await { - // we joined - YAY! - } else { - if let Action::Send { .. } = &self.args.action {} - } - } - protocol::ClientboundPacket::ClientLeave { id: _ } => {} - protocol::ClientboundPacket::Message { sender, message } => { - let message = self.key.decrypt(&message); - let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); - self.on_relay(sender, p.inner).await; - } - } - } - - pub async fn on_relay(&self, sender: usize, p: RelayMessage) {} - - pub async fn send_relay(&self, receipient: usize, inner: RelayMessage) { - self.sender - .send(ServerboundPacket::Relay { - recipient: Some(0), - message: self.key.encrypt( - &serde_json::to_string(&RelayMessageWrapper { - sender: self.my_id.read().await.expect("not ready to relay yet.."), - inner, - }) - .unwrap(), - ), - }) - .unwrap() - } -} - -pub struct Peer { - state: Arc<State>, // maybe use Weak later - peer_connection: RTCPeerConnection, - id: usize, -} - -impl Peer { - pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> { - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:metamuffin.org:16900".to_owned()], - ..Default::default() - }], - ..Default::default() - }; - - let peer_connection = state.api.new_peer_connection(config).await.unwrap(); - - let peer = Arc::new(Self { - peer_connection, - id, - state: state.clone(), - }); - peer.peer_connection - .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("conn state: {s}"); - Box::pin(async {}) - })) - .await; - - { - let peer2 = peer.clone(); - peer.peer_connection - .on_negotiation_needed(box move || { - let peer = peer2.clone(); - Box::pin(async { peer.on_negotiation_needed().await }) - }) - .await; - } - peer - } - - pub async fn send_relay(&self, inner: RelayMessage) { - self.state.send_relay(self.id, inner).await - } - - pub async fn start_transfer(&self) { - info!("starting data channel"); - let data_channel = self - .peer_connection - .create_data_channel("file-transfer", None) - .await - .unwrap(); - - data_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); - println!("message! '{}'", msg_str); - Box::pin(async {}) - })) - .await; - } - - pub async fn on_relay(&self, p: RelayMessage) { - match p { - protocol::RelayMessage::Offer(o) => todo!(), - protocol::RelayMessage::Answer(a) => todo!(), - protocol::RelayMessage::IceCandidate(c) => { - self.peer_connection.add_ice_candidate(c).await.unwrap(); - } - } - } - - pub async fn on_negotiation_needed(self: Arc<Self>) { - info!("({}) negotiation needed", self.id); - let offer = self.peer_connection.create_offer(None).await.unwrap(); - self.peer_connection - .set_local_description(offer.clone()) - .await - .unwrap(); - self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit { - sdp: offer.sdp, - ty: offer.sdp_type, - })) - .await - } + tokio::signal::ctrl_c().await.unwrap(); + error!("interrupt received, exiting"); } diff --git a/client-native-rift/src/peer.rs b/client-native-rift/src/peer.rs new file mode 100644 index 0000000..22acffd --- /dev/null +++ b/client-native-rift/src/peer.rs @@ -0,0 +1,168 @@ +use std::sync::Arc; + +use log::info; +use webrtc::{ + data_channel::data_channel_message::DataChannelMessage, + ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, + peer_connection::{ + configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, + sdp::session_description::RTCSessionDescription, RTCPeerConnection, + }, +}; + +use crate::{ + protocol::{self, RTCSessionDescriptionInit, RelayMessage}, + state::State, + Action, +}; + +pub struct Peer { + state: Arc<State>, // maybe use Weak later + peer_connection: RTCPeerConnection, + id: usize, +} + +impl Peer { + pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> { + info!("({id}) peer joined"); + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:metamuffin.org:16900".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let peer_connection = state.api.new_peer_connection(config).await.unwrap(); + + let peer = Arc::new(Self { + peer_connection, + id, + state: state.clone(), + }); + peer.peer_connection + .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + println!("conn state: {s}"); + Box::pin(async {}) + })) + .await; + + { + let peer2 = peer.clone(); + peer.peer_connection + .on_ice_candidate(box move |c| { + let peer = peer2.clone(); + Box::pin(async move { + if let Some(c) = c { + peer.on_ice_candidate(c).await + } + }) + }) + .await; + } + + { + let peer2 = peer.clone(); + peer.peer_connection + .on_negotiation_needed(box move || { + let peer = peer2.clone(); + Box::pin(async { peer.on_negotiation_needed().await }) + }) + .await; + } + + if let Action::Send { .. } = &peer.state.args.action { + peer.start_transfer().await + } + + peer + } + + pub async fn send_relay(&self, inner: RelayMessage) { + self.state.send_relay(self.id, inner).await + } + + pub async fn start_transfer(&self) { + info!("starting data channel"); + let data_channel = self + .peer_connection + .create_data_channel("file-transfer", None) + .await + .unwrap(); + + data_channel + .on_message(Box::new(move |msg: DataChannelMessage| { + let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); + println!("message! '{}'", msg_str); + Box::pin(async {}) + })) + .await; + } + + pub async fn on_relay(&self, p: RelayMessage) { + match p { + protocol::RelayMessage::Offer(o) => self.on_offer(o).await, + protocol::RelayMessage::Answer(a) => self.on_answer(a).await, + protocol::RelayMessage::IceCandidate(c) => { + info!("received ICE candidate"); + self.peer_connection.add_ice_candidate(c).await.unwrap(); + } + } + } + + pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) { + self.send_relay(RelayMessage::IceCandidate( + candidate.to_json().await.unwrap(), + )) + .await; + } + + pub async fn on_negotiation_needed(self: Arc<Self>) { + info!("({}) negotiation needed", self.id); + self.offer().await + } + + pub async fn offer(&self) { + info!("sending offer"); + let offer = self.peer_connection.create_offer(None).await.unwrap(); + self.peer_connection + .set_local_description(offer.clone()) + .await + .unwrap(); + self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit { + sdp: offer.sdp, + ty: offer.sdp_type, + })) + .await + } + pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) { + info!("received offer"); + let offer = RTCSessionDescription::offer(offer.sdp).unwrap(); + self.peer_connection + .set_remote_description(offer) + .await + .unwrap(); + self.answer().await + } + pub async fn answer(&self) { + info!("sending answer"); + let offer = self.peer_connection.create_answer(None).await.unwrap(); + self.peer_connection + .set_local_description(offer.clone()) + .await + .unwrap(); + self.send_relay(protocol::RelayMessage::Answer(RTCSessionDescriptionInit { + sdp: offer.sdp, + ty: offer.sdp_type, + })) + .await + } + pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) { + info!("received answer"); + let offer = RTCSessionDescription::answer(answer.sdp).unwrap(); + self.peer_connection + .set_remote_description(offer) + .await + .unwrap(); + } +} diff --git a/client-native-rift/src/state.rs b/client-native-rift/src/state.rs new file mode 100644 index 0000000..baea90a --- /dev/null +++ b/client-native-rift/src/state.rs @@ -0,0 +1,77 @@ +use std::{collections::HashMap, sync::Arc}; + +use log::warn; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use webrtc::api::API; + +use crate::{ + crypto::Key, + peer::Peer, + protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, + Action, Args, +}; + +pub struct State { + pub args: Args, + pub api: API, + pub key: Key, + pub my_id: RwLock<Option<usize>>, + pub sender: UnboundedSender<ServerboundPacket>, + pub peers: RwLock<HashMap<usize, Arc<Peer>>>, +} +impl State { + pub async fn my_id(&self) -> usize { + self.my_id.read().await.expect("not initialized yet") + } + + pub async fn on_message(self: Arc<Self>, packet: ClientboundPacket) { + match packet { + protocol::ClientboundPacket::Init { + your_id, + version: _, + } => { + *self.my_id.write().await = Some(your_id); + } + protocol::ClientboundPacket::ClientJoin { id } => { + if id == self.my_id().await { + // we joined - YAY! + } else { + self.peers + .write() + .await + .insert(id, Peer::create(self.clone(), id).await); + if let Action::Send { .. } = &self.args.action {} + } + } + protocol::ClientboundPacket::ClientLeave { id: _ } => {} + protocol::ClientboundPacket::Message { sender, message } => { + let message = self.key.decrypt(&message); + let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); + self.on_relay(sender, p.inner).await; + } + } + } + + pub async fn on_relay(&self, sender: usize, p: RelayMessage) { + if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + peer.on_relay(p).await + } else { + warn!("got a packet from a non-existent peer") + } + } + + pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + self.sender + .send(ServerboundPacket::Relay { + recipient: Some(recipient), + message: self.key.encrypt( + &serde_json::to_string(&RelayMessageWrapper { + sender: self.my_id.read().await.expect("not ready to relay yet.."), + inner, + }) + .unwrap(), + ), + }) + .unwrap() + } +} |