aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client-native-lib/src/lib.rs39
-rw-r--r--client-native-lib/src/peer.rs53
-rw-r--r--client-native-lib/src/protocol.rs10
-rw-r--r--client-native-lib/src/state.rs61
-rw-r--r--client-native-rift/src/main.rs41
-rw-r--r--readme.md1
6 files changed, 161 insertions, 44 deletions
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index 6ffbee3..bb88b9f 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -7,6 +7,11 @@
#![feature(box_syntax)]
#![feature(async_fn_in_trait)]
+use std::{pin::Pin, sync::Arc};
+
+use futures_util::Future;
+use peer::Peer;
+use protocol::ProvideInfo;
use state::State;
use tokio::sync::RwLock;
use webrtc::{
@@ -27,25 +32,10 @@ pub use webrtc;
pub struct Config {
pub signaling_uri: String,
pub secret: String,
+ pub username: String,
}
-impl State {
- pub async fn new(config: Config) -> Self {
- let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await;
- let key = crypto::Key::derive(&config.secret);
-
- Self {
- api: build_api(),
- my_id: RwLock::new(None),
- peers: Default::default(),
- config,
- conn,
- key,
- }
- }
-}
-
-fn build_api() -> webrtc::api::API {
+pub(crate) fn build_api() -> webrtc::api::API {
let mut media_engine = MediaEngine::default();
media_engine.register_default_codecs().unwrap();
let mut registry = Registry::new();
@@ -55,3 +45,18 @@ fn build_api() -> webrtc::api::API {
.with_interceptor_registry(registry)
.build()
}
+
+pub trait LocalResource: Send + Sync + 'static {
+ fn info(&self) -> ProvideInfo;
+ fn on_request(&self, peer: Arc<Peer>) -> Box<dyn Future<Output = ()>>;
+}
+
+pub trait EventHandler: Send + Sync + 'static {
+ fn remote_resource_added(
+ &self,
+ peer: &Peer,
+ info: ProvideInfo,
+ ) -> Pin<Box<dyn Future<Output = ()>>>;
+ fn remote_resource_removed(&self, peer: &Peer, id: String)
+ -> Pin<Box<dyn Future<Output = ()>>>;
+}
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index d45d0e5..5e672e6 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -4,13 +4,17 @@
Copyright (C) 2022 metamuffin <metamuffin@disroot.org>
*/
use crate::{
- protocol::{self, RelayMessage, Sdp},
+ protocol::{self, ProvideInfo, RelayMessage, Sdp},
state::State,
};
use log::info;
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
use webrtc::{
- ice_transport::{ice_candidate::RTCIceCandidate, ice_server::RTCIceServer},
+ ice_transport::{
+ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
+ ice_server::RTCIceServer,
+ },
peer_connection::{
configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
sdp::session_description::RTCSessionDescription, RTCPeerConnection,
@@ -20,6 +24,7 @@ use webrtc::{
pub struct Peer {
pub state: Arc<State>,
pub peer_connection: RTCPeerConnection,
+ pub resources_provided: RwLock<HashMap<String, ProvideInfo>>,
pub id: usize,
}
@@ -36,6 +41,7 @@ impl Peer {
let peer_connection = state.api.new_peer_connection(config).await.unwrap();
let peer = Arc::new(Self {
+ resources_provided: Default::default(),
state: state.clone(),
peer_connection,
id,
@@ -92,22 +98,53 @@ impl Peer {
pub async fn on_relay(&self, p: RelayMessage) {
match p {
- protocol::RelayMessage::Offer(o) => self.on_offer(o).await,
- protocol::RelayMessage::Answer(a) => self.on_answer(a).await,
- protocol::RelayMessage::IceCandidate(c) => {
- info!("received ICE candidate");
- self.peer_connection.add_ice_candidate(c).await.unwrap();
+ RelayMessage::Offer(o) => self.on_offer(o).await,
+ RelayMessage::Answer(a) => self.on_answer(a).await,
+ RelayMessage::IceCandidate(c) => self.on_remote_ice_candidate(c).await,
+ RelayMessage::Provide(info) => {
+ info!(
+ "remote resource provided: ({:?}) {:?} {:?}",
+ info.id, info.kind, info.label
+ );
+ self.resources_provided
+ .write()
+ .await
+ .insert(info.id.clone(), info.clone());
+ self.state
+ .event_handler
+ .remote_resource_added(&self, info)
+ .await;
+ }
+ RelayMessage::ProvideStop { id } => {
+ info!("remote resource removed: ({:?}) ", id);
+ self.resources_provided.write().await.remove(&id);
+ self.state
+ .event_handler
+ .remote_resource_removed(&self, id)
+ .await;
}
_ => (),
}
}
+ pub async fn on_leave(&self) {
+ info!("({}) peer left", self.id);
+ }
+
pub async fn on_ice_candidate(&self, candidate: RTCIceCandidate) {
+ info!("publishing local ICE candidate");
self.send_relay(RelayMessage::IceCandidate(
candidate.to_json().await.unwrap(),
))
.await;
}
+ pub async fn on_remote_ice_candidate(&self, candidate: RTCIceCandidateInit) {
+ info!("adding remote ICE candidate");
+ self.peer_connection
+ .add_ice_candidate(candidate)
+ .await
+ .unwrap();
+ }
pub async fn on_negotiation_needed(self: Arc<Self>) {
info!("({}) negotiation needed", self.id);
diff --git a/client-native-lib/src/protocol.rs b/client-native-lib/src/protocol.rs
index 78a5a29..c361f6b 100644
--- a/client-native-lib/src/protocol.rs
+++ b/client-native-lib/src/protocol.rs
@@ -65,9 +65,9 @@ pub enum TrackKind {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvideInfo {
- id: String,
- kind: String, // not an enum so we dont fail if we dont support it
- track_kind: Option<TrackKind>,
- label: Option<String>,
- size: Option<usize>,
+ pub id: String,
+ pub kind: String, // not an enum so we dont fail if we dont support it
+ pub track_kind: Option<TrackKind>,
+ pub label: Option<String>,
+ pub size: Option<usize>,
}
diff --git a/client-native-lib/src/state.rs b/client-native-lib/src/state.rs
index 841c876..d129e32 100644
--- a/client-native-lib/src/state.rs
+++ b/client-native-lib/src/state.rs
@@ -4,11 +4,12 @@
Copyright (C) 2022 metamuffin <metamuffin@disroot.org>
*/
use crate::{
- crypto::Key,
+ build_api,
+ crypto::{self, Key},
peer::Peer,
protocol::{self, ClientboundPacket, RelayMessage, RelayMessageWrapper, ServerboundPacket},
- signaling::SignalingConnection,
- Config,
+ signaling::{self, SignalingConnection},
+ Config, EventHandler, LocalResource,
};
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
@@ -17,14 +18,32 @@ use tokio::sync::RwLock;
use webrtc::api::API;
pub struct State {
+ pub event_handler: Box<dyn EventHandler>,
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
pub key: Key,
- pub my_id: RwLock<Option<usize>>,
+ local_resources: RwLock<HashMap<String, Box<dyn LocalResource>>>,
+ my_id: RwLock<Option<usize>>,
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
impl State {
+ pub async fn new(config: Config, event_handler: Box<dyn EventHandler>) -> Arc<Self> {
+ let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await;
+ let key = crypto::Key::derive(&config.secret);
+
+ Arc::new(Self {
+ event_handler,
+ api: build_api(),
+ my_id: RwLock::new(None),
+ peers: Default::default(),
+ local_resources: Default::default(),
+ config,
+ conn,
+ key,
+ })
+ }
+
pub async fn my_id(&self) -> usize {
self.my_id.read().await.expect("not initialized yet")
}
@@ -47,14 +66,22 @@ impl State {
if id == self.my_id().await {
// we joined - YAY!
} else {
- self.peers
- .write()
- .await
- .insert(id, Peer::create(self.clone(), id).await);
+ let peer = Peer::create(self.clone(), id).await;
+ self.peers.write().await.insert(id, peer.clone());
+ peer.send_relay(RelayMessage::Identify {
+ username: self.config.username.clone(),
+ })
+ .await;
}
}
protocol::ClientboundPacket::ClientLeave { id } => {
- self.peers.write().await.remove(&id);
+ self.peers
+ .write()
+ .await
+ .remove(&id)
+ .unwrap()
+ .on_leave()
+ .await;
}
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
@@ -90,4 +117,20 @@ impl State {
.await
.unwrap()
}
+
+ pub async fn add_local_resource(&self, res: Box<dyn LocalResource>) {
+ for (pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::Provide(res.info()));
+ }
+ self.local_resources
+ .write()
+ .await
+ .insert(res.info().id, res);
+ }
+ pub async fn remove_local_resource(&self, id: String) {
+ self.local_resources.write().await.remove(&id);
+ for (pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() });
+ }
+ }
}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 0e458c4..9d97b8e 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -7,7 +7,9 @@
use bytes::Bytes;
use clap::{Parser, Subcommand};
-use client_native_lib::{connect, peer::Peer, webrtc::data_channel::RTCDataChannel, Config};
+use client_native_lib::{
+ peer::Peer, state::State, webrtc::data_channel::RTCDataChannel, Config, EventHandler,
+};
use log::{error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
@@ -33,6 +35,9 @@ pub struct Args {
/// keks-meet server used for establishing p2p connection
#[clap(long, default_value = "wss://meet.metamuffin.org")]
signaling_uri: String,
+ /// username for the `identify` packet
+ #[clap(short, long, default_value = "guest")]
+ username: String,
/// pre-shared secret (aka. room name)
#[clap(short, long)]
secret: String,
@@ -43,16 +48,42 @@ pub struct Args {
async fn run() {
let args = Args::parse();
- connect(Config {
- secret: args.secret.clone(),
- signaling_uri: args.signaling_uri.clone(),
- })
+ let state = State::new(
+ Config {
+ secret: args.secret.clone(),
+ signaling_uri: args.signaling_uri.clone(),
+ username: args.username.clone(),
+ },
+ Box::new(Handler {}),
+ )
.await;
+ state.receive_loop().await;
+
tokio::signal::ctrl_c().await.unwrap();
error!("interrupt received, exiting");
}
+struct Handler {}
+
+impl EventHandler for Handler {
+ fn remote_resource_added(
+ &self,
+ peer: &Peer,
+ info: client_native_lib::protocol::ProvideInfo,
+ ) -> Pin<Box<dyn Future<Output = ()>>> {
+ todo!()
+ }
+
+ fn remote_resource_removed(
+ &self,
+ peer: &Peer,
+ id: String,
+ ) -> Pin<Box<dyn Future<Output = ()>>> {
+ todo!()
+ }
+}
+
#[derive(Subcommand)]
pub enum Action {
/// Send a file
diff --git a/readme.md b/readme.md
index 33b2cf7..3823e60 100644
--- a/readme.md
+++ b/readme.md
@@ -103,6 +103,7 @@ system works as follows:
- download files in a streaming manner.
- workaround using service worker
- service worker to implement manual updates
+- open chat links in a new tab
## Parameters