diff options
Diffstat (limited to 'client-native-rift/src/main.rs')
-rw-r--r-- | client-native-rift/src/main.rs | 524 |
1 files changed, 255 insertions, 269 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 0b6327f..57a4947 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -4,36 +4,28 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use bytes::Bytes; -use clap::{Parser, Subcommand}; -use client_native_lib::{ +pub mod file; +pub mod port; + +use clap::{ColorChoice, Parser}; +use file::FileSender; +use libkeks::{ instance::Instance, peer::{Peer, TransportChannel}, protocol::ProvideInfo, - Config, DynFut, EventHandler, LocalResource, -}; -use humansize::DECIMAL; -use log::{debug, error, info, warn}; -use std::{ - os::unix::prelude::MetadataExt, - pin::Pin, - process::exit, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; -use tokio::{ - fs::{self, File}, - io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - sync::RwLock, + Config, DynFut, EventHandler, }; +use log::{info, warn}; +use port::PortExposer; +use rustyline::{error::ReadlineError, DefaultEditor}; +use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc}; +use tokio::{fs, sync::RwLock}; use users::get_current_username; fn main() { - env_logger::builder() + pretty_env_logger::formatted_builder() .filter_module("rift", log::LevelFilter::Info) - .filter_module("client_native_lib", log::LevelFilter::Info) + .filter_module("libkeks", log::LevelFilter::Info) .parse_env("LOG") .init(); tokio::runtime::Builder::new_multi_thread() @@ -41,9 +33,11 @@ fn main() { .build() .unwrap() .block_on(run()) + .unwrap(); } #[derive(Parser, Clone)] +/// If no command is provided, rift will enter REPL-mode. pub struct Args { /// keks-meet server used for establishing p2p connection #[clap(long, default_value = "wss://meet.metamuffin.org")] @@ -53,11 +47,24 @@ pub struct Args { username: String, /// pre-shared secret (aka. room name) secret: String, - #[clap(subcommand)] - action: Action, - /// end after completion of the first transfer - #[clap(short, long)] - one_file: bool, + // /// Dispatch a single command after startup + // #[clap(subcommand)] + // command: Option<Command>, +} + +#[derive(Parser, Debug, Clone)] +#[clap(multicall = true, color = ColorChoice::Always)] +pub enum Command { + /// List all peers and their services. + List, + /// Provide a file for download to other peers + Provide { path: PathBuf, id: Option<String> }, + /// Download another peer's files. + Download { id: String, path: Option<PathBuf> }, + /// Expose a local TCP port to other peers. + Expose { port: u16, id: Option<String> }, + /// Forward TCP connections to local port to another peer. + Forward { id: String, port: Option<u16> }, } fn get_username() -> String { @@ -67,79 +74,161 @@ fn get_username() -> String { .to_owned() } -async fn run() { +async fn run() -> anyhow::Result<()> { let args = Args::parse(); - + let state = Arc::new(RwLock::new(State { + requested: Default::default(), + })); let inst = Instance::new( Config { signaling_uri: args.signaling_uri.clone(), username: args.username.clone(), }, Arc::new(Handler { - args: Arc::new(args.clone()), + state: state.clone(), }), ) .await; inst.join(Some(&args.secret)).await; - match &args.action { - Action::Send { filename } => { - inst.add_local_resource(Box::new(FileSender { - info: ProvideInfo { - id: "the-file".to_string(), // we only share a single file so its fine - kind: "file".to_string(), - track_kind: None, - label: Some(filename.clone().unwrap_or("stdin".to_string())), - size: if let Some(filename) = &filename { - Some(fs::metadata(filename).await.unwrap().size() as usize) - } else { - None - }, + inst.spawn_ping().await; + tokio::task::spawn(inst.clone().receive_loop()); + + let mut rl = DefaultEditor::new()?; + loop { + match rl.readline("> ") { + Ok(line) => match Command::try_parse_from(shlex::split(&line).unwrap()) { + Ok(command) => match command { + Command::List => { + let peers = inst.peers.read().await; + println!("{} clients available", peers.len()); + for p in peers.values() { + let username = p + .username + .read() + .await + .clone() + .unwrap_or("<unknown>".to_string()); + println!("{username}:"); + for (rid, r) in p.remote_provided.read().await.iter() { + println!( + "\t{rid:?}: {} {:?}", + r.kind, + r.label.clone().unwrap_or_default() + ) + } + } + } + Command::Provide { path, id } => { + inst.add_local_resource(Box::new(FileSender { + info: ProvideInfo { + id: id.unwrap_or("file".to_owned()), + kind: "file".to_string(), + track_kind: None, + label: Some( + path.file_name().unwrap().to_str().unwrap().to_string(), + ), + size: Some(fs::metadata(&path).await.unwrap().size() as usize), + }, + path: path.into(), + })) + .await; + } + Command::Download { id, path } => { + let peers = inst.peers.read().await; + 'outer: for p in peers.values() { + for (rid, r) in p.remote_provided.read().await.iter() { + if rid == &id { + if r.kind == "file" { + p.request_resource(id).await; + } else { + warn!("not a file"); + } + break 'outer; + } + } + } + } + Command::Expose { port, id } => { + inst.add_local_resource(Box::new(PortExposer { + port, + info: ProvideInfo { + kind: "port".to_string(), + id: id.unwrap_or(format!("p{port}")), + track_kind: None, + label: Some(format!("port {port}")), + size: None, + }, + })) + .await; + } + Command::Forward { id, port } => {} }, - reader_factory: args.action, - })) - .await; + Err(err) => err.print().unwrap(), + }, + Err(ReadlineError::Interrupted) => { + info!("interrupted; exiting..."); + break; + } + Err(e) => Err(e)?, } - _ => (), } - inst.spawn_ping().await; - inst.receive_loop().await; + // match &args.action { + // Action::Send { filename } => { + // inst.add_local_resource(Box::new(FileSender { + // info: ProvideInfo { + // id: "the-file".to_string(), // we only share a single file so its fine + // kind: "file".to_string(), + // track_kind: None, + // label: Some(filename.clone().unwrap_or("stdin".to_string())), + // size: if let Some(filename) = &filename { + // Some(fs::metadata(filename).await.unwrap().size() as usize) + // } else { + // None + // }, + // }, + // reader_factory: args.action, + // })) + // .await; + // } + // _ => (), + // } + Ok(()) +} - tokio::signal::ctrl_c().await.unwrap(); - error!("interrupt received, exiting"); +struct State { + requested: HashMap<String, Box<dyn RequestHandler>>, +} +pub trait RequestHandler: Send + Sync + 'static { + } #[derive(Clone)] struct Handler { - args: Arc<Args>, + state: Arc<RwLock<State>>, } impl EventHandler for Handler { - fn peer_join(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> { + fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn resource_added( - &self, - peer: Arc<Peer>, - info: client_native_lib::protocol::ProvideInfo, - ) -> DynFut<()> { + fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> { let id = info.id.clone(); - let args = self.args.clone(); Box::pin(async move { - match &args.action { - Action::Receive { .. } => { - if info.kind == "file" { - peer.request_resource(id).await; - } - } - _ => (), - } + // match &args.action { + // Action::Receive { .. } => { + // if info.kind == "file" { + // peer.request_resource(id).await; + // } + // } + // _ => (), + // } }) } fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { @@ -151,211 +240,108 @@ impl EventHandler for Handler { _peer: Arc<Peer>, resource: &ProvideInfo, channel: TransportChannel, - ) -> client_native_lib::DynFut<()> { + ) -> libkeks::DynFut<()> { let resource = resource.clone(); let s = self.clone(); Box::pin(async move { - match channel { - TransportChannel::Track(_) => warn!("wrong type"), - TransportChannel::DataChannel(dc) => { - if resource.kind != "file" { - return error!("we got a non-file resource for some reason…"); - } - let pos = Arc::new(AtomicUsize::new(0)); - let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = - Arc::new(RwLock::new(None)); - { - let writer = writer.clone(); - let s = s.clone(); - dc.on_open(Box::new(move || { - let s = s.clone(); - let writer = writer.clone(); - Box::pin(async move { - info!("channel opened"); - *writer.write().await = Some(s.args.action.create_writer().await) - }) - })); - } - { - let writer = writer.clone(); - let args = s.args.clone(); - dc.on_close(Box::new(move || { - let writer = writer.clone(); - let args = args.clone(); - Box::pin(async move { - info!("channel closed"); - *writer.write().await = None; - if args.one_file { - exit(0); - } - }) - })); - } - { - let writer = writer.clone(); - dc.on_message(Box::new(move |mesg| { - let writer = writer.clone(); - let pos = pos.clone(); - Box::pin(async move { - // TODO - if mesg.is_string { - let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); - if &s == "end" { - info!("EOF reached") - } - } else { - let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); - info!( - "recv {:?} ({} of {})", - mesg.data.len(), - humansize::format_size(pos, DECIMAL), - humansize::format_size(resource.size.unwrap_or(0), DECIMAL), - ); - writer - .write() - .await - .as_mut() - .unwrap() - .write_all(&mesg.data) - .await - .unwrap(); - } - }) - })) - } - dc.on_error(Box::new(move |err| { - Box::pin(async move { - error!("data channel errored: {err}"); - }) - })); - } - } + // match channel { + // TransportChannel::Track(_) => warn!("wrong type"), + // TransportChannel::DataChannel(dc) => { + // if resource.kind != "file" { + // return error!("we got a non-file resource for some reason…"); + // } + // let pos = Arc::new(AtomicUsize::new(0)); + // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = + // Arc::new(RwLock::new(None)); + // { + // let writer = writer.clone(); + // let s = s.clone(); + // dc.on_open(Box::new(move || { + // let s = s.clone(); + // let writer = writer.clone(); + // Box::pin(async move { + // info!("channel opened"); + // *writer.write().await = Some(s.args.action.create_writer().await) + // }) + // })); + // } + // { + // let writer = writer.clone(); + // dc.on_close(Box::new(move || { + // let writer = writer.clone(); + // Box::pin(async move { + // info!("channel closed"); + // *writer.write().await = None; + // exit(0); + // }) + // })); + // } + // { + // let writer = writer.clone(); + // dc.on_message(Box::new(move |mesg| { + // let writer = writer.clone(); + // let pos = pos.clone(); + // Box::pin(async move { + // // TODO + // if mesg.is_string { + // let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); + // if &s == "end" { + // info!("EOF reached") + // } + // } else { + // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); + // info!( + // "recv {:?} ({} of {})", + // mesg.data.len(), + // humansize::format_size(pos, DECIMAL), + // humansize::format_size(resource.size.unwrap_or(0), DECIMAL), + // ); + // writer + // .write() + // .await + // .as_mut() + // .unwrap() + // .write_all(&mesg.data) + // .await + // .unwrap(); + // } + // }) + // })) + // } + // dc.on_error(Box::new(move |err| { + // Box::pin(async move { + // error!("data channel errored: {err}"); + // }) + // })); + // } + // } }) } } -#[derive(Subcommand, Clone)] -pub enum Action { - /// Send a file - Send { filename: Option<String> }, - /// Receive a file - Receive { filename: Option<String> }, -} - -impl Action { - pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { - match self { - Action::Receive { filename } => { - if let Some(filename) = filename { - Box::pin(File::create(filename).await.unwrap()) - } else { - Box::pin(stdout()) - } - } - _ => unreachable!(), - } - } - pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { - match self { - Action::Send { filename } => { - if let Some(filename) = filename { - Box::pin(File::open(filename).await.unwrap()) - } else { - Box::pin(stdin()) - } - } - _ => unreachable!(), - } - } -} - -struct FileSender { - reader_factory: Action, // TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>, - info: ProvideInfo, -} - -impl LocalResource for FileSender { - fn info(&self) -> ProvideInfo { - self.info.clone() - } - - fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> { - let id = self.info().id.clone(); - let total_size = self.info().size.unwrap_or(0); - let reader_factory = self.reader_factory.clone(); - Box::pin(async move { - let channel = peer - .peer_connection - .create_data_channel(&id, None) - .await - .unwrap(); - let pos = Arc::new(AtomicUsize::new(0)); - let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> = - Arc::new(RwLock::new(None)); - { - let reader = reader.clone(); - let reader_factory = reader_factory.clone(); - channel.on_open(Box::new(move || { - let reader = reader.clone(); - Box::pin(async move { - info!("channel open"); - *reader.write().await = Some(reader_factory.create_reader().await); - }) - })) - } - { - let reader = reader.clone(); - channel.on_close(Box::new(move || { - let reader = reader.clone(); - Box::pin(async move { - info!("channel closed"); - *reader.write().await = None; - }) - })) - } - { - let reader = reader.clone(); - let pos = pos.clone(); - let channel2 = channel.clone(); - channel - .on_buffered_amount_low(Box::new(move || { - let pos = pos.clone(); - let reader = reader.clone(); - let channel = channel2.clone(); - Box::pin(async move { - debug!("buffered amount low"); - let mut buf = [0u8; 1 << 15]; - let size = reader - .write() - .await - .as_mut() - .unwrap() - .read(&mut buf) - .await - .unwrap(); - if size == 0 { - info!("reached EOF, closing channel"); - channel.close().await.unwrap(); - } else { - let progress_size = pos.fetch_add(size, Ordering::Relaxed); - info!( - "sending {size} bytes ({} of {})", - humansize::format_size(progress_size, DECIMAL), - humansize::format_size(total_size, DECIMAL), - ); - channel - .send(&Bytes::copy_from_slice(&buf[..size])) - .await - .unwrap(); - } - }) - })) - .await; - channel.set_buffered_amount_low_threshold(1).await; - } - channel.on_error(Box::new(move |err| { - Box::pin(async move { error!("channel error: {err}") }) - })) - }) - } -} +// impl Action { +// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { +// match self { +// Action::Receive { filename } => { +// if let Some(filename) = filename { +// Box::pin(File::create(filename).await.unwrap()) +// } else { +// Box::pin(stdout()) +// } +// } +// _ => unreachable!(), +// } +// } +// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { +// match self { +// Action::Send { filename } => { +// if let Some(filename) = filename { +// Box::pin(File::open(filename).await.unwrap()) +// } else { +// Box::pin(stdin()) +// } +// } +// _ => unreachable!(), +// } +// } +// } |