summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client-native-lib/src/instance.rs15
-rw-r--r--client-native-lib/src/lib.rs23
-rw-r--r--client-native-lib/src/peer.rs41
-rw-r--r--client-native-rift/src/main.rs104
4 files changed, 141 insertions, 42 deletions
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs
index b3688c3..cd720f1 100644
--- a/client-native-lib/src/instance.rs
+++ b/client-native-lib/src/instance.rs
@@ -18,7 +18,7 @@ use tokio::sync::RwLock;
use webrtc::api::API;
pub struct Instance {
- pub event_handler: Box<dyn EventHandler>,
+ pub event_handler: Arc<dyn EventHandler>,
pub conn: SignalingConnection,
pub config: Config,
pub api: API,
@@ -28,7 +28,7 @@ pub struct Instance {
pub peers: RwLock<HashMap<usize, Arc<Peer>>>,
}
impl Instance {
- pub async fn new(config: Config, event_handler: Box<dyn EventHandler>) -> Arc<Self> {
+ pub async fn new(config: Config, event_handler: Arc<dyn EventHandler>) -> Arc<Self> {
let conn = signaling::SignalingConnection::new(&config.signaling_uri, &config.secret).await;
let key = crypto::Key::derive(&config.secret);
@@ -72,16 +72,13 @@ impl Instance {
username: self.config.username.clone(),
})
.await;
+ self.event_handler.peer_join(peer).await;
}
}
protocol::ClientboundPacket::ClientLeave { id } => {
- self.peers
- .write()
- .await
- .remove(&id)
- .unwrap()
- .on_leave()
- .await;
+ let peer = self.peers.write().await.remove(&id).unwrap();
+ peer.on_leave().await;
+ self.event_handler.peer_leave(peer).await;
}
protocol::ClientboundPacket::Message { sender, message } => {
let message = self.key.decrypt(&message);
diff --git a/client-native-lib/src/lib.rs b/client-native-lib/src/lib.rs
index 27c0595..7a0d0d0 100644
--- a/client-native-lib/src/lib.rs
+++ b/client-native-lib/src/lib.rs
@@ -10,9 +10,9 @@
use std::{pin::Pin, sync::Arc};
use futures_util::Future;
-use peer::Peer;
-use protocol::ProvideInfo;
use instance::Instance;
+use peer::{Peer, TransportChannel};
+use protocol::ProvideInfo;
use tokio::sync::RwLock;
use webrtc::{
api::{
@@ -22,10 +22,10 @@ use webrtc::{
};
pub mod crypto;
+pub mod instance;
pub mod peer;
pub mod protocol;
pub mod signaling;
-pub mod instance;
pub use webrtc;
@@ -46,17 +46,22 @@ pub(crate) fn build_api() -> webrtc::api::API {
.build()
}
+pub type DynFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
+
pub trait LocalResource: Send + Sync + 'static {
fn info(&self) -> ProvideInfo;
- fn on_request(&self, peer: Arc<Peer>) -> Box<dyn Future<Output = ()>>;
+ fn on_request(&self, peer: Arc<Peer>) -> DynFut<()>;
}
pub trait EventHandler: Send + Sync + 'static {
- fn remote_resource_added(
+ fn peer_join(&self, peer: Arc<Peer>) -> DynFut<()>;
+ fn peer_leave(&self, peer: Arc<Peer>) -> DynFut<()>;
+ fn resource_added(&self, peer: Arc<Peer>, info: ProvideInfo) -> DynFut<()>;
+ fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()>;
+ fn resource_connected(
&self,
peer: Arc<Peer>,
- info: ProvideInfo,
- ) -> Pin<Box<dyn Future<Output = ()>>>;
- fn remote_resource_removed(&self, peer: Arc<Peer>, id: String)
- -> Pin<Box<dyn Future<Output = ()>>>;
+ resource: &ProvideInfo,
+ channel: TransportChannel,
+ ) -> DynFut<()>;
}
diff --git a/client-native-lib/src/peer.rs b/client-native-lib/src/peer.rs
index b6ecf55..36b1754 100644
--- a/client-native-lib/src/peer.rs
+++ b/client-native-lib/src/peer.rs
@@ -7,11 +7,12 @@ use crate::{
instance::Instance,
protocol::{self, ProvideInfo, RelayMessage, Sdp},
};
-use log::info;
+use log::{info, warn};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use webrtc::{
data::data_channel::DataChannel,
+ data_channel::RTCDataChannel,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
ice_server::RTCIceServer,
@@ -41,10 +42,10 @@ pub struct Peer {
// Connected(Arc<TransportChannel>),
// AwaitDisconnect,
// }
-// pub enum TransportChannel {
-// Track(TrackRemote),
-// DataChannel(DataChannel),
-// }
+pub enum TransportChannel {
+ Track(Arc<TrackRemote>),
+ DataChannel(Arc<RTCDataChannel>),
+}
impl Peer {
pub async fn create(inst: Arc<Instance>, id: usize) -> Arc<Self> {
@@ -96,14 +97,30 @@ impl Peer {
}
{
+ let weak = Arc::<Peer>::downgrade(&peer);
peer.peer_connection
.on_data_channel(box move |dc| {
- info!("got a data channel");
+ let peer = weak.upgrade().unwrap();
Box::pin(async move {
- dc.on_message(box move |message| {
- Box::pin(async move { println!("{:?}", message.data) })
- })
- .await
+ if let Some(res) = peer
+ .resources_provided
+ .read()
+ .await
+ .get(&dc.label().to_string())
+ {
+ info!("data channel for ({:?}) '{:?}'", res.id, res.label);
+ peer.inst
+ .event_handler
+ .resource_connected(
+ peer.clone(),
+ res,
+ TransportChannel::DataChannel(dc),
+ )
+ .await;
+ } else {
+ warn!("got unassociated data channel; closed connection");
+ dc.close().await;
+ }
})
})
.await;
@@ -138,7 +155,7 @@ impl Peer {
.insert(info.id.clone(), info.clone());
self.inst
.event_handler
- .remote_resource_added(self.clone(), info)
+ .resource_added(self.clone(), info)
.await;
}
RelayMessage::ProvideStop { id } => {
@@ -146,7 +163,7 @@ impl Peer {
self.resources_provided.write().await.remove(&id);
self.inst
.event_handler
- .remote_resource_removed(self.clone(), id)
+ .resource_removed(self.clone(), id)
.await;
}
_ => (),
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 613a4e6..8ccb7d8 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -8,9 +8,13 @@
use bytes::Bytes;
use clap::{Parser, Subcommand};
use client_native_lib::{
- instance::Instance, peer::Peer, webrtc::data_channel::RTCDataChannel, Config, EventHandler,
+ instance::Instance,
+ peer::{Peer, TransportChannel},
+ protocol::ProvideInfo,
+ webrtc::data_channel::RTCDataChannel,
+ Config, DynFut, EventHandler, LocalResource,
};
-use log::{error, info};
+use log::{error, info, warn};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
fs::File,
@@ -30,7 +34,7 @@ fn main() {
.block_on(run())
}
-#[derive(Parser)]
+#[derive(Parser, Clone)]
pub struct Args {
/// keks-meet server used for establishing p2p connection
#[clap(long, default_value = "wss://meet.metamuffin.org")]
@@ -54,7 +58,9 @@ async fn run() {
signaling_uri: args.signaling_uri.clone(),
username: args.username.clone(),
},
- Box::new(Handler {}),
+ Arc::new(Handler {
+ args: Arc::new(args.clone()),
+ }),
)
.await;
@@ -64,30 +70,91 @@ async fn run() {
error!("interrupt received, exiting");
}
-struct Handler {}
+#[derive(Clone)]
+struct Handler {
+ args: Arc<Args>,
+}
impl EventHandler for Handler {
- fn remote_resource_added(
+ fn peer_join(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ Box::pin(async move {})
+ }
+
+ fn peer_leave(&self, peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ Box::pin(async move {})
+ }
+ fn resource_added(
&self,
peer: Arc<Peer>,
info: client_native_lib::protocol::ProvideInfo,
- ) -> Pin<Box<dyn Future<Output = ()>>> {
+ ) -> DynFut<()> {
let id = info.id.clone();
Box::pin(async move {
peer.request_resource(id).await;
})
}
+ fn resource_removed(&self, peer: Arc<Peer>, id: String) -> DynFut<()> {
+ Box::pin(async {})
+ }
- fn remote_resource_removed(
+ fn resource_connected(
&self,
peer: Arc<Peer>,
- id: String,
- ) -> Pin<Box<dyn Future<Output = ()>>> {
- Box::pin(async {})
+ resource: &ProvideInfo,
+ channel: TransportChannel,
+ ) -> client_native_lib::DynFut<()> {
+ let resource = resource.clone();
+ let s = self.clone();
+ Box::pin(async move {
+ match channel {
+ TransportChannel::Track(_) => warn!("wrong type"),
+ TransportChannel::DataChannel(dc) => {
+ if resource.kind != "file" {
+ return error!("we got a non-file resource for some reason…");
+ }
+ let writer = Arc::new(RwLock::new(None));
+ {
+ let writer = writer.clone();
+ dc.on_open(box move || {
+ let s = s.clone();
+ let writer = writer.clone();
+ Box::pin(async move {
+ info!("channel opened");
+ *writer.write().await = Some(s.args.action.create_writer().await)
+ })
+ })
+ .await;
+ }
+ {
+ let writer = writer.clone();
+ dc.on_close(box move || {
+ let writer = writer.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *writer.write().await = None;
+ })
+ })
+ .await;
+ }
+ dc.on_message(box move |mesg| {
+ Box::pin(async move {
+ info!("{:?} bytes of data", mesg.data.len());
+ })
+ })
+ .await;
+ dc.on_error(box move |err| {
+ Box::pin(async move {
+ error!("data channel errored: {err}");
+ })
+ })
+ .await;
+ }
+ }
+ })
}
}
-#[derive(Subcommand)]
+#[derive(Subcommand, Clone)]
pub enum Action {
/// Send a file
Send { filename: Option<String> },
@@ -122,6 +189,19 @@ impl Action {
}
}
+struct FileSender {
+ info: ProvideInfo,
+}
+impl LocalResource for FileSender {
+ fn info(&self) -> client_native_lib::protocol::ProvideInfo {
+ self.info.clone()
+ }
+
+ fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
+ Box::pin(async move {})
+ }
+}
+
// pub struct Conn {
// pub args: Arc<Args>,
// }