From c752fe962df841b0cb811b09f155568735e7380c Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 14 Sep 2022 20:11:23 +0200 Subject: rename crate --- Cargo.lock | 9 +- Cargo.toml | 2 +- client-native-lib/Cargo.toml | 27 +++++ client-native-lib/src/crypto.rs | 39 +++++++ client-native-lib/src/main.rs | 85 ++++++++++++++++ client-native-lib/src/peer.rs | 197 ++++++++++++++++++++++++++++++++++++ client-native-lib/src/protocol.rs | 43 ++++++++ client-native-lib/src/signaling.rs | 59 +++++++++++ client-native-lib/src/state.rs | 77 ++++++++++++++ client-native-rift/Cargo.toml | 22 +--- client-native-rift/src/crypto.rs | 39 ------- client-native-rift/src/main.rs | 84 +-------------- client-native-rift/src/peer.rs | 197 ------------------------------------ client-native-rift/src/protocol.rs | 43 -------- client-native-rift/src/signaling.rs | 59 ----------- client-native-rift/src/state.rs | 77 -------------- 16 files changed, 538 insertions(+), 521 deletions(-) create mode 100644 client-native-lib/Cargo.toml create mode 100644 client-native-lib/src/crypto.rs create mode 100644 client-native-lib/src/main.rs create mode 100644 client-native-lib/src/peer.rs create mode 100644 client-native-lib/src/protocol.rs create mode 100644 client-native-lib/src/signaling.rs create mode 100644 client-native-lib/src/state.rs delete mode 100644 client-native-rift/src/crypto.rs delete mode 100644 client-native-rift/src/peer.rs delete mode 100644 client-native-rift/src/protocol.rs delete mode 100644 client-native-rift/src/signaling.rs delete mode 100644 client-native-rift/src/state.rs diff --git a/Cargo.lock b/Cargo.lock index b3b3c3e..4829c6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,7 +422,7 @@ dependencies = [ ] [[package]] -name = "client-native-rift" +name = "client-native-lib" version = "0.1.0" dependencies = [ "aes-gcm 0.10.1", @@ -444,6 +444,13 @@ dependencies = [ "webrtc", ] +[[package]] +name = "client-native-rift" +version = "0.1.0" +dependencies = [ + "client-native-lib", +] + [[package]] name = "const-oid" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index fb9f30e..84edef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["server", "client-native-rift"] +members = ["server", "client-native-rift", "client-native-lib"] diff --git a/client-native-lib/Cargo.toml b/client-native-lib/Cargo.toml new file mode 100644 index 0000000..0fa740b --- /dev/null +++ b/client-native-lib/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "client-native-lib" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.21", features = ["full"] } +futures-util = "0.3.24" + +webrtc = "0.5.0" +tokio-tungstenite = { version = "*", features = ["rustls-tls"] } +url = "2.3.1" + +serde = { version = "1.0.144", features = ["derive"] } +serde_json = "*" + +clap = { version = "3.2.21", features = ["derive"] } +env_logger = "0.8" +log = "0.4" + +fastpbkdf2 = "0.1.0" +aes-gcm = "0.10.1" +sha256 = "1.0.3" +rand = "0.8.5" +rand_chacha = "0.3.1" +base64 = "0.13.0" +bytes = "1.2.1" diff --git a/client-native-lib/src/crypto.rs b/client-native-lib/src/crypto.rs new file mode 100644 index 0000000..9bd8908 --- /dev/null +++ b/client-native-lib/src/crypto.rs @@ -0,0 +1,39 @@ +use aes_gcm::{ + aead::{generic_array::sequence::GenericSequence, Aead}, + Aes256Gcm, KeyInit, Nonce, +}; +use log::info; + +pub struct Key(Aes256Gcm); + +impl Key { + pub fn derive(secret: &str) -> Self { + info!("running key generation... this might take some™ time"); + let salt = base64::decode("thisisagoodsaltAAAAAAA==").unwrap(); + let mut key = [0u8; 32]; + fastpbkdf2::pbkdf2_hmac_sha256(secret.as_bytes(), salt.as_slice(), 250000, &mut key); + + let key = Aes256Gcm::new_from_slice(key.as_slice()).unwrap(); + + info!("done"); + Self(key) + } + pub fn encrypt(&self, s: &str) -> String { + let iv = Nonce::generate(|_| rand::random()); // TODO check if this is secure randomness + let ciphertext = self.0.encrypt(&iv, s.as_bytes()).unwrap(); + let mut packet = iv.to_vec(); // TODO this could be doing less allocations + packet.extend(ciphertext); + base64::encode(packet) + } + pub fn decrypt(&self, s: &str) -> String { + let r = base64::decode(s).unwrap(); + let iv = &r[0..12]; + let ciphertext = &r[12..]; + let plaintext = self.0.decrypt(Nonce::from_slice(iv), ciphertext).unwrap(); + String::from_utf8(plaintext).unwrap() + } +} + +pub fn hash(secret: &str) -> String { + sha256::digest(format!("also-a-very-good-salt{}", secret)) +} diff --git a/client-native-lib/src/main.rs b/client-native-lib/src/main.rs new file mode 100644 index 0000000..f2b79e2 --- /dev/null +++ b/client-native-lib/src/main.rs @@ -0,0 +1,85 @@ +#![feature(async_closure)] +#![feature(box_syntax)] + +use clap::{Parser, Subcommand}; +use log::{debug, error}; +use signaling::signaling_connect; +use state::State; +use std::sync::Arc; +use tokio::sync::RwLock; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, + }, + interceptor::registry::Registry, +}; + +pub mod crypto; +pub mod peer; +pub mod protocol; +pub mod signaling; +pub mod state; + +fn main() { + env_logger::init_from_env("LOG"); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(run()) +} + +#[derive(Parser)] +pub struct Args { + #[clap(long, default_value = "meet.metamuffin.org")] + signaling_host: String, + #[clap(short, long)] + secret: String, + #[clap(subcommand)] + action: Action, +} +#[derive(Subcommand)] +pub enum Action { + Send {}, + Receive {}, +} + +async fn run() { + let args = Args::parse(); + let (sender, mut recv) = signaling_connect(&args.signaling_host, &args.secret).await; + + let key = crypto::Key::derive(&args.secret); + + let mut media_engine = MediaEngine::default(); + media_engine.register_default_codecs().unwrap(); + let mut registry = Registry::new(); + registry = register_default_interceptors(registry, &mut media_engine).unwrap(); + let api = APIBuilder::new() + .with_media_engine(media_engine) + .with_interceptor_registry(registry) + .build(); + + let state = Arc::new(State { + peers: Default::default(), + key, + api, + my_id: RwLock::new(None), + sender, + args, + }); + + { + let state = state.clone(); + tokio::spawn(async move { + debug!("receiving packets now"); + while let Some(packet) = recv.recv().await { + debug!("{packet:?}"); + let state = state.clone(); + state.on_message(packet).await + } + }); + } + + tokio::signal::ctrl_c().await.unwrap(); + error!("interrupt received, exiting"); +} diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs new file mode 100644 index 0000000..8402fc6 --- /dev/null +++ b/client-native-lib/src/peer.rs @@ -0,0 +1,197 @@ +use log::info; +use std::sync::Arc; +use webrtc::{ + data_channel::data_channel_message::DataChannelMessage, + ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, + peer_connection::{ + configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, + sdp::session_description::RTCSessionDescription, RTCPeerConnection, + }, +}; + +use crate::{ + protocol::{self, RTCSessionDescriptionInit, RelayMessage}, + state::State, + Action, +}; + +pub struct Peer { + state: Arc, // maybe use Weak later + peer_connection: RTCPeerConnection, + id: usize, +} + +impl Peer { + pub async fn create(state: Arc, id: usize) -> Arc { + info!("({id}) peer joined"); + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:metamuffin.org:16900".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let peer_connection = state.api.new_peer_connection(config).await.unwrap(); + + let peer = Arc::new(Self { + peer_connection, + id, + state: state.clone(), + }); + peer.peer_connection + .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + println!("conn state: {s}"); + Box::pin(async {}) + })) + .await; + + { + let peer2 = peer.clone(); + peer.peer_connection + .on_ice_candidate(box move |c| { + let peer = peer2.clone(); + Box::pin(async move { + if let Some(c) = c { + peer.on_ice_candidate(c).await + } + }) + }) + .await; + } + + { + let peer2 = peer.clone(); + peer.peer_connection + .on_negotiation_needed(box move || { + let peer = peer2.clone(); + Box::pin(async { peer.on_negotiation_needed().await }) + }) + .await; + } + + { + peer.peer_connection + .on_data_channel(box move |dc| { + Box::pin(async move { + dc.on_message(box move |message| { + Box::pin(async move { println!("{:?}", message.data) }) + }) + .await + }) + }) + .await; + } + + if let Action::Send { .. } = &peer.state.args.action { + peer.start_transfer().await + } + + peer + } + + pub async fn send_relay(&self, inner: RelayMessage) { + self.state.send_relay(self.id, inner).await + } + + pub async fn start_transfer(&self) { + info!("starting data channel"); + let data_channel = self + .peer_connection + .create_data_channel("file-transfer", None) + .await + .unwrap(); + + data_channel + .on_message(Box::new(move |msg: DataChannelMessage| { + let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); + println!("message! '{}'", msg_str); + Box::pin(async {}) + })) + .await; + + { + let dc2 = data_channel.clone(); + data_channel + .on_open(box move || { + let data_channel = dc2.clone(); + Box::pin(async move { + loop { + data_channel + .send(&bytes::Bytes::from_static(b"test\n")) + .await + .unwrap(); + } + }) + }) + .await; + } + } + + pub async fn on_relay(&self, p: RelayMessage) { + match p { + protocol::RelayMessage::Offer(o) => self.on_offer(o).await, + protocol::RelayMessage::Answer(a) => self.on_answer(a).await, + protocol::RelayMessage::IceCandidate(c) => { + info!("received ICE candidate"); + self.peer_connection.add_ice_candidate(c).await.unwrap(); + } + } + } + + pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) { + self.send_relay(RelayMessage::IceCandidate( + candidate.to_json().await.unwrap(), + )) + .await; + } + + pub async fn on_negotiation_needed(self: Arc) { + info!("({}) negotiation needed", self.id); + self.offer().await + } + + pub async fn offer(&self) { + info!("sending offer"); + let offer = self.peer_connection.create_offer(None).await.unwrap(); + self.peer_connection + .set_local_description(offer.clone()) + .await + .unwrap(); + self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit { + sdp: offer.sdp, + ty: offer.sdp_type, + })) + .await + } + pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) { + info!("received offer"); + let offer = RTCSessionDescription::offer(offer.sdp).unwrap(); + self.peer_connection + .set_remote_description(offer) + .await + .unwrap(); + self.answer().await + } + pub async fn answer(&self) { + info!("sending answer"); + let offer = self.peer_connection.create_answer(None).await.unwrap(); + self.peer_connection + .set_local_description(offer.clone()) + .await + .unwrap(); + self.send_relay(protocol::RelayMessage::Answer(RTCSessionDescriptionInit { + sdp: offer.sdp, + ty: offer.sdp_type, + })) + .await + } + pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) { + info!("received answer"); + let offer = RTCSessionDescription::answer(answer.sdp).unwrap(); + self.peer_connection + .set_remote_description(offer) + .await + .unwrap(); + } +} diff --git a/client-native-lib/src/protocol.rs b/client-native-lib/src/protocol.rs new file mode 100644 index 0000000..431dc42 --- /dev/null +++ b/client-native-lib/src/protocol.rs @@ -0,0 +1,43 @@ +use serde::{Deserialize, Serialize}; +use webrtc::{ + ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::sdp_type::RTCSdpType, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ClientboundPacket { + Init { your_id: usize, version: String }, + ClientJoin { id: usize }, + ClientLeave { id: usize }, + Message { sender: usize, message: String }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ServerboundPacket { + Ping, + Relay { + recipient: Option, + message: String, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RelayMessageWrapper { + pub sender: usize, // redundant, but ensures the server didnt cheat + pub inner: RelayMessage, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RelayMessage { + Offer(RTCSessionDescriptionInit), + Answer(RTCSessionDescriptionInit), + IceCandidate(RTCIceCandidateInit), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RTCSessionDescriptionInit { + #[serde(rename = "type")] + pub ty: RTCSdpType, + pub sdp: String, +} diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs new file mode 100644 index 0000000..2ac3edc --- /dev/null +++ b/client-native-lib/src/signaling.rs @@ -0,0 +1,59 @@ +use crate::protocol::ClientboundPacket; +use crate::{crypto::hash, protocol::ServerboundPacket}; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, info}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio_tungstenite::tungstenite::{self, Message}; + +pub async fn signaling_connect( + host: &str, + secret: &str, +) -> ( + UnboundedSender, + UnboundedReceiver, +) { + let uri = format!("wss://{host}/signaling/{}", hash(secret)); + info!("connecting to signaling server at {uri:?}"); + let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) + .await + .unwrap(); + info!("connection established"); + + let (mut tx, rx) = conn.split(); + + let (in_tx, in_rx) = unbounded_channel(); + let (out_tx, mut out_rx) = unbounded_channel(); + + tokio::spawn(async { + rx.for_each(move |mesg| { + info!("packet in"); + let mesg = mesg.unwrap(); + match mesg { + tungstenite::Message::Text(t) => { + let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); + debug!("<- {p:?}"); + in_tx.send(p).unwrap() + } + tungstenite::Message::Close(_) => { + eprintln!("ws closed :("); + unreachable!(); + } + _ => (), + } + Box::pin(async { () }) + }) + .await; + }); + tokio::spawn(async move { + while let Some(p) = out_rx.recv().await { + debug!(" -> {p:?}"); + tx.send(Message::Text( + serde_json::to_string::(&p).unwrap(), + )) + .await + .unwrap() + } + }); + + (out_tx, in_rx) +} diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs new file mode 100644 index 0000000..baea90a --- /dev/null +++ b/client-native-lib/src/state.rs @@ -0,0 +1,77 @@ +use std::{collections::HashMap, sync::Arc}; + +use log::warn; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use webrtc::api::API; + +use crate::{ + crypto::Key, + peer::Peer, + protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, + Action, Args, +}; + +pub struct State { + pub args: Args, + pub api: API, + pub key: Key, + pub my_id: RwLock>, + pub sender: UnboundedSender, + pub peers: RwLock>>, +} +impl State { + pub async fn my_id(&self) -> usize { + self.my_id.read().await.expect("not initialized yet") + } + + pub async fn on_message(self: Arc, packet: ClientboundPacket) { + match packet { + protocol::ClientboundPacket::Init { + your_id, + version: _, + } => { + *self.my_id.write().await = Some(your_id); + } + protocol::ClientboundPacket::ClientJoin { id } => { + if id == self.my_id().await { + // we joined - YAY! + } else { + self.peers + .write() + .await + .insert(id, Peer::create(self.clone(), id).await); + if let Action::Send { .. } = &self.args.action {} + } + } + protocol::ClientboundPacket::ClientLeave { id: _ } => {} + protocol::ClientboundPacket::Message { sender, message } => { + let message = self.key.decrypt(&message); + let p = serde_json::from_str::(&message).unwrap(); + self.on_relay(sender, p.inner).await; + } + } + } + + pub async fn on_relay(&self, sender: usize, p: RelayMessage) { + if let Some(peer) = self.peers.read().await.get(&sender).cloned() { + peer.on_relay(p).await + } else { + warn!("got a packet from a non-existent peer") + } + } + + pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { + self.sender + .send(ServerboundPacket::Relay { + recipient: Some(recipient), + message: self.key.encrypt( + &serde_json::to_string(&RelayMessageWrapper { + sender: self.my_id.read().await.expect("not ready to relay yet.."), + inner, + }) + .unwrap(), + ), + }) + .unwrap() + } +} diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml index 9030315..f87cf16 100644 --- a/client-native-rift/Cargo.toml +++ b/client-native-rift/Cargo.toml @@ -4,24 +4,4 @@ version = "0.1.0" edition = "2021" [dependencies] -tokio = { version = "1.21", features = ["full"] } -futures-util = "0.3.24" - -webrtc = "0.5.0" -tokio-tungstenite = { version = "*", features = ["rustls-tls"] } -url = "2.3.1" - -serde = { version = "1.0.144", features = ["derive"] } -serde_json = "*" - -clap = { version = "3.2.21", features = ["derive"] } -env_logger = "0.8" -log = "0.4" - -fastpbkdf2 = "0.1.0" -aes-gcm = "0.10.1" -sha256 = "1.0.3" -rand = "0.8.5" -rand_chacha = "0.3.1" -base64 = "0.13.0" -bytes = "1.2.1" +client-native-lib = { path = "../client-native-lib" } diff --git a/client-native-rift/src/crypto.rs b/client-native-rift/src/crypto.rs deleted file mode 100644 index 9bd8908..0000000 --- a/client-native-rift/src/crypto.rs +++ /dev/null @@ -1,39 +0,0 @@ -use aes_gcm::{ - aead::{generic_array::sequence::GenericSequence, Aead}, - Aes256Gcm, KeyInit, Nonce, -}; -use log::info; - -pub struct Key(Aes256Gcm); - -impl Key { - pub fn derive(secret: &str) -> Self { - info!("running key generation... this might take some™ time"); - let salt = base64::decode("thisisagoodsaltAAAAAAA==").unwrap(); - let mut key = [0u8; 32]; - fastpbkdf2::pbkdf2_hmac_sha256(secret.as_bytes(), salt.as_slice(), 250000, &mut key); - - let key = Aes256Gcm::new_from_slice(key.as_slice()).unwrap(); - - info!("done"); - Self(key) - } - pub fn encrypt(&self, s: &str) -> String { - let iv = Nonce::generate(|_| rand::random()); // TODO check if this is secure randomness - let ciphertext = self.0.encrypt(&iv, s.as_bytes()).unwrap(); - let mut packet = iv.to_vec(); // TODO this could be doing less allocations - packet.extend(ciphertext); - base64::encode(packet) - } - pub fn decrypt(&self, s: &str) -> String { - let r = base64::decode(s).unwrap(); - let iv = &r[0..12]; - let ciphertext = &r[12..]; - let plaintext = self.0.decrypt(Nonce::from_slice(iv), ciphertext).unwrap(); - String::from_utf8(plaintext).unwrap() - } -} - -pub fn hash(secret: &str) -> String { - sha256::digest(format!("also-a-very-good-salt{}", secret)) -} diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index f2b79e2..e7a11a9 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -1,85 +1,3 @@ -#![feature(async_closure)] -#![feature(box_syntax)] - -use clap::{Parser, Subcommand}; -use log::{debug, error}; -use signaling::signaling_connect; -use state::State; -use std::sync::Arc; -use tokio::sync::RwLock; -use webrtc::{ - api::{ - interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, - }, - interceptor::registry::Registry, -}; - -pub mod crypto; -pub mod peer; -pub mod protocol; -pub mod signaling; -pub mod state; - fn main() { - env_logger::init_from_env("LOG"); - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(run()) -} - -#[derive(Parser)] -pub struct Args { - #[clap(long, default_value = "meet.metamuffin.org")] - signaling_host: String, - #[clap(short, long)] - secret: String, - #[clap(subcommand)] - action: Action, -} -#[derive(Subcommand)] -pub enum Action { - Send {}, - Receive {}, -} - -async fn run() { - let args = Args::parse(); - let (sender, mut recv) = signaling_connect(&args.signaling_host, &args.secret).await; - - let key = crypto::Key::derive(&args.secret); - - let mut media_engine = MediaEngine::default(); - media_engine.register_default_codecs().unwrap(); - let mut registry = Registry::new(); - registry = register_default_interceptors(registry, &mut media_engine).unwrap(); - let api = APIBuilder::new() - .with_media_engine(media_engine) - .with_interceptor_registry(registry) - .build(); - - let state = Arc::new(State { - peers: Default::default(), - key, - api, - my_id: RwLock::new(None), - sender, - args, - }); - - { - let state = state.clone(); - tokio::spawn(async move { - debug!("receiving packets now"); - while let Some(packet) = recv.recv().await { - debug!("{packet:?}"); - let state = state.clone(); - state.on_message(packet).await - } - }); - } - - tokio::signal::ctrl_c().await.unwrap(); - error!("interrupt received, exiting"); + println!("Hello, world!"); } diff --git a/client-native-rift/src/peer.rs b/client-native-rift/src/peer.rs deleted file mode 100644 index 8402fc6..0000000 --- a/client-native-rift/src/peer.rs +++ /dev/null @@ -1,197 +0,0 @@ -use log::info; -use std::sync::Arc; -use webrtc::{ - data_channel::data_channel_message::DataChannelMessage, - ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer}, - peer_connection::{ - configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, - sdp::session_description::RTCSessionDescription, RTCPeerConnection, - }, -}; - -use crate::{ - protocol::{self, RTCSessionDescriptionInit, RelayMessage}, - state::State, - Action, -}; - -pub struct Peer { - state: Arc, // maybe use Weak later - peer_connection: RTCPeerConnection, - id: usize, -} - -impl Peer { - pub async fn create(state: Arc, id: usize) -> Arc { - info!("({id}) peer joined"); - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:metamuffin.org:16900".to_owned()], - ..Default::default() - }], - ..Default::default() - }; - - let peer_connection = state.api.new_peer_connection(config).await.unwrap(); - - let peer = Arc::new(Self { - peer_connection, - id, - state: state.clone(), - }); - peer.peer_connection - .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("conn state: {s}"); - Box::pin(async {}) - })) - .await; - - { - let peer2 = peer.clone(); - peer.peer_connection - .on_ice_candidate(box move |c| { - let peer = peer2.clone(); - Box::pin(async move { - if let Some(c) = c { - peer.on_ice_candidate(c).await - } - }) - }) - .await; - } - - { - let peer2 = peer.clone(); - peer.peer_connection - .on_negotiation_needed(box move || { - let peer = peer2.clone(); - Box::pin(async { peer.on_negotiation_needed().await }) - }) - .await; - } - - { - peer.peer_connection - .on_data_channel(box move |dc| { - Box::pin(async move { - dc.on_message(box move |message| { - Box::pin(async move { println!("{:?}", message.data) }) - }) - .await - }) - }) - .await; - } - - if let Action::Send { .. } = &peer.state.args.action { - peer.start_transfer().await - } - - peer - } - - pub async fn send_relay(&self, inner: RelayMessage) { - self.state.send_relay(self.id, inner).await - } - - pub async fn start_transfer(&self) { - info!("starting data channel"); - let data_channel = self - .peer_connection - .create_data_channel("file-transfer", None) - .await - .unwrap(); - - data_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let msg_str = String::from_utf8(msg.data.to_vec()).unwrap(); - println!("message! '{}'", msg_str); - Box::pin(async {}) - })) - .await; - - { - let dc2 = data_channel.clone(); - data_channel - .on_open(box move || { - let data_channel = dc2.clone(); - Box::pin(async move { - loop { - data_channel - .send(&bytes::Bytes::from_static(b"test\n")) - .await - .unwrap(); - } - }) - }) - .await; - } - } - - pub async fn on_relay(&self, p: RelayMessage) { - match p { - protocol::RelayMessage::Offer(o) => self.on_offer(o).await, - protocol::RelayMessage::Answer(a) => self.on_answer(a).await, - protocol::RelayMessage::IceCandidate(c) => { - info!("received ICE candidate"); - self.peer_connection.add_ice_candidate(c).await.unwrap(); - } - } - } - - pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) { - self.send_relay(RelayMessage::IceCandidate( - candidate.to_json().await.unwrap(), - )) - .await; - } - - pub async fn on_negotiation_needed(self: Arc) { - info!("({}) negotiation needed", self.id); - self.offer().await - } - - pub async fn offer(&self) { - info!("sending offer"); - let offer = self.peer_connection.create_offer(None).await.unwrap(); - self.peer_connection - .set_local_description(offer.clone()) - .await - .unwrap(); - self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit { - sdp: offer.sdp, - ty: offer.sdp_type, - })) - .await - } - pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) { - info!("received offer"); - let offer = RTCSessionDescription::offer(offer.sdp).unwrap(); - self.peer_connection - .set_remote_description(offer) - .await - .unwrap(); - self.answer().await - } - pub async fn answer(&self) { - info!("sending answer"); - let offer = self.peer_connection.create_answer(None).await.unwrap(); - self.peer_connection - .set_local_description(offer.clone()) - .await - .unwrap(); - self.send_relay(protocol::RelayMessage::Answer(RTCSessionDescriptionInit { - sdp: offer.sdp, - ty: offer.sdp_type, - })) - .await - } - pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) { - info!("received answer"); - let offer = RTCSessionDescription::answer(answer.sdp).unwrap(); - self.peer_connection - .set_remote_description(offer) - .await - .unwrap(); - } -} diff --git a/client-native-rift/src/protocol.rs b/client-native-rift/src/protocol.rs deleted file mode 100644 index 431dc42..0000000 --- a/client-native-rift/src/protocol.rs +++ /dev/null @@ -1,43 +0,0 @@ -use serde::{Deserialize, Serialize}; -use webrtc::{ - ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::sdp_type::RTCSdpType, -}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ClientboundPacket { - Init { your_id: usize, version: String }, - ClientJoin { id: usize }, - ClientLeave { id: usize }, - Message { sender: usize, message: String }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ServerboundPacket { - Ping, - Relay { - recipient: Option, - message: String, - }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RelayMessageWrapper { - pub sender: usize, // redundant, but ensures the server didnt cheat - pub inner: RelayMessage, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum RelayMessage { - Offer(RTCSessionDescriptionInit), - Answer(RTCSessionDescriptionInit), - IceCandidate(RTCIceCandidateInit), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RTCSessionDescriptionInit { - #[serde(rename = "type")] - pub ty: RTCSdpType, - pub sdp: String, -} diff --git a/client-native-rift/src/signaling.rs b/client-native-rift/src/signaling.rs deleted file mode 100644 index 2ac3edc..0000000 --- a/client-native-rift/src/signaling.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::protocol::ClientboundPacket; -use crate::{crypto::hash, protocol::ServerboundPacket}; -use futures_util::{SinkExt, StreamExt}; -use log::{debug, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio_tungstenite::tungstenite::{self, Message}; - -pub async fn signaling_connect( - host: &str, - secret: &str, -) -> ( - UnboundedSender, - UnboundedReceiver, -) { - let uri = format!("wss://{host}/signaling/{}", hash(secret)); - info!("connecting to signaling server at {uri:?}"); - let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap()) - .await - .unwrap(); - info!("connection established"); - - let (mut tx, rx) = conn.split(); - - let (in_tx, in_rx) = unbounded_channel(); - let (out_tx, mut out_rx) = unbounded_channel(); - - tokio::spawn(async { - rx.for_each(move |mesg| { - info!("packet in"); - let mesg = mesg.unwrap(); - match mesg { - tungstenite::Message::Text(t) => { - let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap(); - debug!("<- {p:?}"); - in_tx.send(p).unwrap() - } - tungstenite::Message::Close(_) => { - eprintln!("ws closed :("); - unreachable!(); - } - _ => (), - } - Box::pin(async { () }) - }) - .await; - }); - tokio::spawn(async move { - while let Some(p) = out_rx.recv().await { - debug!(" -> {p:?}"); - tx.send(Message::Text( - serde_json::to_string::(&p).unwrap(), - )) - .await - .unwrap() - } - }); - - (out_tx, in_rx) -} diff --git a/client-native-rift/src/state.rs b/client-native-rift/src/state.rs deleted file mode 100644 index baea90a..0000000 --- a/client-native-rift/src/state.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use log::warn; -use tokio::sync::{mpsc::UnboundedSender, RwLock}; -use webrtc::api::API; - -use crate::{ - crypto::Key, - peer::Peer, - protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket}, - Action, Args, -}; - -pub struct State { - pub args: Args, - pub api: API, - pub key: Key, - pub my_id: RwLock>, - pub sender: UnboundedSender, - pub peers: RwLock>>, -} -impl State { - pub async fn my_id(&self) -> usize { - self.my_id.read().await.expect("not initialized yet") - } - - pub async fn on_message(self: Arc, packet: ClientboundPacket) { - match packet { - protocol::ClientboundPacket::Init { - your_id, - version: _, - } => { - *self.my_id.write().await = Some(your_id); - } - protocol::ClientboundPacket::ClientJoin { id } => { - if id == self.my_id().await { - // we joined - YAY! - } else { - self.peers - .write() - .await - .insert(id, Peer::create(self.clone(), id).await); - if let Action::Send { .. } = &self.args.action {} - } - } - protocol::ClientboundPacket::ClientLeave { id: _ } => {} - protocol::ClientboundPacket::Message { sender, message } => { - let message = self.key.decrypt(&message); - let p = serde_json::from_str::(&message).unwrap(); - self.on_relay(sender, p.inner).await; - } - } - } - - pub async fn on_relay(&self, sender: usize, p: RelayMessage) { - if let Some(peer) = self.peers.read().await.get(&sender).cloned() { - peer.on_relay(p).await - } else { - warn!("got a packet from a non-existent peer") - } - } - - pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) { - self.sender - .send(ServerboundPacket::Relay { - recipient: Some(recipient), - message: self.key.encrypt( - &serde_json::to_string(&RelayMessageWrapper { - sender: self.my_id.read().await.expect("not ready to relay yet.."), - inner, - }) - .unwrap(), - ), - }) - .unwrap() - } -} -- cgit v1.2.3-70-g09d2