diff options
author | metamuffin <metamuffin@disroot.org> | 2022-09-14 18:59:30 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-09-14 18:59:30 +0200 |
commit | 401ee1336f83a9172b0cc4231b382c6a099bb66c (patch) | |
tree | 324f946ac46ea00aa365ce4afd3a67acdb53d1aa /client-native-rift/src | |
parent | 473c7624c1419c6636addebb183dede5be88b061 (diff) | |
download | keks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar keks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar.bz2 keks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar.zst |
just a in-between state
Diffstat (limited to 'client-native-rift/src')
-rw-r--r-- | client-native-rift/src/crypto.rs | 7 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 244 | ||||
-rw-r--r-- | client-native-rift/src/protocol.rs | 17 | ||||
-rw-r--r-- | client-native-rift/src/signaling.rs | 69 |
4 files changed, 253 insertions, 84 deletions
diff --git a/client-native-rift/src/crypto.rs b/client-native-rift/src/crypto.rs index 3f00c67..9bd8908 100644 --- a/client-native-rift/src/crypto.rs +++ b/client-native-rift/src/crypto.rs @@ -2,17 +2,20 @@ use aes_gcm::{ aead::{generic_array::sequence::GenericSequence, Aead}, Aes256Gcm, KeyInit, Nonce, }; +use log::info; pub struct Key(Aes256Gcm); impl Key { - pub fn derive(secret: String) -> Self { + pub fn derive(secret: &str) -> Self { + info!("running key generation... this might take some⢠time"); let salt = base64::decode("thisisagoodsaltAAAAAAA==").unwrap(); - let mut key = [0u8; 256]; + let mut key = [0u8; 32]; fastpbkdf2::pbkdf2_hmac_sha256(secret.as_bytes(), salt.as_slice(), 250000, &mut key); let key = Aes256Gcm::new_from_slice(key.as_slice()).unwrap(); + info!("done"); Self(key) } pub fn encrypt(&self, s: &str) -> String { diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index d9935e8..fbc8387 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -1,18 +1,30 @@ #![feature(async_closure)] +#![feature(box_syntax)] -use std::{sync::Arc, time::Duration}; - +use crate::protocol::{RTCSessionDescriptionInit, RelayMessageWrapper, ServerboundPacket}; +use clap::{Parser, Subcommand}; +use crypto::Key; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, info}; +use protocol::{ClientboundPacket, RelayMessage}; use signaling::signaling_connect; +use std::{ + collections::HashMap, + sync::{Arc, Weak}, + time::Duration, +}; +use tokio::sync::{mpsc::UnboundedSender, Mutex, RwLock}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, + API, }, data_channel::data_channel_message::DataChannelMessage, ice_transport::ice_server::RTCIceServer, interceptor::registry::Registry, peer_connection::{ - configuration::RTCConfiguration, math_rand_alpha, - peer_connection_state::RTCPeerConnectionState, + configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, + RTCPeerConnection, }, }; @@ -21,6 +33,7 @@ pub mod protocol; pub mod signaling; fn main() { + env_logger::init_from_env("LOG"); tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -28,8 +41,26 @@ fn main() { .block_on(run()) } +#[derive(Parser)] +struct Args { + #[clap(long, default_value = "meet.metamuffin.org")] + signaling_host: String, + #[clap(short, long)] + secret: String, + #[clap(subcommand)] + action: Action, +} +#[derive(Subcommand)] +enum Action { + Send {}, + Receive {}, +} + async fn run() { - let (send, recv) = signaling_connect("meet.metamuffin.org", "hunter2").await; + let args = Args::parse(); + let (sender, mut recv) = signaling_connect(&args.signaling_host, &args.secret).await; + + let key = crypto::Key::derive(&args.secret); let mut media_engine = MediaEngine::default(); media_engine.register_default_codecs().unwrap(); @@ -40,48 +71,28 @@ async fn run() { .with_interceptor_registry(registry) .build(); - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:metamuffin.org:16900".to_owned()], - ..Default::default() - }], - ..Default::default() - }; + let state = Arc::new(State { + peers: Default::default(), + key, + api, + my_id: RwLock::new(None), + sender, + args, + }); - let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap()); + { + let state = state.clone(); + tokio::spawn(async move { + debug!("receiving packets now"); + while let Some(packet) = recv.recv().await { + debug!("{packet:?}"); + let state = state.clone(); + state.on_message(packet).await + } + }); + } - let data_channel = peer_connection - .create_data_channel("data", None) - .await - .unwrap(); - - let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); - - peer_connection - .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("conn state: {s}"); - Box::pin(async {}) - })) - .await; - - let d_label = data_channel.label().to_owned(); - data_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); - println!("Message from DataChannel '{}': '{}'", d_label, msg_str); - Box::pin(async {}) - })) - .await; - - let offer = peer_connection.create_offer(None).await.unwrap(); - peer_connection - .set_local_description(offer.clone()) - .await - .unwrap(); - - println!("{offer:?}"); - - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(10000)).await; // // Wait for the answer to be pasted // let line = signal::must_read_stdin().unwrap(); @@ -91,3 +102,146 @@ async fn run() { // // Apply the answer as the remote description // peer_connection.set_remote_description(answer).await?; } + +pub struct State { + args: Args, + api: API, + key: Key, + my_id: RwLock<Option<usize>>, + sender: UnboundedSender<ServerboundPacket>, + peers: RwLock<HashMap<usize, Arc<Peer>>>, +} +impl State { + pub async fn my_id(&self) -> usize { + self.my_id.read().await.expect("not initialized yet") + } + + pub async fn on_message(&self, packet: ClientboundPacket) { + match packet { + protocol::ClientboundPacket::Init { + your_id, + version: _, + } => { + *self.my_id.write().await = Some(your_id); + } + protocol::ClientboundPacket::ClientJoin { id } => { + if id == self.my_id().await { + // we joined - YAY! + } else { + if let Action::Send { .. } = &self.args.action {} + } + } + protocol::ClientboundPacket::ClientLeave { id: _ } => {} + protocol::ClientboundPacket::Message { sender, message } => { + let message = self.key.decrypt(&message); + let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap(); + self.on_relay(sender, p.inner).await; + } + } + } + + pub async fn on_relay(&self, sender: usize, p: RelayMessage) {} + + pub async fn send_relay(&self, receipient: usize, inner: RelayMessage) { + self.sender + .send(ServerboundPacket::Relay { + recipient: Some(0), + message: self.key.encrypt( + &serde_json::to_string(&RelayMessageWrapper { + sender: self.my_id.read().await.expect("not ready to relay yet.."), + inner, + }) + .unwrap(), + ), + }) + .unwrap() + } +} + +pub struct Peer { + state: Arc<State>, // maybe use Weak later + peer_connection: RTCPeerConnection, + id: usize, +} + +impl Peer { + pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> { + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:metamuffin.org:16900".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let peer_connection = state.api.new_peer_connection(config).await.unwrap(); + + let peer = Arc::new(Self { + peer_connection, + id, + state: state.clone(), + }); + peer.peer_connection + .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + println!("conn state: {s}"); + Box::pin(async {}) + })) + .await; + + { + let peer2 = peer.clone(); + peer.peer_connection + .on_negotiation_needed(box move || { + let peer = peer2.clone(); + Box::pin(async { peer.on_negotiation_needed().await }) + }) + .await; + } + peer + } + + pub async fn send_relay(&self, inner: RelayMessage) { + self.state.send_relay(self.id, inner).await + } + + pub async fn start_transfer(&self) { + info!("starting data channel"); + let data_channel = self + .peer_connection + .create_data_channel("file-transfer", None) + .await + .unwrap(); + + data_channel + .on_message(Box::new(move |msg: DataChannelMessage| { + let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); + println!("message! '{}'", msg_str); + Box::pin(async {}) + })) + .await; + } + + pub async fn on_relay(&self, p: RelayMessage) { + match p { + protocol::RelayMessage::Offer(o) => todo!(), + protocol::RelayMessage::Answer(a) => todo!(), + protocol::RelayMessage::IceCandidate(c) => { + self.peer_connection.add_ice_candidate(c).await.unwrap(); + } + } + } + + pub async fn on_negotiation_needed(self: Arc<Self>) { + info!("({}) negotiation needed", self.id); + let offer = self.peer_connection.create_offer(None).await.unwrap(); + self.peer_connection + .set_local_description(offer.clone()) + .await + .unwrap(); + self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit { + sdp: offer.sdp, + ty: offer.sdp_type, + })) + .await + } +} diff --git a/client-native-rift/src/protocol.rs b/client-native-rift/src/protocol.rs index b3719d0..431dc42 100644 --- a/client-native-rift/src/protocol.rs +++ b/client-native-rift/src/protocol.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; -use webrtc::peer_connection::sdp::sdp_type::RTCSdpType; +use webrtc::{ + ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::sdp_type::RTCSdpType, +}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -22,8 +24,8 @@ pub enum ServerboundPacket { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RelayMessageWrapper { - sender: usize, // redundant, but ensures the server didnt cheat - inner: RelayMessage, + pub sender: usize, // redundant, but ensures the server didnt cheat + pub inner: RelayMessage, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -39,12 +41,3 @@ pub struct RTCSessionDescriptionInit { pub ty: RTCSdpType, pub sdp: String, } - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RTCIceCandidateInit { - pub candidate: String, - pub sdp_m_line_index: Option<usize>, - pub sdp_mid: Option<String>, - pub username_fragment: Option<String>, -} diff --git a/client-native-rift/src/signaling.rs b/client-native-rift/src/signaling.rs index c61b982..2ac3edc 100644 --- a/client-native-rift/src/signaling.rs +++ b/client-native-rift/src/signaling.rs @@ -1,40 +1,59 @@ use crate::protocol::ClientboundPacket; use crate::{crypto::hash, protocol::ServerboundPacket}; use futures_util::{SinkExt, StreamExt}; +use log::{debug, info}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::{self, Message}; pub async fn signaling_connect( host: &str, secret: &str, ) -> ( - impl SinkExt<ServerboundPacket>, - impl StreamExt<Item = Option<ClientboundPacket>>, + UnboundedSender<ServerboundPacket>, + UnboundedReceiver<ClientboundPacket>, ) { - let (conn, _) = tokio_tungstenite::connect_async( - url::Url::parse(&format!("wss://{host}/signaling/{}", hash(secret))).unwrap(), - ) - .await - .unwrap(); + let uri = format!("wss://{host}/signaling/{}", hash(secret)); + info!("connecting to signaling server at {uri:?}"); + let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) + .await + .unwrap(); + info!("connection established"); - let (tx, rx) = conn.split(); - let prx = rx.map(|mesg| { - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - Some(p) - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - None + let (mut tx, rx) = conn.split(); + + let (in_tx, in_rx) = unbounded_channel(); + let (out_tx, mut out_rx) = unbounded_channel(); + + tokio::spawn(async { + rx.for_each(move |mesg| { + info!("packet in"); + let mesg = mesg.unwrap(); + match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(_) => { + eprintln!("ws closed :("); + unreachable!(); + } + _ => (), } - _ => None, - } + Box::pin(async { () }) + }) + .await; }); - let ptx = tx.with(async move |p| { - Ok::<_, tokio_tungstenite::tungstenite::error::Error>(Message::Text( - serde_json::to_string::<ServerboundPacket>(&p).unwrap(), - )) + tokio::spawn(async move { + while let Some(p) = out_rx.recv().await { + debug!(" -> {p:?}"); + tx.send(Message::Text( + serde_json::to_string::<ServerboundPacket>(&p).unwrap(), + )) + .await + .unwrap() + } }); - (ptx, prx) + + (out_tx, in_rx) } |