summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-14 18:59:30 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-14 18:59:30 +0200
commit401ee1336f83a9172b0cc4231b382c6a099bb66c (patch)
tree324f946ac46ea00aa365ce4afd3a67acdb53d1aa
parent473c7624c1419c6636addebb183dede5be88b061 (diff)
downloadkeks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar
keks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar.bz2
keks-meet-401ee1336f83a9172b0cc4231b382c6a099bb66c.tar.zst
just a in-between state
-rw-r--r--Cargo.lock82
-rw-r--r--client-native-rift/Cargo.toml1
-rw-r--r--client-native-rift/src/crypto.rs7
-rw-r--r--client-native-rift/src/main.rs244
-rw-r--r--client-native-rift/src/protocol.rs17
-rw-r--r--client-native-rift/src/signaling.rs69
6 files changed, 336 insertions, 84 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 799387b..f448a85 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -383,11 +383,51 @@ dependencies = [
]
[[package]]
+name = "clap"
+version = "3.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ed5341b2301a26ab80be5cbdced622e80ed808483c52e45e3310a877d3b37d7"
+dependencies = [
+ "atty",
+ "bitflags",
+ "clap_derive",
+ "clap_lex",
+ "indexmap",
+ "once_cell",
+ "strsim",
+ "termcolor",
+ "textwrap",
+]
+
+[[package]]
+name = "clap_derive"
+version = "3.2.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65"
+dependencies = [
+ "heck",
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "clap_lex"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
+dependencies = [
+ "os_str_bytes",
+]
+
+[[package]]
name = "client-native-rift"
version = "0.1.0"
dependencies = [
"aes-gcm 0.10.1",
"base64",
+ "clap",
"env_logger",
"fastpbkdf2",
"futures-util",
@@ -954,6 +994,12 @@ dependencies = [
]
[[package]]
+name = "heck"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
+
+[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1401,6 +1447,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
+name = "os_str_bytes"
+version = "6.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
+
+[[package]]
name = "p256"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1553,6 +1605,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
+name = "proc-macro-error"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
+dependencies = [
+ "proc-macro-error-attr",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-error-attr"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "version_check",
+]
+
+[[package]]
name = "proc-macro2"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2088,6 +2164,12 @@ dependencies = [
]
[[package]]
+name = "textwrap"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
+
+[[package]]
name = "thiserror"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index de44966..7e1911d 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -14,6 +14,7 @@ url = "2.3.1"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "*"
+clap = { version = "3.2.21", features = ["derive"] }
env_logger = "0.8"
log = "0.4"
diff --git a/client-native-rift/src/crypto.rs b/client-native-rift/src/crypto.rs
index 3f00c67..9bd8908 100644
--- a/client-native-rift/src/crypto.rs
+++ b/client-native-rift/src/crypto.rs
@@ -2,17 +2,20 @@ use aes_gcm::{
aead::{generic_array::sequence::GenericSequence, Aead},
Aes256Gcm, KeyInit, Nonce,
};
+use log::info;
pub struct Key(Aes256Gcm);
impl Key {
- pub fn derive(secret: String) -> Self {
+ pub fn derive(secret: &str) -> Self {
+ info!("running key generation... this might take someā„¢ time");
let salt = base64::decode("thisisagoodsaltAAAAAAA==").unwrap();
- let mut key = [0u8; 256];
+ let mut key = [0u8; 32];
fastpbkdf2::pbkdf2_hmac_sha256(secret.as_bytes(), salt.as_slice(), 250000, &mut key);
let key = Aes256Gcm::new_from_slice(key.as_slice()).unwrap();
+ info!("done");
Self(key)
}
pub fn encrypt(&self, s: &str) -> String {
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index d9935e8..fbc8387 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -1,18 +1,30 @@
#![feature(async_closure)]
+#![feature(box_syntax)]
-use std::{sync::Arc, time::Duration};
-
+use crate::protocol::{RTCSessionDescriptionInit, RelayMessageWrapper, ServerboundPacket};
+use clap::{Parser, Subcommand};
+use crypto::Key;
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, info};
+use protocol::{ClientboundPacket, RelayMessage};
use signaling::signaling_connect;
+use std::{
+ collections::HashMap,
+ sync::{Arc, Weak},
+ time::Duration,
+};
+use tokio::sync::{mpsc::UnboundedSender, Mutex, RwLock};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
+ API,
},
data_channel::data_channel_message::DataChannelMessage,
ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
peer_connection::{
- configuration::RTCConfiguration, math_rand_alpha,
- peer_connection_state::RTCPeerConnectionState,
+ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
+ RTCPeerConnection,
},
};
@@ -21,6 +33,7 @@ pub mod protocol;
pub mod signaling;
fn main() {
+ env_logger::init_from_env("LOG");
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
@@ -28,8 +41,26 @@ fn main() {
.block_on(run())
}
+#[derive(Parser)]
+struct Args {
+ #[clap(long, default_value = "meet.metamuffin.org")]
+ signaling_host: String,
+ #[clap(short, long)]
+ secret: String,
+ #[clap(subcommand)]
+ action: Action,
+}
+#[derive(Subcommand)]
+enum Action {
+ Send {},
+ Receive {},
+}
+
async fn run() {
- let (send, recv) = signaling_connect("meet.metamuffin.org", "hunter2").await;
+ let args = Args::parse();
+ let (sender, mut recv) = signaling_connect(&args.signaling_host, &args.secret).await;
+
+ let key = crypto::Key::derive(&args.secret);
let mut media_engine = MediaEngine::default();
media_engine.register_default_codecs().unwrap();
@@ -40,48 +71,28 @@ async fn run() {
.with_interceptor_registry(registry)
.build();
- let config = RTCConfiguration {
- ice_servers: vec![RTCIceServer {
- urls: vec!["stun:metamuffin.org:16900".to_owned()],
- ..Default::default()
- }],
- ..Default::default()
- };
+ let state = Arc::new(State {
+ peers: Default::default(),
+ key,
+ api,
+ my_id: RwLock::new(None),
+ sender,
+ args,
+ });
- let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap());
+ {
+ let state = state.clone();
+ tokio::spawn(async move {
+ debug!("receiving packets now");
+ while let Some(packet) = recv.recv().await {
+ debug!("{packet:?}");
+ let state = state.clone();
+ state.on_message(packet).await
+ }
+ });
+ }
- let data_channel = peer_connection
- .create_data_channel("data", None)
- .await
- .unwrap();
-
- let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
-
- peer_connection
- .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
- println!("conn state: {s}");
- Box::pin(async {})
- }))
- .await;
-
- let d_label = data_channel.label().to_owned();
- data_channel
- .on_message(Box::new(move |msg: DataChannelMessage| {
- let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
- println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
- Box::pin(async {})
- }))
- .await;
-
- let offer = peer_connection.create_offer(None).await.unwrap();
- peer_connection
- .set_local_description(offer.clone())
- .await
- .unwrap();
-
- println!("{offer:?}");
-
- tokio::time::sleep(Duration::from_secs(5)).await;
+ tokio::time::sleep(Duration::from_secs(10000)).await;
// // Wait for the answer to be pasted
// let line = signal::must_read_stdin().unwrap();
@@ -91,3 +102,146 @@ async fn run() {
// // Apply the answer as the remote description
// peer_connection.set_remote_description(answer).await?;
}
+
+pub struct State {
+ args: Args,
+ api: API,
+ key: Key,
+ my_id: RwLock<Option<usize>>,
+ sender: UnboundedSender<ServerboundPacket>,
+ peers: RwLock<HashMap<usize, Arc<Peer>>>,
+}
+impl State {
+ pub async fn my_id(&self) -> usize {
+ self.my_id.read().await.expect("not initialized yet")
+ }
+
+ pub async fn on_message(&self, packet: ClientboundPacket) {
+ match packet {
+ protocol::ClientboundPacket::Init {
+ your_id,
+ version: _,
+ } => {
+ *self.my_id.write().await = Some(your_id);
+ }
+ protocol::ClientboundPacket::ClientJoin { id } => {
+ if id == self.my_id().await {
+ // we joined - YAY!
+ } else {
+ if let Action::Send { .. } = &self.args.action {}
+ }
+ }
+ protocol::ClientboundPacket::ClientLeave { id: _ } => {}
+ protocol::ClientboundPacket::Message { sender, message } => {
+ let message = self.key.decrypt(&message);
+ let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
+ self.on_relay(sender, p.inner).await;
+ }
+ }
+ }
+
+ pub async fn on_relay(&self, sender: usize, p: RelayMessage) {}
+
+ pub async fn send_relay(&self, receipient: usize, inner: RelayMessage) {
+ self.sender
+ .send(ServerboundPacket::Relay {
+ recipient: Some(0),
+ message: self.key.encrypt(
+ &serde_json::to_string(&RelayMessageWrapper {
+ sender: self.my_id.read().await.expect("not ready to relay yet.."),
+ inner,
+ })
+ .unwrap(),
+ ),
+ })
+ .unwrap()
+ }
+}
+
+pub struct Peer {
+ state: Arc<State>, // maybe use Weak later
+ peer_connection: RTCPeerConnection,
+ id: usize,
+}
+
+impl Peer {
+ pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> {
+ let config = RTCConfiguration {
+ ice_servers: vec![RTCIceServer {
+ urls: vec!["stun:metamuffin.org:16900".to_owned()],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let peer_connection = state.api.new_peer_connection(config).await.unwrap();
+
+ let peer = Arc::new(Self {
+ peer_connection,
+ id,
+ state: state.clone(),
+ });
+ peer.peer_connection
+ .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
+ println!("conn state: {s}");
+ Box::pin(async {})
+ }))
+ .await;
+
+ {
+ let peer2 = peer.clone();
+ peer.peer_connection
+ .on_negotiation_needed(box move || {
+ let peer = peer2.clone();
+ Box::pin(async { peer.on_negotiation_needed().await })
+ })
+ .await;
+ }
+ peer
+ }
+
+ pub async fn send_relay(&self, inner: RelayMessage) {
+ self.state.send_relay(self.id, inner).await
+ }
+
+ pub async fn start_transfer(&self) {
+ info!("starting data channel");
+ let data_channel = self
+ .peer_connection
+ .create_data_channel("file-transfer", None)
+ .await
+ .unwrap();
+
+ data_channel
+ .on_message(Box::new(move |msg: DataChannelMessage| {
+ let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
+ println!("message! '{}'", msg_str);
+ Box::pin(async {})
+ }))
+ .await;
+ }
+
+ pub async fn on_relay(&self, p: RelayMessage) {
+ match p {
+ protocol::RelayMessage::Offer(o) => todo!(),
+ protocol::RelayMessage::Answer(a) => todo!(),
+ protocol::RelayMessage::IceCandidate(c) => {
+ self.peer_connection.add_ice_candidate(c).await.unwrap();
+ }
+ }
+ }
+
+ pub async fn on_negotiation_needed(self: Arc<Self>) {
+ info!("({}) negotiation needed", self.id);
+ let offer = self.peer_connection.create_offer(None).await.unwrap();
+ self.peer_connection
+ .set_local_description(offer.clone())
+ .await
+ .unwrap();
+ self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit {
+ sdp: offer.sdp,
+ ty: offer.sdp_type,
+ }))
+ .await
+ }
+}
diff --git a/client-native-rift/src/protocol.rs b/client-native-rift/src/protocol.rs
index b3719d0..431dc42 100644
--- a/client-native-rift/src/protocol.rs
+++ b/client-native-rift/src/protocol.rs
@@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
-use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
+use webrtc::{
+ ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::sdp_type::RTCSdpType,
+};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -22,8 +24,8 @@ pub enum ServerboundPacket {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayMessageWrapper {
- sender: usize, // redundant, but ensures the server didnt cheat
- inner: RelayMessage,
+ pub sender: usize, // redundant, but ensures the server didnt cheat
+ pub inner: RelayMessage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -39,12 +41,3 @@ pub struct RTCSessionDescriptionInit {
pub ty: RTCSdpType,
pub sdp: String,
}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct RTCIceCandidateInit {
- pub candidate: String,
- pub sdp_m_line_index: Option<usize>,
- pub sdp_mid: Option<String>,
- pub username_fragment: Option<String>,
-}
diff --git a/client-native-rift/src/signaling.rs b/client-native-rift/src/signaling.rs
index c61b982..2ac3edc 100644
--- a/client-native-rift/src/signaling.rs
+++ b/client-native-rift/src/signaling.rs
@@ -1,40 +1,59 @@
use crate::protocol::ClientboundPacket;
use crate::{crypto::hash, protocol::ServerboundPacket};
use futures_util::{SinkExt, StreamExt};
+use log::{debug, info};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_tungstenite::tungstenite::{self, Message};
pub async fn signaling_connect(
host: &str,
secret: &str,
) -> (
- impl SinkExt<ServerboundPacket>,
- impl StreamExt<Item = Option<ClientboundPacket>>,
+ UnboundedSender<ServerboundPacket>,
+ UnboundedReceiver<ClientboundPacket>,
) {
- let (conn, _) = tokio_tungstenite::connect_async(
- url::Url::parse(&format!("wss://{host}/signaling/{}", hash(secret))).unwrap(),
- )
- .await
- .unwrap();
+ let uri = format!("wss://{host}/signaling/{}", hash(secret));
+ info!("connecting to signaling server at {uri:?}");
+ let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap())
+ .await
+ .unwrap();
+ info!("connection established");
- let (tx, rx) = conn.split();
- let prx = rx.map(|mesg| {
- let mesg = mesg.unwrap();
- match mesg {
- tungstenite::Message::Text(t) => {
- let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
- Some(p)
- }
- tungstenite::Message::Close(_) => {
- eprintln!("ws closed :(");
- None
+ let (mut tx, rx) = conn.split();
+
+ let (in_tx, in_rx) = unbounded_channel();
+ let (out_tx, mut out_rx) = unbounded_channel();
+
+ tokio::spawn(async {
+ rx.for_each(move |mesg| {
+ info!("packet in");
+ let mesg = mesg.unwrap();
+ match mesg {
+ tungstenite::Message::Text(t) => {
+ let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
+ debug!("<- {p:?}");
+ in_tx.send(p).unwrap()
+ }
+ tungstenite::Message::Close(_) => {
+ eprintln!("ws closed :(");
+ unreachable!();
+ }
+ _ => (),
}
- _ => None,
- }
+ Box::pin(async { () })
+ })
+ .await;
});
- let ptx = tx.with(async move |p| {
- Ok::<_, tokio_tungstenite::tungstenite::error::Error>(Message::Text(
- serde_json::to_string::<ServerboundPacket>(&p).unwrap(),
- ))
+ tokio::spawn(async move {
+ while let Some(p) = out_rx.recv().await {
+ debug!(" -> {p:?}");
+ tx.send(Message::Text(
+ serde_json::to_string::<ServerboundPacket>(&p).unwrap(),
+ ))
+ .await
+ .unwrap()
+ }
});
- (ptx, prx)
+
+ (out_tx, in_rx)
}