diff options
-rw-r--r-- | client-native-export-track/Cargo.toml | 8 | ||||
-rw-r--r-- | client-native-gui/Cargo.toml | 14 | ||||
-rw-r--r-- | client-native-lib/Cargo.toml | 8 | ||||
-rw-r--r-- | client-native-rift/Cargo.toml | 6 | ||||
-rw-r--r-- | client-native-rift/src/file.rs | 112 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 244 | ||||
-rw-r--r-- | client-native-rift/src/port.rs | 66 |
7 files changed, 283 insertions, 175 deletions
diff --git a/client-native-export-track/Cargo.toml b/client-native-export-track/Cargo.toml index 1eccbdb..6f9e9cd 100644 --- a/client-native-export-track/Cargo.toml +++ b/client-native-export-track/Cargo.toml @@ -6,10 +6,10 @@ edition = "2021" [dependencies] libkeks = { path = "../client-native-lib" } -clap = { version = "4.4.18", features = ["derive"] } -env_logger = "0.11.1" +clap = { version = "4.5.3", features = ["derive"] } +env_logger = "0.11.3" log = "0.4" -tokio = { version = "1.35", features = ["full"] } +tokio = { version = "1.36", features = ["full"] } bytes = "1.5.0" -anyhow = "1.0.79" +anyhow = "1.0.81" diff --git a/client-native-gui/Cargo.toml b/client-native-gui/Cargo.toml index 573f69a..ae9208f 100644 --- a/client-native-gui/Cargo.toml +++ b/client-native-gui/Cargo.toml @@ -6,15 +6,15 @@ edition = "2021" [dependencies] libkeks = { path = "../client-native-lib" } -clap = { version = "4.4.18", features = ["derive"] } +clap = { version = "4.5.3", features = ["derive"] } async-std = "1.12.0" -tokio = { version = "1.35.1", features = ["full"] } -env_logger = "0.11.1" +tokio = { version = "1.36.0", features = ["full"] } +env_logger = "0.11.3" log = "0.4" -anyhow = "1.0.79" -crossbeam-channel = "0.5.11" +anyhow = "1.0.81" +crossbeam-channel = "0.5.12" -egui = "0.25.0" -eframe = "0.25.0" +egui = "0.26.2" +eframe = "0.26.2" libmpv = { git = "https://github.com/sirno/libmpv-rs.git", branch = "upgrade-libmpv" } diff --git a/client-native-lib/Cargo.toml b/client-native-lib/Cargo.toml index 70ad7b8..ea36db9 100644 --- a/client-native-lib/Cargo.toml +++ b/client-native-lib/Cargo.toml @@ -4,14 +4,14 @@ version = "0.2.3" edition = "2021" [dependencies] -tokio = { version = "1.35", features = ["full"] } +tokio = { version = "1.36", features = ["full"] } futures-util = "0.3.30" -webrtc = "0.9.0" +webrtc = "0.10.1" tokio-tungstenite = { version = "*", features = ["rustls-tls"] } url = "2.5.0" -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } serde_json = "*" log = "0.4" @@ -22,5 +22,5 @@ hex = "0.4.3" sha2 = "0.10.8" rand = "0.8.5" rand_chacha = "0.3.1" -base64 = "0.21.7" +base64 = "0.22.0" bytes = "1.5.0" diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml index d9a7915..9f79cd8 100644 --- a/client-native-rift/Cargo.toml +++ b/client-native-rift/Cargo.toml @@ -6,14 +6,14 @@ edition = "2021" [dependencies] libkeks = { path = "../client-native-lib" } -clap = { version = "4.4.18", features = ["derive"] } +clap = { version = "4.5.3", features = ["derive"] } pretty_env_logger = "0.5.0" log = "0.4" -tokio = { version = "1.35", features = ["full"] } +tokio = { version = "1.36", features = ["full"] } bytes = "1.5.0" -indicatif = "0.17.7" +indicatif = "0.17.8" humansize = "2.1.3" users = "0.11.0" anyhow = "1.0.81" diff --git a/client-native-rift/src/file.rs b/client-native-rift/src/file.rs index bfe32fd..93dde62 100644 --- a/client-native-rift/src/file.rs +++ b/client-native-rift/src/file.rs @@ -1,8 +1,12 @@ +use crate::RequestHandler; use bytes::Bytes; use humansize::DECIMAL; -use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use libkeks::{ + peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource, +}; use log::{debug, error, info}; use std::{ + future::Future, path::PathBuf, pin::Pin, sync::{ @@ -11,8 +15,8 @@ use std::{ }, }; use tokio::{ - fs::File, - io::{AsyncRead, AsyncReadExt}, + fs::{File, OpenOptions}, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::RwLock, }; @@ -107,3 +111,105 @@ impl LocalResource for FileSender { }) } } + +pub struct DownloadHandler { + pub path: Option<PathBuf>, +} +impl RequestHandler for DownloadHandler { + fn on_connect( + &self, + resource: ProvideInfo, + channel: Arc<RTCDataChannel>, + ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> { + let path = self.path.clone().unwrap_or_else(|| { + resource + .label + .clone() + .unwrap_or("download".to_owned()) + .replace("/", "_") + .replace("..", "_") + .into() + }); + if path.exists() {} + Box::pin(async move { + let pos = Arc::new(AtomicUsize::new(0)); + let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = + Arc::new(RwLock::new(None)); + { + let writer = writer.clone(); + let path = path.clone(); + let channel2 = channel.clone(); + channel.on_open(Box::new(move || { + let path = path.clone(); + let writer = writer.clone(); + Box::pin(async move { + info!("channel opened"); + match OpenOptions::new() + .write(true) + .read(false) + .create_new(true) + .open(path) + .await + { + Ok(file) => { + *writer.write().await = Some(Box::pin(file)); + } + Err(e) => { + error!("cannot write download: {e}"); + channel2.close().await.unwrap(); + } + } + }) + })); + } + { + let writer = writer.clone(); + channel.on_close(Box::new(move || { + let writer = writer.clone(); + Box::pin(async move { + info!("channel closed"); + *writer.write().await = None; + }) + })); + } + { + let writer = writer.clone(); + channel.on_message(Box::new(move |mesg| { + let writer = writer.clone(); + let pos = pos.clone(); + Box::pin(async move { + // TODO + if mesg.is_string { + let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); + if &s == "end" { + info!("transfer complete") + } + } else { + let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); + info!( + "recv {:?} ({} of {})", + mesg.data.len(), + humansize::format_size(pos, DECIMAL), + humansize::format_size(resource.size.unwrap_or(0), DECIMAL), + ); + writer + .write() + .await + .as_mut() + .unwrap() + .write_all(&mesg.data) + .await + .unwrap(); + } + }) + })) + } + channel.on_error(Box::new(move |err| { + Box::pin(async move { + error!("data channel errored: {err}"); + }) + })); + Ok(()) + }) + } +} diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 57a4947..c44f8c2 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -8,18 +8,22 @@ pub mod file; pub mod port; use clap::{ColorChoice, Parser}; -use file::FileSender; +use file::{DownloadHandler, FileSender}; use libkeks::{ instance::Instance, peer::{Peer, TransportChannel}, protocol::ProvideInfo, + webrtc::data_channel::RTCDataChannel, Config, DynFut, EventHandler, }; -use log::{info, warn}; -use port::PortExposer; +use log::{error, info, warn}; +use port::{ForwardHandler, PortExposer}; use rustyline::{error::ReadlineError, DefaultEditor}; -use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc}; -use tokio::{fs, sync::RwLock}; +use std::{ + collections::HashMap, future::Future, os::unix::prelude::MetadataExt, path::PathBuf, pin::Pin, + sync::Arc, +}; +use tokio::{fs, net::TcpListener, sync::RwLock}; use users::get_current_username; fn main() { @@ -67,6 +71,22 @@ pub enum Command { Forward { id: String, port: Option<u16> }, } +struct State { + requested: HashMap<String, Box<dyn RequestHandler>>, +} +pub trait RequestHandler: Send + Sync + 'static { + fn on_connect( + &self, + resource: ProvideInfo, + channel: Arc<RTCDataChannel>, + ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>>; +} + +#[derive(Clone)] +struct Handler { + state: Arc<RwLock<State>>, +} + fn get_username() -> String { get_current_username() .map(|u| u.to_str().unwrap().to_string()) @@ -141,6 +161,11 @@ async fn run() -> anyhow::Result<()> { for (rid, r) in p.remote_provided.read().await.iter() { if rid == &id { if r.kind == "file" { + state + .write() + .await + .requested + .insert(id.clone(), Box::new(DownloadHandler { path })); p.request_resource(id).await; } else { warn!("not a file"); @@ -163,10 +188,52 @@ async fn run() -> anyhow::Result<()> { })) .await; } - Command::Forward { id, port } => {} + Command::Forward { id, port } => { + let peers = inst.peers.read().await; + 'outer: for peer in peers.values() { + for (rid, r) in peer.remote_provided.read().await.iter() { + if rid == &id { + if r.kind == "port" { + let peer = peer.to_owned(); + let state = state.clone(); + tokio::task::spawn(async move { + let Ok(listener) = + TcpListener::bind(("127.0.0.1", port.unwrap_or(0))) + .await + else { + error!("cannot bind tcp listener"); + return; + }; + info!( + "tcp listener bound to {}", + listener.local_addr().unwrap() + ); + while let Ok((stream, addr)) = listener.accept().await { + info!("new connection from {addr:?}"); + state.write().await.requested.insert( + id.clone(), + Box::new(ForwardHandler { + stream: Arc::new(RwLock::new(Some(stream))), + }), + ); + peer.request_resource(id.clone()).await; + } + }); + } else { + warn!("not a port"); + } + break 'outer; + } + } + } + } }, Err(err) => err.print().unwrap(), }, + Err(ReadlineError::Eof) => { + info!("exit"); + break; + } Err(ReadlineError::Interrupted) => { info!("interrupted; exiting..."); break; @@ -175,64 +242,25 @@ async fn run() -> anyhow::Result<()> { } } - // match &args.action { - // Action::Send { filename } => { - // inst.add_local_resource(Box::new(FileSender { - // info: ProvideInfo { - // id: "the-file".to_string(), // we only share a single file so its fine - // kind: "file".to_string(), - // track_kind: None, - // label: Some(filename.clone().unwrap_or("stdin".to_string())), - // size: if let Some(filename) = &filename { - // Some(fs::metadata(filename).await.unwrap().size() as usize) - // } else { - // None - // }, - // }, - // reader_factory: args.action, - // })) - // .await; - // } - // _ => (), - // } Ok(()) } -struct State { - requested: HashMap<String, Box<dyn RequestHandler>>, -} -pub trait RequestHandler: Send + Sync + 'static { - -} - -#[derive(Clone)] -struct Handler { - state: Arc<RwLock<State>>, -} - impl EventHandler for Handler { fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> { - let id = info.id.clone(); - Box::pin(async move { - // match &args.action { - // Action::Receive { .. } => { - // if info.kind == "file" { - // peer.request_resource(id).await; - // } - // } - // _ => (), - // } - }) + fn resource_added( + &self, + _peer: Arc<Peer>, + _info: libkeks::protocol::ProvideInfo, + ) -> DynFut<()> { + Box::pin(async move {}) } fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { - Box::pin(async {}) + Box::pin(async move {}) } fn resource_connected( @@ -242,106 +270,20 @@ impl EventHandler for Handler { channel: TransportChannel, ) -> libkeks::DynFut<()> { let resource = resource.clone(); - let s = self.clone(); + let k = self.clone(); Box::pin(async move { - // match channel { - // TransportChannel::Track(_) => warn!("wrong type"), - // TransportChannel::DataChannel(dc) => { - // if resource.kind != "file" { - // return error!("we got a non-file resource for some reason…"); - // } - // let pos = Arc::new(AtomicUsize::new(0)); - // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = - // Arc::new(RwLock::new(None)); - // { - // let writer = writer.clone(); - // let s = s.clone(); - // dc.on_open(Box::new(move || { - // let s = s.clone(); - // let writer = writer.clone(); - // Box::pin(async move { - // info!("channel opened"); - // *writer.write().await = Some(s.args.action.create_writer().await) - // }) - // })); - // } - // { - // let writer = writer.clone(); - // dc.on_close(Box::new(move || { - // let writer = writer.clone(); - // Box::pin(async move { - // info!("channel closed"); - // *writer.write().await = None; - // exit(0); - // }) - // })); - // } - // { - // let writer = writer.clone(); - // dc.on_message(Box::new(move |mesg| { - // let writer = writer.clone(); - // let pos = pos.clone(); - // Box::pin(async move { - // // TODO - // if mesg.is_string { - // let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); - // if &s == "end" { - // info!("EOF reached") - // } - // } else { - // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); - // info!( - // "recv {:?} ({} of {})", - // mesg.data.len(), - // humansize::format_size(pos, DECIMAL), - // humansize::format_size(resource.size.unwrap_or(0), DECIMAL), - // ); - // writer - // .write() - // .await - // .as_mut() - // .unwrap() - // .write_all(&mesg.data) - // .await - // .unwrap(); - // } - // }) - // })) - // } - // dc.on_error(Box::new(move |err| { - // Box::pin(async move { - // error!("data channel errored: {err}"); - // }) - // })); - // } - // } + if let Some(handler) = k.state.write().await.requested.get(&resource.id) { + match channel { + TransportChannel::Track(_) => warn!("wrong type"), + TransportChannel::DataChannel(channel) => { + if let Err(e) = handler.on_connect(resource, channel).await { + warn!("request handler error: {e}"); + } + } + } + } else { + warn!("got {:?}, which was not requested", resource.id); + } }) } } - -// impl Action { -// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { -// match self { -// Action::Receive { filename } => { -// if let Some(filename) = filename { -// Box::pin(File::create(filename).await.unwrap()) -// } else { -// Box::pin(stdout()) -// } -// } -// _ => unreachable!(), -// } -// } -// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { -// match self { -// Action::Send { filename } => { -// if let Some(filename) = filename { -// Box::pin(File::open(filename).await.unwrap()) -// } else { -// Box::pin(stdin()) -// } -// } -// _ => unreachable!(), -// } -// } -// } diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs index ae56f2c..857623e 100644 --- a/client-native-rift/src/port.rs +++ b/client-native-rift/src/port.rs @@ -1,9 +1,12 @@ +use crate::RequestHandler; use bytes::Bytes; -use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use libkeks::{ + peer::Peer, protocol::ProvideInfo, webrtc::data_channel::RTCDataChannel, DynFut, LocalResource, +}; use log::{debug, error, info, warn}; -use std::{pin::Pin, sync::Arc}; +use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ - io::{AsyncRead, AsyncReadExt}, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, net::TcpStream, sync::RwLock, }; @@ -97,3 +100,60 @@ impl LocalResource for PortExposer { }) } } + +pub struct ForwardHandler { + pub stream: Arc<RwLock<Option<TcpStream>>>, +} +impl RequestHandler for ForwardHandler { + fn on_connect( + &self, + _resource: ProvideInfo, + channel: Arc<RTCDataChannel>, + ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>> { + let stream = self.stream.clone(); + Box::pin(async move { + let stream = stream.write().await.take().unwrap(); + let (mut read, write) = stream.into_split(); + let write = Arc::new(RwLock::new(write)); + + let channel2 = channel.clone(); + channel.on_open(Box::new(move || { + Box::pin(async move { + info!("channel open"); + let channel = channel2.clone(); + tokio::task::spawn(async move { + let mut buf = [0u8; 1 << 15]; + loop { + let Ok(size) = read.read(&mut buf).await else { + break; + }; + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }); + }) + })); + channel.on_close(Box::new(move || { + Box::pin(async move { + info!("channel closed"); + }) + })); + channel.on_error(Box::new(move |err| { + Box::pin(async move { error!("channel error: {err}") }) + })); + { + let write = write.clone(); + channel.on_message(Box::new(move |message| { + let write = write.clone(); + Box::pin(async move { + write.write().await.write_all(&message.data).await.unwrap(); + }) + })); + } + + Ok(()) + }) + } +} |