diff options
author | metamuffin <metamuffin@disroot.org> | 2024-03-18 00:42:21 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-03-18 00:42:21 +0100 |
commit | f427cdf6f7e8fd03418370425f4f663ca4a03121 (patch) | |
tree | ca3a019d62dc9bbeee1b3e93b70e81e2077696e1 /client-native-rift | |
parent | 6823e2acbef4f38daa214ddce1fa92bf809db736 (diff) | |
download | keks-meet-f427cdf6f7e8fd03418370425f4f663ca4a03121.tar keks-meet-f427cdf6f7e8fd03418370425f4f663ca4a03121.tar.bz2 keks-meet-f427cdf6f7e8fd03418370425f4f663ca4a03121.tar.zst |
reworking rift: part one
Diffstat (limited to 'client-native-rift')
-rw-r--r-- | client-native-rift/Cargo.toml | 7 | ||||
-rw-r--r-- | client-native-rift/src/file.rs | 109 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 524 | ||||
-rw-r--r-- | client-native-rift/src/port.rs | 99 |
4 files changed, 468 insertions, 271 deletions
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml index f8d5811..d9a7915 100644 --- a/client-native-rift/Cargo.toml +++ b/client-native-rift/Cargo.toml @@ -4,10 +4,10 @@ version = "1.0.0" edition = "2021" [dependencies] -client-native-lib = { path = "../client-native-lib" } +libkeks = { path = "../client-native-lib" } clap = { version = "4.4.18", features = ["derive"] } -env_logger = "0.11.1" +pretty_env_logger = "0.5.0" log = "0.4" tokio = { version = "1.35", features = ["full"] } @@ -16,3 +16,6 @@ bytes = "1.5.0" indicatif = "0.17.7" humansize = "2.1.3" users = "0.11.0" +anyhow = "1.0.81" +shlex = "1.3.0" +rustyline = "14.0.0" diff --git a/client-native-rift/src/file.rs b/client-native-rift/src/file.rs new file mode 100644 index 0000000..bfe32fd --- /dev/null +++ b/client-native-rift/src/file.rs @@ -0,0 +1,109 @@ +use bytes::Bytes; +use humansize::DECIMAL; +use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use log::{debug, error, info}; +use std::{ + path::PathBuf, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncReadExt}, + sync::RwLock, +}; + +pub struct FileSender { + pub path: Arc<PathBuf>, + pub info: ProvideInfo, +} + +impl LocalResource for FileSender { + fn info(&self) -> ProvideInfo { + self.info.clone() + } + + fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { + let id = self.info().id.clone(); + let total_size = self.info().size.unwrap_or(0); + let path = self.path.clone(); + Box::pin(async move { + let channel = peer + .peer_connection + .create_data_channel(&id, None) + .await + .unwrap(); + let pos = Arc::new(AtomicUsize::new(0)); + let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = + Arc::new(RwLock::new(None)); + { + let reader = reader.clone(); + let path = path.clone(); + channel.on_open(Box::new(move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel open"); + *reader.write().await = Some(Box::pin(File::open(&*path).await.unwrap())); + }) + })) + } + { + let reader = reader.clone(); + channel.on_close(Box::new(move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel closed"); + *reader.write().await = None; + }) + })) + } + { + let reader = reader.clone(); + let pos = pos.clone(); + let channel2 = channel.clone(); + channel + .on_buffered_amount_low(Box::new(move || { + let pos = pos.clone(); + let reader = reader.clone(); + let channel = channel2.clone(); + Box::pin(async move { + debug!("buffered amount low"); + let mut buf = [0u8; 1 << 15]; + let size = reader + .write() + .await + .as_mut() + .unwrap() + .read(&mut buf) + .await + .unwrap(); + if size == 0 { + info!("reached EOF, closing channel"); + let _ = channel.send_text("end").await; + channel.close().await.unwrap(); + } else { + let progress_size = pos.fetch_add(size, Ordering::Relaxed); + info!( + "sending {size} bytes ({} of {})", + humansize::format_size(progress_size, DECIMAL), + humansize::format_size(total_size, DECIMAL), + ); + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }) + })) + .await; + channel.set_buffered_amount_low_threshold(1).await; + } + channel.on_error(Box::new(move |err| { + Box::pin(async move { error!("channel error: {err}") }) + })) + }) + } +} diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 0b6327f..57a4947 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -4,36 +4,28 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use bytes::Bytes; -use clap::{Parser, Subcommand}; -use client_native_lib::{ +pub mod file; +pub mod port; + +use clap::{ColorChoice, Parser}; +use file::FileSender; +use libkeks::{ instance::Instance, peer::{Peer, TransportChannel}, protocol::ProvideInfo, - Config, DynFut, EventHandler, LocalResource, -}; -use humansize::DECIMAL; -use log::{debug, error, info, warn}; -use std::{ - os::unix::prelude::MetadataExt, - pin::Pin, - process::exit, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; -use tokio::{ - fs::{self, File}, - io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - sync::RwLock, + Config, DynFut, EventHandler, }; +use log::{info, warn}; +use port::PortExposer; +use rustyline::{error::ReadlineError, DefaultEditor}; +use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc}; +use tokio::{fs, sync::RwLock}; use users::get_current_username; fn main() { - env_logger::builder() + pretty_env_logger::formatted_builder() .filter_module("rift", log::LevelFilter::Info) - .filter_module("client_native_lib", log::LevelFilter::Info) + .filter_module("libkeks", log::LevelFilter::Info) .parse_env("LOG") .init(); tokio::runtime::Builder::new_multi_thread() @@ -41,9 +33,11 @@ fn main() { .build() .unwrap() .block_on(run()) + .unwrap(); } #[derive(Parser, Clone)] +/// If no command is provided, rift will enter REPL-mode. pub struct Args { /// keks-meet server used for establishing p2p connection #[clap(long, default_value = "wss://meet.metamuffin.org")] @@ -53,11 +47,24 @@ pub struct Args { username: String, /// pre-shared secret (aka. room name) secret: String, - #[clap(subcommand)] - action: Action, - /// end after completion of the first transfer - #[clap(short, long)] - one_file: bool, + // /// Dispatch a single command after startup + // #[clap(subcommand)] + // command: Option<Command>, +} + +#[derive(Parser, Debug, Clone)] +#[clap(multicall = true, color = ColorChoice::Always)] +pub enum Command { + /// List all peers and their services. + List, + /// Provide a file for download to other peers + Provide { path: PathBuf, id: Option<String> }, + /// Download another peer's files. + Download { id: String, path: Option<PathBuf> }, + /// Expose a local TCP port to other peers. + Expose { port: u16, id: Option<String> }, + /// Forward TCP connections to local port to another peer. + Forward { id: String, port: Option<u16> }, } fn get_username() -> String { @@ -67,79 +74,161 @@ fn get_username() -> String { .to_owned() } -async fn run() { +async fn run() -> anyhow::Result<()> { let args = Args::parse(); - + let state = Arc::new(RwLock::new(State { + requested: Default::default(), + })); let inst = Instance::new( Config { signaling_uri: args.signaling_uri.clone(), username: args.username.clone(), }, Arc::new(Handler { - args: Arc::new(args.clone()), + state: state.clone(), }), ) .await; inst.join(Some(&args.secret)).await; - match &args.action { - Action::Send { filename } => { - inst.add_local_resource(Box::new(FileSender { - info: ProvideInfo { - id: "the-file".to_string(), // we only share a single file so its fine - kind: "file".to_string(), - track_kind: None, - label: Some(filename.clone().unwrap_or("stdin".to_string())), - size: if let Some(filename) = &filename { - Some(fs::metadata(filename).await.unwrap().size() as usize) - } else { - None - }, + inst.spawn_ping().await; + tokio::task::spawn(inst.clone().receive_loop()); + + let mut rl = DefaultEditor::new()?; + loop { + match rl.readline("> ") { + Ok(line) => match Command::try_parse_from(shlex::split(&line).unwrap()) { + Ok(command) => match command { + Command::List => { + let peers = inst.peers.read().await; + println!("{} clients available", peers.len()); + for p in peers.values() { + let username = p + .username + .read() + .await + .clone() + .unwrap_or("<unknown>".to_string()); + println!("{username}:"); + for (rid, r) in p.remote_provided.read().await.iter() { + println!( + "\t{rid:?}: {} {:?}", + r.kind, + r.label.clone().unwrap_or_default() + ) + } + } + } + Command::Provide { path, id } => { + inst.add_local_resource(Box::new(FileSender { + info: ProvideInfo { + id: id.unwrap_or("file".to_owned()), + kind: "file".to_string(), + track_kind: None, + label: Some( + path.file_name().unwrap().to_str().unwrap().to_string(), + ), + size: Some(fs::metadata(&path).await.unwrap().size() as usize), + }, + path: path.into(), + })) + .await; + } + Command::Download { id, path } => { + let peers = inst.peers.read().await; + 'outer: for p in peers.values() { + for (rid, r) in p.remote_provided.read().await.iter() { + if rid == &id { + if r.kind == "file" { + p.request_resource(id).await; + } else { + warn!("not a file"); + } + break 'outer; + } + } + } + } + Command::Expose { port, id } => { + inst.add_local_resource(Box::new(PortExposer { + port, + info: ProvideInfo { + kind: "port".to_string(), + id: id.unwrap_or(format!("p{port}")), + track_kind: None, + label: Some(format!("port {port}")), + size: None, + }, + })) + .await; + } + Command::Forward { id, port } => {} }, - reader_factory: args.action, - })) - .await; + Err(err) => err.print().unwrap(), + }, + Err(ReadlineError::Interrupted) => { + info!("interrupted; exiting..."); + break; + } + Err(e) => Err(e)?, } - _ => (), } - inst.spawn_ping().await; - inst.receive_loop().await; + // match &args.action { + // Action::Send { filename } => { + // inst.add_local_resource(Box::new(FileSender { + // info: ProvideInfo { + // id: "the-file".to_string(), // we only share a single file so its fine + // kind: "file".to_string(), + // track_kind: None, + // label: Some(filename.clone().unwrap_or("stdin".to_string())), + // size: if let Some(filename) = &filename { + // Some(fs::metadata(filename).await.unwrap().size() as usize) + // } else { + // None + // }, + // }, + // reader_factory: args.action, + // })) + // .await; + // } + // _ => (), + // } + Ok(()) +} - tokio::signal::ctrl_c().await.unwrap(); - error!("interrupt received, exiting"); +struct State { + requested: HashMap<String, Box<dyn RequestHandler>>, +} +pub trait RequestHandler: Send + Sync + 'static { + } #[derive(Clone)] struct Handler { - args: Arc<Args>, + state: Arc<RwLock<State>>, } impl EventHandler for Handler { - fn peer_join(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn resource_added( - &self, - peer: Arc<Peer>, - info: client_native_lib::protocol::ProvideInfo, - ) -> DynFut<()> { + fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> { let id = info.id.clone(); - let args = self.args.clone(); Box::pin(async move { - match &args.action { - Action::Receive { .. } => { - if info.kind == "file" { - peer.request_resource(id).await; - } - } - _ => (), - } + // match &args.action { + // Action::Receive { .. } => { + // if info.kind == "file" { + // peer.request_resource(id).await; + // } + // } + // _ => (), + // } }) } fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { @@ -151,211 +240,108 @@ impl EventHandler for Handler { _peer: Arc<Peer>, resource: &ProvideInfo, channel: TransportChannel, - ) -> client_native_lib::DynFut<()> { + ) -> libkeks::DynFut<()> { let resource = resource.clone(); let s = self.clone(); Box::pin(async move { - match channel { - TransportChannel::Track(_) => warn!("wrong type"), - TransportChannel::DataChannel(dc) => { - if resource.kind != "file" { - return error!("we got a non-file resource for some reason…"); - } - let pos = Arc::new(AtomicUsize::new(0)); - let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = - Arc::new(RwLock::new(None)); - { - let writer = writer.clone(); - let s = s.clone(); - dc.on_open(Box::new(move || { - let s = s.clone(); - let writer = writer.clone(); - Box::pin(async move { - info!("channel opened"); - *writer.write().await = Some(s.args.action.create_writer().await) - }) - })); - } - { - let writer = writer.clone(); - let args = s.args.clone(); - dc.on_close(Box::new(move || { - let writer = writer.clone(); - let args = args.clone(); - Box::pin(async move { - info!("channel closed"); - *writer.write().await = None; - if args.one_file { - exit(0); - } - }) - })); - } - { - let writer = writer.clone(); - dc.on_message(Box::new(move |mesg| { - let writer = writer.clone(); - let pos = pos.clone(); - Box::pin(async move { - // TODO - if mesg.is_string { - let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); - if &s == "end" { - info!("EOF reached") - } - } else { - let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); - info!( - "recv {:?} ({} of {})", - mesg.data.len(), - humansize::format_size(pos, DECIMAL), - humansize::format_size(resource.size.unwrap_or(0), DECIMAL), - ); - writer - .write() - .await - .as_mut() - .unwrap() - .write_all(&mesg.data) - .await - .unwrap(); - } - }) - })) - } - dc.on_error(Box::new(move |err| { - Box::pin(async move { - error!("data channel errored: {err}"); - }) - })); - } - } + // match channel { + // TransportChannel::Track(_) => warn!("wrong type"), + // TransportChannel::DataChannel(dc) => { + // if resource.kind != "file" { + // return error!("we got a non-file resource for some reason…"); + // } + // let pos = Arc::new(AtomicUsize::new(0)); + // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = + // Arc::new(RwLock::new(None)); + // { + // let writer = writer.clone(); + // let s = s.clone(); + // dc.on_open(Box::new(move || { + // let s = s.clone(); + // let writer = writer.clone(); + // Box::pin(async move { + // info!("channel opened"); + // *writer.write().await = Some(s.args.action.create_writer().await) + // }) + // })); + // } + // { + // let writer = writer.clone(); + // dc.on_close(Box::new(move || { + // let writer = writer.clone(); + // Box::pin(async move { + // info!("channel closed"); + // *writer.write().await = None; + // exit(0); + // }) + // })); + // } + // { + // let writer = writer.clone(); + // dc.on_message(Box::new(move |mesg| { + // let writer = writer.clone(); + // let pos = pos.clone(); + // Box::pin(async move { + // // TODO + // if mesg.is_string { + // let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); + // if &s == "end" { + // info!("EOF reached") + // } + // } else { + // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); + // info!( + // "recv {:?} ({} of {})", + // mesg.data.len(), + // humansize::format_size(pos, DECIMAL), + // humansize::format_size(resource.size.unwrap_or(0), DECIMAL), + // ); + // writer + // .write() + // .await + // .as_mut() + // .unwrap() + // .write_all(&mesg.data) + // .await + // .unwrap(); + // } + // }) + // })) + // } + // dc.on_error(Box::new(move |err| { + // Box::pin(async move { + // error!("data channel errored: {err}"); + // }) + // })); + // } + // } }) } } -#[derive(Subcommand, Clone)] -pub enum Action { - /// Send a file - Send { filename: Option<String> }, - /// Receive a file - Receive { filename: Option<String> }, -} - -impl Action { - pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { - match self { - Action::Receive { filename } => { - if let Some(filename) = filename { - Box::pin(File::create(filename).await.unwrap()) - } else { - Box::pin(stdout()) - } - } - _ => unreachable!(), - } - } - pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { - match self { - Action::Send { filename } => { - if let Some(filename) = filename { - Box::pin(File::open(filename).await.unwrap()) - } else { - Box::pin(stdin()) - } - } - _ => unreachable!(), - } - } -} - -struct FileSender { - reader_factory: Action, // TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>, - info: ProvideInfo, -} - -impl LocalResource for FileSender { - fn info(&self) -> ProvideInfo { - self.info.clone() - } - - fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { - let id = self.info().id.clone(); - let total_size = self.info().size.unwrap_or(0); - let reader_factory = self.reader_factory.clone(); - Box::pin(async move { - let channel = peer - .peer_connection - .create_data_channel(&id, None) - .await - .unwrap(); - let pos = Arc::new(AtomicUsize::new(0)); - let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = - Arc::new(RwLock::new(None)); - { - let reader = reader.clone(); - let reader_factory = reader_factory.clone(); - channel.on_open(Box::new(move || { - let reader = reader.clone(); - Box::pin(async move { - info!("channel open"); - *reader.write().await = Some(reader_factory.create_reader().await); - }) - })) - } - { - let reader = reader.clone(); - channel.on_close(Box::new(move || { - let reader = reader.clone(); - Box::pin(async move { - info!("channel closed"); - *reader.write().await = None; - }) - })) - } - { - let reader = reader.clone(); - let pos = pos.clone(); - let channel2 = channel.clone(); - channel - .on_buffered_amount_low(Box::new(move || { - let pos = pos.clone(); - let reader = reader.clone(); - let channel = channel2.clone(); - Box::pin(async move { - debug!("buffered amount low"); - let mut buf = [0u8; 1 << 15]; - let size = reader - .write() - .await - .as_mut() - .unwrap() - .read(&mut buf) - .await - .unwrap(); - if size == 0 { - info!("reached EOF, closing channel"); - channel.close().await.unwrap(); - } else { - let progress_size = pos.fetch_add(size, Ordering::Relaxed); - info!( - "sending {size} bytes ({} of {})", - humansize::format_size(progress_size, DECIMAL), - humansize::format_size(total_size, DECIMAL), - ); - channel - .send(&Bytes::copy_from_slice(&buf[..size])) - .await - .unwrap(); - } - }) - })) - .await; - channel.set_buffered_amount_low_threshold(1).await; - } - channel.on_error(Box::new(move |err| { - Box::pin(async move { error!("channel error: {err}") }) - })) - }) - } -} +// impl Action { +// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { +// match self { +// Action::Receive { filename } => { +// if let Some(filename) = filename { +// Box::pin(File::create(filename).await.unwrap()) +// } else { +// Box::pin(stdout()) +// } +// } +// _ => unreachable!(), +// } +// } +// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { +// match self { +// Action::Send { filename } => { +// if let Some(filename) = filename { +// Box::pin(File::open(filename).await.unwrap()) +// } else { +// Box::pin(stdin()) +// } +// } +// _ => unreachable!(), +// } +// } +// } diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs new file mode 100644 index 0000000..ae56f2c --- /dev/null +++ b/client-native-rift/src/port.rs @@ -0,0 +1,99 @@ +use bytes::Bytes; +use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource}; +use log::{debug, error, info, warn}; +use std::{pin::Pin, sync::Arc}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + net::TcpStream, + sync::RwLock, +}; + +pub struct PortExposer { + pub port: u16, + pub info: ProvideInfo, +} + +impl LocalResource for PortExposer { + fn info(&self) -> ProvideInfo { + self.info.clone() + } + + fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { + let id = self.info().id.clone(); + let port = self.port; + Box::pin(async move { + let channel = peer + .peer_connection + .create_data_channel(&id, None) + .await + .unwrap(); + let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = + Arc::new(RwLock::new(None)); + { + let reader = reader.clone(); + let channel2 = channel.clone(); + channel.on_open(Box::new(move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel open"); + match TcpStream::connect(("127.0.0.1", port)).await { + Ok(stream) => { + *reader.write().await = Some(Box::pin(stream)); + } + Err(e) => { + warn!("upstream connect failed: {e}"); + channel2.close().await.unwrap(); + } + } + }) + })) + } + { + let reader = reader.clone(); + channel.on_close(Box::new(move || { + let reader = reader.clone(); + Box::pin(async move { + info!("channel closed"); + *reader.write().await = None; + }) + })) + } + { + let reader = reader.clone(); + let channel2 = channel.clone(); + channel + .on_buffered_amount_low(Box::new(move || { + let reader = reader.clone(); + let channel = channel2.clone(); + Box::pin(async move { + debug!("buffered amount low"); + let mut buf = [0u8; 1 << 15]; + let size = reader + .write() + .await + .as_mut() + .unwrap() + .read(&mut buf) + .await + .unwrap(); + if size == 0 { + info!("reached EOF, closing channel"); + let _ = channel.send_text("end").await; + channel.close().await.unwrap(); + } else { + channel + .send(&Bytes::copy_from_slice(&buf[..size])) + .await + .unwrap(); + } + }) + })) + .await; + channel.set_buffered_amount_low_threshold(1).await; + } + channel.on_error(Box::new(move |err| { + Box::pin(async move { error!("channel error: {err}") }) + })) + }) + } +} |