aboutsummaryrefslogtreecommitdiff
path: root/client-native-lib
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-09-14 20:11:23 +0200
committermetamuffin <metamuffin@disroot.org>2022-09-14 20:11:23 +0200
commitc752fe962df841b0cb811b09f155568735e7380c (patch)
tree1ee5ea82899a517a3ad841072e7b094836613e9e /client-native-lib
parenta8de400e17bd3eb7892cac5a0bef02b35482e946 (diff)
downloadkeks-meet-c752fe962df841b0cb811b09f155568735e7380c.tar
keks-meet-c752fe962df841b0cb811b09f155568735e7380c.tar.bz2
keks-meet-c752fe962df841b0cb811b09f155568735e7380c.tar.zst
rename crate
Diffstat (limited to 'client-native-lib')
-rw-r--r--client-native-lib/Cargo.toml27
-rw-r--r--client-native-lib/src/crypto.rs39
-rw-r--r--client-native-lib/src/main.rs85
-rw-r--r--client-native-lib/src/peer.rs197
-rw-r--r--client-native-lib/src/protocol.rs43
-rw-r--r--client-native-lib/src/signaling.rs59
-rw-r--r--client-native-lib/src/state.rs77
7 files changed, 527 insertions, 0 deletions
diff --git a/client-native-lib/Cargo.toml b/client-native-lib/Cargo.toml
new file mode 100644
index 0000000..0fa740b
--- /dev/null
+++ b/client-native-lib/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "client-native-lib"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+tokio = { version = "1.21", features = ["full"] }
+futures-util = "0.3.24"
+
+webrtc = "0.5.0"
+tokio-tungstenite = { version = "*", features = ["rustls-tls"] }
+url = "2.3.1"
+
+serde = { version = "1.0.144", features = ["derive"] }
+serde_json = "*"
+
+clap = { version = "3.2.21", features = ["derive"] }
+env_logger = "0.8"
+log = "0.4"
+
+fastpbkdf2 = "0.1.0"
+aes-gcm = "0.10.1"
+sha256 = "1.0.3"
+rand = "0.8.5"
+rand_chacha = "0.3.1"
+base64 = "0.13.0"
+bytes = "1.2.1"
diff --git a/client-native-lib/src/crypto.rs b/client-native-lib/src/crypto.rs
new file mode 100644
index 0000000..9bd8908
--- /dev/null
+++ b/client-native-lib/src/crypto.rs
@@ -0,0 +1,39 @@
+use aes_gcm::{
+ aead::{generic_array::sequence::GenericSequence, Aead},
+ Aes256Gcm, KeyInit, Nonce,
+};
+use log::info;
+
+pub struct Key(Aes256Gcm);
+
+impl Key {
+ pub fn derive(secret: &str) -> Self {
+ info!("running key generation... this might take someā„¢ time");
+ let salt = base64::decode("thisisagoodsaltAAAAAAA==").unwrap();
+ let mut key = [0u8; 32];
+ fastpbkdf2::pbkdf2_hmac_sha256(secret.as_bytes(), salt.as_slice(), 250000, &mut key);
+
+ let key = Aes256Gcm::new_from_slice(key.as_slice()).unwrap();
+
+ info!("done");
+ Self(key)
+ }
+ pub fn encrypt(&self, s: &str) -> String {
+ let iv = Nonce::generate(|_| rand::random()); // TODO check if this is secure randomness
+ let ciphertext = self.0.encrypt(&iv, s.as_bytes()).unwrap();
+ let mut packet = iv.to_vec(); // TODO this could be doing less allocations
+ packet.extend(ciphertext);
+ base64::encode(packet)
+ }
+ pub fn decrypt(&self, s: &str) -> String {
+ let r = base64::decode(s).unwrap();
+ let iv = &r[0..12];
+ let ciphertext = &r[12..];
+ let plaintext = self.0.decrypt(Nonce::from_slice(iv), ciphertext).unwrap();
+ String::from_utf8(plaintext).unwrap()
+ }
+}
+
+pub fn hash(secret: &str) -> String {
+ sha256::digest(format!("also-a-very-good-salt{}", secret))
+}
diff --git a/client-native-lib/src/main.rs b/client-native-lib/src/main.rs
new file mode 100644
index 0000000..f2b79e2
--- /dev/null
+++ b/client-native-lib/src/main.rs
@@ -0,0 +1,85 @@
+#![feature(async_closure)]
+#![feature(box_syntax)]
+
+use clap::{Parser, Subcommand};
+use log::{debug, error};
+use signaling::signaling_connect;
+use state::State;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use webrtc::{
+ api::{
+ interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
+ },
+ interceptor::registry::Registry,
+};
+
+pub mod crypto;
+pub mod peer;
+pub mod protocol;
+pub mod signaling;
+pub mod state;
+
+fn main() {
+ env_logger::init_from_env("LOG");
+ tokio::runtime::Builder::new_multi_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(run())
+}
+
+#[derive(Parser)]
+pub struct Args {
+ #[clap(long, default_value = "meet.metamuffin.org")]
+ signaling_host: String,
+ #[clap(short, long)]
+ secret: String,
+ #[clap(subcommand)]
+ action: Action,
+}
+#[derive(Subcommand)]
+pub enum Action {
+ Send {},
+ Receive {},
+}
+
+async fn run() {
+ let args = Args::parse();
+ let (sender, mut recv) = signaling_connect(&args.signaling_host, &args.secret).await;
+
+ let key = crypto::Key::derive(&args.secret);
+
+ let mut media_engine = MediaEngine::default();
+ media_engine.register_default_codecs().unwrap();
+ let mut registry = Registry::new();
+ registry = register_default_interceptors(registry, &mut media_engine).unwrap();
+ let api = APIBuilder::new()
+ .with_media_engine(media_engine)
+ .with_interceptor_registry(registry)
+ .build();
+
+ let state = Arc::new(State {
+ peers: Default::default(),
+ key,
+ api,
+ my_id: RwLock::new(None),
+ sender,
+ args,
+ });
+
+ {
+ let state = state.clone();
+ tokio::spawn(async move {
+ debug!("receiving packets now");
+ while let Some(packet) = recv.recv().await {
+ debug!("{packet:?}");
+ let state = state.clone();
+ state.on_message(packet).await
+ }
+ });
+ }
+
+ tokio::signal::ctrl_c().await.unwrap();
+ error!("interrupt received, exiting");
+}
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
new file mode 100644
index 0000000..8402fc6
--- /dev/null
+++ b/client-native-lib/src/peer.rs
@@ -0,0 +1,197 @@
+use log::info;
+use std::sync::Arc;
+use webrtc::{
+ data_channel::data_channel_message::DataChannelMessage,
+ ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer},
+ peer_connection::{
+ configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
+ sdp::session_description::RTCSessionDescription, RTCPeerConnection,
+ },
+};
+
+use crate::{
+ protocol::{self, RTCSessionDescriptionInit, RelayMessage},
+ state::State,
+ Action,
+};
+
+pub struct Peer {
+ state: Arc<State>, // maybe use Weak later
+ peer_connection: RTCPeerConnection,
+ id: usize,
+}
+
+impl Peer {
+ pub async fn create(state: Arc<State>, id: usize) -> Arc<Self> {
+ info!("({id}) peer joined");
+ let config = RTCConfiguration {
+ ice_servers: vec![RTCIceServer {
+ urls: vec!["stun:metamuffin.org:16900".to_owned()],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let peer_connection = state.api.new_peer_connection(config).await.unwrap();
+
+ let peer = Arc::new(Self {
+ peer_connection,
+ id,
+ state: state.clone(),
+ });
+ peer.peer_connection
+ .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
+ println!("conn state: {s}");
+ Box::pin(async {})
+ }))
+ .await;
+
+ {
+ let peer2 = peer.clone();
+ peer.peer_connection
+ .on_ice_candidate(box move |c| {
+ let peer = peer2.clone();
+ Box::pin(async move {
+ if let Some(c) = c {
+ peer.on_ice_candidate(c).await
+ }
+ })
+ })
+ .await;
+ }
+
+ {
+ let peer2 = peer.clone();
+ peer.peer_connection
+ .on_negotiation_needed(box move || {
+ let peer = peer2.clone();
+ Box::pin(async { peer.on_negotiation_needed().await })
+ })
+ .await;
+ }
+
+ {
+ peer.peer_connection
+ .on_data_channel(box move |dc| {
+ Box::pin(async move {
+ dc.on_message(box move |message| {
+ Box::pin(async move { println!("{:?}", message.data) })
+ })
+ .await
+ })
+ })
+ .await;
+ }
+
+ if let Action::Send { .. } = &peer.state.args.action {
+ peer.start_transfer().await
+ }
+
+ peer
+ }
+
+ pub async fn send_relay(&self, inner: RelayMessage) {
+ self.state.send_relay(self.id, inner).await
+ }
+
+ pub async fn start_transfer(&self) {
+ info!("starting data channel");
+ let data_channel = self
+ .peer_connection
+ .create_data_channel("file-transfer", None)
+ .await
+ .unwrap();
+
+ data_channel
+ .on_message(Box::new(move |msg: DataChannelMessage| {
+ let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
+ println!("message! '{}'", msg_str);
+ Box::pin(async {})
+ }))
+ .await;
+
+ {
+ let dc2 = data_channel.clone();
+ data_channel
+ .on_open(box move || {
+ let data_channel = dc2.clone();
+ Box::pin(async move {
+ loop {
+ data_channel
+ .send(&bytes::Bytes::from_static(b"test\n"))
+ .await
+ .unwrap();
+ }
+ })
+ })
+ .await;
+ }
+ }
+
+ pub async fn on_relay(&self, p: RelayMessage) {
+ match p {
+ protocol::RelayMessage::Offer(o) => self.on_offer(o).await,
+ protocol::RelayMessage::Answer(a) => self.on_answer(a).await,
+ protocol::RelayMessage::IceCandidate(c) => {
+ info!("received ICE candidate");
+ self.peer_connection.add_ice_candidate(c).await.unwrap();
+ }
+ }
+ }
+
+ pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) {
+ self.send_relay(RelayMessage::IceCandidate(
+ candidate.to_json().await.unwrap(),
+ ))
+ .await;
+ }
+
+ pub async fn on_negotiation_needed(self: Arc<Self>) {
+ info!("({}) negotiation needed", self.id);
+ self.offer().await
+ }
+
+ pub async fn offer(&self) {
+ info!("sending offer");
+ let offer = self.peer_connection.create_offer(None).await.unwrap();
+ self.peer_connection
+ .set_local_description(offer.clone())
+ .await
+ .unwrap();
+ self.send_relay(protocol::RelayMessage::Offer(RTCSessionDescriptionInit {
+ sdp: offer.sdp,
+ ty: offer.sdp_type,
+ }))
+ .await
+ }
+ pub async fn on_offer(&self, offer: RTCSessionDescriptionInit) {
+ info!("received offer");
+ let offer = RTCSessionDescription::offer(offer.sdp).unwrap();
+ self.peer_connection
+ .set_remote_description(offer)
+ .await
+ .unwrap();
+ self.answer().await
+ }
+ pub async fn answer(&self) {
+ info!("sending answer");
+ let offer = self.peer_connection.create_answer(None).await.unwrap();
+ self.peer_connection
+ .set_local_description(offer.clone())
+ .await
+ .unwrap();
+ self.send_relay(protocol::RelayMessage::Answer(RTCSessionDescriptionInit {
+ sdp: offer.sdp,
+ ty: offer.sdp_type,
+ }))
+ .await
+ }
+ pub async fn on_answer(&self, answer: RTCSessionDescriptionInit) {
+ info!("received answer");
+ let offer = RTCSessionDescription::answer(answer.sdp).unwrap();
+ self.peer_connection
+ .set_remote_description(offer)
+ .await
+ .unwrap();
+ }
+}
diff --git a/client-native-lib/src/protocol.rs b/client-native-lib/src/protocol.rs
new file mode 100644
index 0000000..431dc42
--- /dev/null
+++ b/client-native-lib/src/protocol.rs
@@ -0,0 +1,43 @@
+use serde::{Deserialize, Serialize};
+use webrtc::{
+ ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::sdp_type::RTCSdpType,
+};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ClientboundPacket {
+ Init { your_id: usize, version: String },
+ ClientJoin { id: usize },
+ ClientLeave { id: usize },
+ Message { sender: usize, message: String },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ServerboundPacket {
+ Ping,
+ Relay {
+ recipient: Option<usize>,
+ message: String,
+ },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RelayMessageWrapper {
+ pub sender: usize, // redundant, but ensures the server didnt cheat
+ pub inner: RelayMessage,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum RelayMessage {
+ Offer(RTCSessionDescriptionInit),
+ Answer(RTCSessionDescriptionInit),
+ IceCandidate(RTCIceCandidateInit),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RTCSessionDescriptionInit {
+ #[serde(rename = "type")]
+ pub ty: RTCSdpType,
+ pub sdp: String,
+}
diff --git a/client-native-lib/src/signaling.rs b/client-native-lib/src/signaling.rs
new file mode 100644
index 0000000..2ac3edc
--- /dev/null
+++ b/client-native-lib/src/signaling.rs
@@ -0,0 +1,59 @@
+use crate::protocol::ClientboundPacket;
+use crate::{crypto::hash, protocol::ServerboundPacket};
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, info};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio_tungstenite::tungstenite::{self, Message};
+
+pub async fn signaling_connect(
+ host: &str,
+ secret: &str,
+) -> (
+ UnboundedSender<ServerboundPacket>,
+ UnboundedReceiver<ClientboundPacket>,
+) {
+ let uri = format!("wss://{host}/signaling/{}", hash(secret));
+ info!("connecting to signaling server at {uri:?}");
+ let (conn, _) = tokio_tungstenite::connect_async(url::Url::parse(&uri).unwrap())
+ .await
+ .unwrap();
+ info!("connection established");
+
+ let (mut tx, rx) = conn.split();
+
+ let (in_tx, in_rx) = unbounded_channel();
+ let (out_tx, mut out_rx) = unbounded_channel();
+
+ tokio::spawn(async {
+ rx.for_each(move |mesg| {
+ info!("packet in");
+ let mesg = mesg.unwrap();
+ match mesg {
+ tungstenite::Message::Text(t) => {
+ let p: ClientboundPacket = serde_json::from_str(t.as_str()).unwrap();
+ debug!("<- {p:?}");
+ in_tx.send(p).unwrap()
+ }
+ tungstenite::Message::Close(_) => {
+ eprintln!("ws closed :(");
+ unreachable!();
+ }
+ _ => (),
+ }
+ Box::pin(async { () })
+ })
+ .await;
+ });
+ tokio::spawn(async move {
+ while let Some(p) = out_rx.recv().await {
+ debug!(" -> {p:?}");
+ tx.send(Message::Text(
+ serde_json::to_string::<ServerboundPacket>(&p).unwrap(),
+ ))
+ .await
+ .unwrap()
+ }
+ });
+
+ (out_tx, in_rx)
+}
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
new file mode 100644
index 0000000..baea90a
--- /dev/null
+++ b/client-native-lib/src/state.rs
@@ -0,0 +1,77 @@
+use std::{collections::HashMap, sync::Arc};
+
+use log::warn;
+use tokio::sync::{mpsc::UnboundedSender, RwLock};
+use webrtc::api::API;
+
+use crate::{
+ crypto::Key,
+ peer::Peer,
+ protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket},
+ Action, Args,
+};
+
+pub struct State {
+ pub args: Args,
+ pub api: API,
+ pub key: Key,
+ pub my_id: RwLock<Option<usize>>,
+ pub sender: UnboundedSender<ServerboundPacket>,
+ pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
+}
+impl State {
+ pub async fn my_id(&self) -> usize {
+ self.my_id.read().await.expect("not initialized yet")
+ }
+
+ pub async fn on_message(self: Arc<Self>, packet: ClientboundPacket) {
+ match packet {
+ protocol::ClientboundPacket::Init {
+ your_id,
+ version: _,
+ } => {
+ *self.my_id.write().await = Some(your_id);
+ }
+ protocol::ClientboundPacket::ClientJoin { id } => {
+ if id == self.my_id().await {
+ // we joined - YAY!
+ } else {
+ self.peers
+ .write()
+ .await
+ .insert(id, Peer::create(self.clone(), id).await);
+ if let Action::Send { .. } = &self.args.action {}
+ }
+ }
+ protocol::ClientboundPacket::ClientLeave { id: _ } => {}
+ protocol::ClientboundPacket::Message { sender, message } => {
+ let message = self.key.decrypt(&message);
+ let p = serde_json::from_str::<RelayMessageWrapper>(&message).unwrap();
+ self.on_relay(sender, p.inner).await;
+ }
+ }
+ }
+
+ pub async fn on_relay(&self, sender: usize, p: RelayMessage) {
+ if let Some(peer) = self.peers.read().await.get(&sender).cloned() {
+ peer.on_relay(p).await
+ } else {
+ warn!("got a packet from a non-existent peer")
+ }
+ }
+
+ pub async fn send_relay(&self, recipient: usize, inner: RelayMessage) {
+ self.sender
+ .send(ServerboundPacket::Relay {
+ recipient: Some(recipient),
+ message: self.key.encrypt(
+ &serde_json::to_string(&RelayMessageWrapper {
+ sender: self.my_id.read().await.expect("not ready to relay yet.."),
+ inner,
+ })
+ .unwrap(),
+ ),
+ })
+ .unwrap()
+ }
+}