summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client-native-export-track/Cargo.toml8
-rw-r--r--client-native-gui/Cargo.toml14
-rw-r--r--client-native-lib/Cargo.toml8
-rw-r--r--client-native-rift/Cargo.toml6
-rw-r--r--client-native-rift/src/file.rs112
-rw-r--r--client-native-rift/src/main.rs244
-rw-r--r--client-native-rift/src/port.rs66
7 files changed, 283 insertions, 175 deletions
diff --git a/client-native-export-track/Cargo.toml b/client-native-export-track/Cargo.toml
index 1eccbdb..6f9e9cd 100644
--- a/client-native-export-track/Cargo.toml
+++ b/client-native-export-track/Cargo.toml
@@ -6,10 +6,10 @@ edition = "2021"
[dependencies]
libkeks = { path = "../client-native-lib" }
-clap = { version = "4.4.18", features = ["derive"] }
-env_logger = "0.11.1"
+clap = { version = "4.5.3", features = ["derive"] }
+env_logger = "0.11.3"
log = "0.4"
-tokio = { version = "1.35", features = ["full"] }
+tokio = { version = "1.36", features = ["full"] }
bytes = "1.5.0"
-anyhow = "1.0.79"
+anyhow = "1.0.81"
diff --git a/client-native-gui/Cargo.toml b/client-native-gui/Cargo.toml
index 573f69a..ae9208f 100644
--- a/client-native-gui/Cargo.toml
+++ b/client-native-gui/Cargo.toml
@@ -6,15 +6,15 @@ edition = "2021"
[dependencies]
libkeks = { path = "../client-native-lib" }
-clap = { version = "4.4.18", features = ["derive"] }
+clap = { version = "4.5.3", features = ["derive"] }
async-std = "1.12.0"
-tokio = { version = "1.35.1", features = ["full"] }
-env_logger = "0.11.1"
+tokio = { version = "1.36.0", features = ["full"] }
+env_logger = "0.11.3"
log = "0.4"
-anyhow = "1.0.79"
-crossbeam-channel = "0.5.11"
+anyhow = "1.0.81"
+crossbeam-channel = "0.5.12"
-egui = "0.25.0"
-eframe = "0.25.0"
+egui = "0.26.2"
+eframe = "0.26.2"
libmpv = { git = "https://github.com/sirno/libmpv-rs.git", branch = "upgrade-libmpv" }
diff --git a/client-native-lib/Cargo.toml b/client-native-lib/Cargo.toml
index 70ad7b8..ea36db9 100644
--- a/client-native-lib/Cargo.toml
+++ b/client-native-lib/Cargo.toml
@@ -4,14 +4,14 @@ version = "0.2.3"
edition = "2021"
[dependencies]
-tokio = { version = "1.35", features = ["full"] }
+tokio = { version = "1.36", features = ["full"] }
futures-util = "0.3.30"
-webrtc = "0.9.0"
+webrtc = "0.10.1"
tokio-tungstenite = { version = "*", features = ["rustls-tls"] }
url = "2.5.0"
-serde = { version = "1.0.196", features = ["derive"] }
+serde = { version = "1.0.197", features = ["derive"] }
serde_json = "*"
log = "0.4"
@@ -22,5 +22,5 @@ hex = "0.4.3"
sha2 = "0.10.8"
rand = "0.8.5"
rand_chacha = "0.3.1"
-base64 = "0.21.7"
+base64 = "0.22.0"
bytes = "1.5.0"
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index d9a7915..9f79cd8 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -6,14 +6,14 @@ edition = "2021"
[dependencies]
libkeks = { path = "../client-native-lib" }
-clap = { version = "4.4.18", features = ["derive"] }
+clap = { version = "4.5.3", features = ["derive"] }
pretty_env_logger = "0.5.0"
log = "0.4"
-tokio = { version = "1.35", features = ["full"] }
+tokio = { version = "1.36", features = ["full"] }
bytes = "1.5.0"
-indicatif = "0.17.7"
+indicatif = "0.17.8"
humansize = "2.1.3"
users = "0.11.0"
anyhow = "1.0.81"
diff --git a/client-native-rift/src/file.rs b/client-native-rift/src/file.rs
index bfe32fd..93dde62 100644
--- a/client-native-rift/src/file.rs
+++ b/client-native-rift/src/file.rs
@@ -1,8 +1,12 @@
+use crate::RequestHandler;
use bytes::Bytes;
use humansize::DECIMAL;
-use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
+use libkeks::{
+ peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource,
+};
use log::{debug, error, info};
use std::{
+ future::Future,
path::PathBuf,
pin::Pin,
sync::{
@@ -11,8 +15,8 @@ use std::{
},
};
use tokio::{
- fs::File,
- io::{AsyncRead, AsyncReadExt},
+ fs::{File, OpenOptions},
+ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
@@ -107,3 +111,105 @@ impl LocalResource for FileSender {
})
}
}
+
+pub struct DownloadHandler {
+ pub path: Option<PathBuf>,
+}
+impl RequestHandler for DownloadHandler {
+ fn on_connect(
+ &self,
+ resource: ProvideInfo,
+ channel: Arc<RTCDataChannel>,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> {
+ let path = self.path.clone().unwrap_or_else(|| {
+ resource
+ .label
+ .clone()
+ .unwrap_or("download".to_owned())
+ .replace("/", "_")
+ .replace("..", "_")
+ .into()
+ });
+ if path.exists() {}
+ Box::pin(async move {
+ let pos = Arc::new(AtomicUsize::new(0));
+ let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
+ Arc::new(RwLock::new(None));
+ {
+ let writer = writer.clone();
+ let path = path.clone();
+ let channel2 = channel.clone();
+ channel.on_open(Box::new(move || {
+ let path = path.clone();
+ let writer = writer.clone();
+ Box::pin(async move {
+ info!("channel opened");
+ match OpenOptions::new()
+ .write(true)
+ .read(false)
+ .create_new(true)
+ .open(path)
+ .await
+ {
+ Ok(file) => {
+ *writer.write().await = Some(Box::pin(file));
+ }
+ Err(e) => {
+ error!("cannot write download: {e}");
+ channel2.close().await.unwrap();
+ }
+ }
+ })
+ }));
+ }
+ {
+ let writer = writer.clone();
+ channel.on_close(Box::new(move || {
+ let writer = writer.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *writer.write().await = None;
+ })
+ }));
+ }
+ {
+ let writer = writer.clone();
+ channel.on_message(Box::new(move |mesg| {
+ let writer = writer.clone();
+ let pos = pos.clone();
+ Box::pin(async move {
+ // TODO
+ if mesg.is_string {
+ let s = String::from_utf8((&mesg.data).to_vec()).unwrap();
+ if &s == "end" {
+ info!("transfer complete")
+ }
+ } else {
+ let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
+ info!(
+ "recv {:?} ({} of {})",
+ mesg.data.len(),
+ humansize::format_size(pos, DECIMAL),
+ humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
+ );
+ writer
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .write_all(&mesg.data)
+ .await
+ .unwrap();
+ }
+ })
+ }))
+ }
+ channel.on_error(Box::new(move |err| {
+ Box::pin(async move {
+ error!("data channel errored: {err}");
+ })
+ }));
+ Ok(())
+ })
+ }
+}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 57a4947..c44f8c2 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -8,18 +8,22 @@ pub mod file;
pub mod port;
use clap::{ColorChoice, Parser};
-use file::FileSender;
+use file::{DownloadHandler, FileSender};
use libkeks::{
instance::Instance,
peer::{Peer, TransportChannel},
protocol::ProvideInfo,
+ webrtc::data_channel::RTCDataChannel,
Config, DynFut, EventHandler,
};
-use log::{info, warn};
-use port::PortExposer;
+use log::{error, info, warn};
+use port::{ForwardHandler, PortExposer};
use rustyline::{error::ReadlineError, DefaultEditor};
-use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc};
-use tokio::{fs, sync::RwLock};
+use std::{
+ collections::HashMap, future::Future, os::unix::prelude::MetadataExt, path::PathBuf, pin::Pin,
+ sync::Arc,
+};
+use tokio::{fs, net::TcpListener, sync::RwLock};
use users::get_current_username;
fn main() {
@@ -67,6 +71,22 @@ pub enum Command {
Forward { id: String, port: Option<u16> },
}
+struct State {
+ requested: HashMap<String, Box<dyn RequestHandler>>,
+}
+pub trait RequestHandler: Send + Sync + 'static {
+ fn on_connect(
+ &self,
+ resource: ProvideInfo,
+ channel: Arc<RTCDataChannel>,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>>;
+}
+
+#[derive(Clone)]
+struct Handler {
+ state: Arc<RwLock<State>>,
+}
+
fn get_username() -> String {
get_current_username()
.map(|u| u.to_str().unwrap().to_string())
@@ -141,6 +161,11 @@ async fn run() -> anyhow::Result<()> {
for (rid, r) in p.remote_provided.read().await.iter() {
if rid == &id {
if r.kind == "file" {
+ state
+ .write()
+ .await
+ .requested
+ .insert(id.clone(), Box::new(DownloadHandler { path }));
p.request_resource(id).await;
} else {
warn!("not a file");
@@ -163,10 +188,52 @@ async fn run() -> anyhow::Result<()> {
}))
.await;
}
- Command::Forward { id, port } => {}
+ Command::Forward { id, port } => {
+ let peers = inst.peers.read().await;
+ 'outer: for peer in peers.values() {
+ for (rid, r) in peer.remote_provided.read().await.iter() {
+ if rid == &id {
+ if r.kind == "port" {
+ let peer = peer.to_owned();
+ let state = state.clone();
+ tokio::task::spawn(async move {
+ let Ok(listener) =
+ TcpListener::bind(("127.0.0.1", port.unwrap_or(0)))
+ .await
+ else {
+ error!("cannot bind tcp listener");
+ return;
+ };
+ info!(
+ "tcp listener bound to {}",
+ listener.local_addr().unwrap()
+ );
+ while let Ok((stream, addr)) = listener.accept().await {
+ info!("new connection from {addr:?}");
+ state.write().await.requested.insert(
+ id.clone(),
+ Box::new(ForwardHandler {
+ stream: Arc::new(RwLock::new(Some(stream))),
+ }),
+ );
+ peer.request_resource(id.clone()).await;
+ }
+ });
+ } else {
+ warn!("not a port");
+ }
+ break 'outer;
+ }
+ }
+ }
+ }
},
Err(err) => err.print().unwrap(),
},
+ Err(ReadlineError::Eof) => {
+ info!("exit");
+ break;
+ }
Err(ReadlineError::Interrupted) => {
info!("interrupted; exiting...");
break;
@@ -175,64 +242,25 @@ async fn run() -> anyhow::Result<()> {
}
}
- // match &args.action {
- // Action::Send { filename } => {
- // inst.add_local_resource(Box::new(FileSender {
- // info: ProvideInfo {
- // id: "the-file".to_string(), // we only share a single file so its fine
- // kind: "file".to_string(),
- // track_kind: None,
- // label: Some(filename.clone().unwrap_or("stdin".to_string())),
- // size: if let Some(filename) = &filename {
- // Some(fs::metadata(filename).await.unwrap().size() as usize)
- // } else {
- // None
- // },
- // },
- // reader_factory: args.action,
- // }))
- // .await;
- // }
- // _ => (),
- // }
Ok(())
}
-struct State {
- requested: HashMap<String, Box<dyn RequestHandler>>,
-}
-pub trait RequestHandler: Send + Sync + 'static {
-
-}
-
-#[derive(Clone)]
-struct Handler {
- state: Arc<RwLock<State>>,
-}
-
impl EventHandler for Handler {
fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> {
Box::pin(async move {})
}
-
fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> {
Box::pin(async move {})
}
- fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> {
- let id = info.id.clone();
- Box::pin(async move {
- // match &args.action {
- // Action::Receive { .. } => {
- // if info.kind == "file" {
- // peer.request_resource(id).await;
- // }
- // }
- // _ => (),
- // }
- })
+ fn resource_added(
+ &self,
+ _peer: Arc<Peer>,
+ _info: libkeks::protocol::ProvideInfo,
+ ) -> DynFut<()> {
+ Box::pin(async move {})
}
fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> {
- Box::pin(async {})
+ Box::pin(async move {})
}
fn resource_connected(
@@ -242,106 +270,20 @@ impl EventHandler for Handler {
channel: TransportChannel,
) -> libkeks::DynFut<()> {
let resource = resource.clone();
- let s = self.clone();
+ let k = self.clone();
Box::pin(async move {
- // match channel {
- // TransportChannel::Track(_) => warn!("wrong type"),
- // TransportChannel::DataChannel(dc) => {
- // if resource.kind != "file" {
- // return error!("we got a non-file resource for some reason…");
- // }
- // let pos = Arc::new(AtomicUsize::new(0));
- // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
- // Arc::new(RwLock::new(None));
- // {
- // let writer = writer.clone();
- // let s = s.clone();
- // dc.on_open(Box::new(move || {
- // let s = s.clone();
- // let writer = writer.clone();
- // Box::pin(async move {
- // info!("channel opened");
- // *writer.write().await = Some(s.args.action.create_writer().await)
- // })
- // }));
- // }
- // {
- // let writer = writer.clone();
- // dc.on_close(Box::new(move || {
- // let writer = writer.clone();
- // Box::pin(async move {
- // info!("channel closed");
- // *writer.write().await = None;
- // exit(0);
- // })
- // }));
- // }
- // {
- // let writer = writer.clone();
- // dc.on_message(Box::new(move |mesg| {
- // let writer = writer.clone();
- // let pos = pos.clone();
- // Box::pin(async move {
- // // TODO
- // if mesg.is_string {
- // let s = String::from_utf8((&mesg.data).to_vec()).unwrap();
- // if &s == "end" {
- // info!("EOF reached")
- // }
- // } else {
- // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
- // info!(
- // "recv {:?} ({} of {})",
- // mesg.data.len(),
- // humansize::format_size(pos, DECIMAL),
- // humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
- // );
- // writer
- // .write()
- // .await
- // .as_mut()
- // .unwrap()
- // .write_all(&mesg.data)
- // .await
- // .unwrap();
- // }
- // })
- // }))
- // }
- // dc.on_error(Box::new(move |err| {
- // Box::pin(async move {
- // error!("data channel errored: {err}");
- // })
- // }));
- // }
- // }
+ if let Some(handler) = k.state.write().await.requested.get(&resource.id) {
+ match channel {
+ TransportChannel::Track(_) => warn!("wrong type"),
+ TransportChannel::DataChannel(channel) => {
+ if let Err(e) = handler.on_connect(resource, channel).await {
+ warn!("request handler error: {e}");
+ }
+ }
+ }
+ } else {
+ warn!("got {:?}, which was not requested", resource.id);
+ }
})
}
}
-
-// impl Action {
-// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> {
-// match self {
-// Action::Receive { filename } => {
-// if let Some(filename) = filename {
-// Box::pin(File::create(filename).await.unwrap())
-// } else {
-// Box::pin(stdout())
-// }
-// }
-// _ => unreachable!(),
-// }
-// }
-// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
-// match self {
-// Action::Send { filename } => {
-// if let Some(filename) = filename {
-// Box::pin(File::open(filename).await.unwrap())
-// } else {
-// Box::pin(stdin())
-// }
-// }
-// _ => unreachable!(),
-// }
-// }
-// }
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs
index ae56f2c..857623e 100644
--- a/client-native-rift/src/port.rs
+++ b/client-native-rift/src/port.rs
@@ -1,9 +1,12 @@
+use crate::RequestHandler;
use bytes::Bytes;
-use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
+use libkeks::{
+ peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource,
+};
use log::{debug, error, info, warn};
-use std::{pin::Pin, sync::Arc};
+use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
- io::{AsyncRead, AsyncReadExt},
+ io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::RwLock,
};
@@ -97,3 +100,60 @@ impl LocalResource for PortExposer {
})
}
}
+
+pub struct ForwardHandler {
+ pub stream: Arc<RwLock<Option<TcpStream>>>,
+}
+impl RequestHandler for ForwardHandler {
+ fn on_connect(
+ &self,
+ _resource: ProvideInfo,
+ channel: Arc<RTCDataChannel>,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> {
+ let stream = self.stream.clone();
+ Box::pin(async move {
+ let stream = stream.write().await.take().unwrap();
+ let (mut read, write) = stream.into_split();
+ let write = Arc::new(RwLock::new(write));
+
+ let channel2 = channel.clone();
+ channel.on_open(Box::new(move || {
+ Box::pin(async move {
+ info!("channel open");
+ let channel = channel2.clone();
+ tokio::task::spawn(async move {
+ let mut buf = [0u8; 1 << 15];
+ loop {
+ let Ok(size) = read.read(&mut buf).await else {
+ break;
+ };
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ });
+ })
+ }));
+ channel.on_close(Box::new(move || {
+ Box::pin(async move {
+ info!("channel closed");
+ })
+ }));
+ channel.on_error(Box::new(move |err| {
+ Box::pin(async move { error!("channel error: {err}") })
+ }));
+ {
+ let write = write.clone();
+ channel.on_message(Box::new(move |message| {
+ let write = write.clone();
+ Box::pin(async move {
+ write.write().await.write_all(&message.data).await.unwrap();
+ })
+ }));
+ }
+
+ Ok(())
+ })
+ }
+}