diff options
author | metamuffin <metamuffin@disroot.org> | 2024-03-18 22:08:38 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-03-18 22:08:38 +0100 |
commit | 3cd6a4947407920726e60a9d29c37f7d40fda545 (patch) | |
tree | 85e5e69963f1aa8abfba30e6791fba65f258ecec /client-native-rift/src | |
parent | 5370e79e69771976e27079f4a9fffe807efabd83 (diff) | |
download | keks-meet-3cd6a4947407920726e60a9d29c37f7d40fda545.tar keks-meet-3cd6a4947407920726e60a9d29c37f7d40fda545.tar.bz2 keks-meet-3cd6a4947407920726e60a9d29c37f7d40fda545.tar.zst |
rift: refactor code
Diffstat (limited to 'client-native-rift/src')
-rw-r--r-- | client-native-rift/src/command.rs | 137 | ||||
-rw-r--r-- | client-native-rift/src/main.rs | 155 | ||||
-rw-r--r-- | client-native-rift/src/repl.rs | 139 |
3 files changed, 290 insertions, 141 deletions
diff --git a/client-native-rift/src/command.rs b/client-native-rift/src/command.rs new file mode 100644 index 0000000..56ae788 --- /dev/null +++ b/client-native-rift/src/command.rs @@ -0,0 +1,137 @@ +use crate::file::{DownloadHandler, FileSender}; +use crate::port::{ForwardHandler, PortExposer}; +use crate::{Command, State}; +use anyhow::bail; +use libkeks::{ + instance::Instance, + peer::Peer, + protocol::{ChatMesssage, ProvideInfo, RelayMessage}, +}; +use log::{debug, error, info}; +use std::{os::unix::prelude::MetadataExt, sync::Arc}; +use tokio::{fs, net::TcpListener, sync::RwLock}; + +pub(crate) async fn dispatch_command( + inst: &Arc<Instance>, + state: &Arc<RwLock<State>>, + command: Command, +) -> anyhow::Result<()> { + match command { + Command::List => { + let peers = inst.peers.read().await; + for p in peers.values() { + let username = p + .username + .read() + .await + .clone() + .unwrap_or("<unknown>".to_string()); + info!("{username}:"); + for (rid, r) in p.remote_provided.read().await.iter() { + info!( + "\t{rid:?}: {} {:?}", + r.kind, + r.label.clone().unwrap_or_default() + ) + } + } + } + Command::Stop { mut ids } => { + if ids.is_empty() { + ids = inst + .local_resources + .read() + .await + .keys() + .cloned() + .collect::<Vec<_>>(); + } + for id in ids { + if !inst.remove_local_resource(id.clone()).await { + bail!("service {id:?} not found.") + } + } + } + Command::Provide { path, id } => { + inst.add_local_resource(Box::new(FileSender { + info: ProvideInfo { + id: id.unwrap_or("file".to_owned()), + kind: "file".to_string(), + track_kind: None, + label: Some(path.file_name().unwrap().to_str().unwrap().to_string()), + size: Some(fs::metadata(&path).await?.size() as usize), + }, + path: path.into(), + })) + .await; + } + Command::Download { id, path } => { + let (peer, _resource) = find_id(inst, id.clone(), "file").await?; + state + .write() + .await + .requested + .insert(id.clone(), Box::new(DownloadHandler { path })); + peer.request_resource(id).await; + } + Command::Expose { port, id } => { + inst.add_local_resource(Box::new(PortExposer { + port, + info: ProvideInfo { + kind: "port".to_string(), + id: id.unwrap_or(format!("p{port}")), + track_kind: None, + label: Some(format!("port {port}")), + size: None, + }, + })) + .await; + } + Command::Forward { id, port } => { + let (peer, _resource) = find_id(inst, id.clone(), "port").await?; + let state = state.clone(); + tokio::task::spawn(async move { + let Ok(listener) = TcpListener::bind(("127.0.0.1", port.unwrap_or(0))).await else { + error!("cannot bind tcp listener"); + return; + }; + info!("tcp listener bound to {}", listener.local_addr().unwrap()); + while let Ok((stream, addr)) = listener.accept().await { + debug!("new connection from {addr:?}"); + state.write().await.requested.insert( + id.clone(), + Box::new(ForwardHandler { + stream: Arc::new(RwLock::new(Some(stream))), + }), + ); + peer.request_resource(id.clone()).await; + } + }); + } + Command::Chat { message } => { + inst.send_relay(None, RelayMessage::Chat(ChatMesssage::Text(message))) + .await; + } + } + Ok(()) +} + +async fn find_id( + inst: &Arc<Instance>, + id: String, + kind: &str, +) -> anyhow::Result<(Arc<Peer>, ProvideInfo)> { + let peers = inst.peers.read().await; + for peer in peers.values() { + for (rid, r) in peer.remote_provided.read().await.iter() { + if rid == &id { + if r.kind == kind { + return Ok((peer.to_owned(), r.to_owned())); + } else { + bail!("wrong type: expected {kind:?}, found {:?}", r.kind) + } + } + } + } + bail!("id not found") +} diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index f3db429..b839ee2 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -4,12 +4,13 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ #![allow(clippy::type_complexity)] +pub mod command; pub mod file; pub mod port; +pub mod repl; -use anyhow::bail; +use crate::command::dispatch_command; use clap::{ColorChoice, Parser}; -use file::{DownloadHandler, FileSender}; use libkeks::{ instance::Instance, peer::{Peer, TransportChannel}, @@ -18,13 +19,11 @@ use libkeks::{ Config, DynFut, EventHandler, }; use log::{error, info, trace, warn}; -use port::{ForwardHandler, PortExposer}; -use rustyline::{error::ReadlineError, DefaultEditor}; +use repl::repl; use std::{ - collections::HashMap, future::Future, os::unix::prelude::MetadataExt, path::PathBuf, pin::Pin, - process::exit, sync::Arc, + collections::HashMap, future::Future, path::PathBuf, pin::Pin, process::exit, sync::Arc, }; -use tokio::{fs, net::TcpListener, sync::RwLock}; +use tokio::sync::RwLock; use users::get_current_username; fn main() { @@ -64,6 +63,11 @@ pub struct Args { pub enum Command { /// List all peers and their services. List, + /// Stop providing a services by ID. + Stop { + // IDs of the services to stop. If ommited, all services are stopped. + ids: Vec<String>, + }, /// Provide a file for download to other peers Provide { path: PathBuf, id: Option<String> }, /// Download another peer's files. @@ -128,144 +132,13 @@ async fn run() -> anyhow::Result<()> { }; info!("done"); } - let mut rl = DefaultEditor::new()?; - loop { - match rl.readline("> ") { - Ok(line) => match shlex::split(&line) { - Some(tokens) => match Command::try_parse_from(tokens) { - Ok(command) => match dispatch_command(&inst, &state, command).await { - Ok(()) => (), - Err(err) => error!("{err}"), - }, - Err(err) => err.print().unwrap(), - }, - None => warn!("fix your quoting"), - }, - Err(ReadlineError::Eof) => { - info!("exit"); - break; - } - Err(ReadlineError::Interrupted) => { - info!("interrupted; exiting..."); - break; - } - Err(e) => Err(e)?, - } - } + tokio::task::spawn_blocking(move || repl(inst, state)) + .await? + .await?; Ok(()) } -async fn dispatch_command( - inst: &Arc<Instance>, - state: &Arc<RwLock<State>>, - command: Command, -) -> anyhow::Result<()> { - match command { - Command::List => { - let peers = inst.peers.read().await; - println!("{} clients available", peers.len()); - for p in peers.values() { - let username = p - .username - .read() - .await - .clone() - .unwrap_or("<unknown>".to_string()); - println!("{username}:"); - for (rid, r) in p.remote_provided.read().await.iter() { - println!( - "\t{rid:?}: {} {:?}", - r.kind, - r.label.clone().unwrap_or_default() - ) - } - } - } - Command::Provide { path, id } => { - inst.add_local_resource(Box::new(FileSender { - info: ProvideInfo { - id: id.unwrap_or("file".to_owned()), - kind: "file".to_string(), - track_kind: None, - label: Some(path.file_name().unwrap().to_str().unwrap().to_string()), - size: Some(fs::metadata(&path).await.unwrap().size() as usize), - }, - path: path.into(), - })) - .await; - } - Command::Download { id, path } => { - let (peer, _resource) = find_id(inst, id.clone(), "file").await?; - state - .write() - .await - .requested - .insert(id.clone(), Box::new(DownloadHandler { path })); - peer.request_resource(id).await; - } - Command::Expose { port, id } => { - inst.add_local_resource(Box::new(PortExposer { - port, - info: ProvideInfo { - kind: "port".to_string(), - id: id.unwrap_or(format!("p{port}")), - track_kind: None, - label: Some(format!("port {port}")), - size: None, - }, - })) - .await; - } - Command::Forward { id, port } => { - let (peer, _resource) = find_id(inst, id.clone(), "port").await?; - let state = state.clone(); - tokio::task::spawn(async move { - let Ok(listener) = TcpListener::bind(("127.0.0.1", port.unwrap_or(0))).await else { - error!("cannot bind tcp listener"); - return; - }; - info!("tcp listener bound to {}", listener.local_addr().unwrap()); - while let Ok((stream, addr)) = listener.accept().await { - info!("new connection from {addr:?}"); - state.write().await.requested.insert( - id.clone(), - Box::new(ForwardHandler { - stream: Arc::new(RwLock::new(Some(stream))), - }), - ); - peer.request_resource(id.clone()).await; - } - }); - } - Command::Chat { message } => { - inst.send_relay(None, RelayMessage::Chat(ChatMesssage::Text(message))) - .await; - } - } - Ok(()) -} - -async fn find_id( - inst: &Arc<Instance>, - id: String, - kind: &str, -) -> anyhow::Result<(Arc<Peer>, ProvideInfo)> { - let peers = inst.peers.read().await; - for peer in peers.values() { - for (rid, r) in peer.remote_provided.read().await.iter() { - if rid == &id { - if r.kind == kind { - return Ok((peer.to_owned(), r.to_owned())); - } else { - bail!("wrong type: expected {kind:?}, found {:?}", r.kind) - } - } - } - } - bail!("id not found") -} - impl EventHandler for Handler { fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) diff --git a/client-native-rift/src/repl.rs b/client-native-rift/src/repl.rs new file mode 100644 index 0000000..d462221 --- /dev/null +++ b/client-native-rift/src/repl.rs @@ -0,0 +1,139 @@ +use crate::{command::dispatch_command, Command, State}; +use clap::Parser; +use libkeks::instance::Instance; +use log::{error, info, warn}; +use rustyline::{ + completion::{Candidate, Completer}, + config::Configurer, + error::ReadlineError, + highlight::Highlighter, + hint::{Hint, Hinter}, + validate::{ValidationResult, Validator}, + Editor, Helper, +}; +use std::{borrow::Cow, sync::Arc}; +use tokio::sync::RwLock; + +pub(crate) async fn repl(inst: Arc<Instance>, state: Arc<RwLock<State>>) -> anyhow::Result<()> { + let mut rl = Editor::new()?; + rl.set_auto_add_history(true); + rl.set_helper(Some(ReplHelper {})); + loop { + match rl.readline("> ") { + Ok(line) => match shlex::split(&line) { + Some(tokens) => match Command::try_parse_from(tokens) { + Ok(command) => match dispatch_command(&inst, &state, command).await { + Ok(()) => (), + Err(err) => error!(target: "rift", "{err}"), + }, + Err(err) => err.print().unwrap(), + }, + None => warn!("fix your quoting"), + }, + Err(ReadlineError::Eof) => { + info!("exit"); + break; + } + Err(ReadlineError::Interrupted) => { + info!("interrupted; exiting..."); + break; + } + Err(e) => Err(e)?, + } + } + Ok(()) +} + +struct ReplHelper {} + +impl Helper for ReplHelper {} + +impl Validator for ReplHelper { + fn validate( + &self, + ctx: &mut rustyline::validate::ValidationContext, + ) -> rustyline::Result<rustyline::validate::ValidationResult> { + let _ = ctx; + match shlex::split(ctx.input()) { + Some(_tokens) => Ok(ValidationResult::Valid(None)), + None => Ok(ValidationResult::Invalid(Some( + " incorrect quoting".to_string(), + ))), + } + } + fn validate_while_typing(&self) -> bool { + true + } +} + +impl Hinter for ReplHelper { + type Hint = ReplHint; + fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> { + let _ = (line, pos, ctx); + None + } +} + +struct ReplHint; +impl Hint for ReplHint { + fn display(&self) -> &str { + "" + } + fn completion(&self) -> Option<&str> { + None + } +} + +impl Completer for ReplHelper { + type Candidate = ReplCandidate; + fn complete( + &self, + line: &str, + pos: usize, + ctx: &rustyline::Context<'_>, + ) -> rustyline::Result<(usize, Vec<Self::Candidate>)> { + let _ = (line, pos, ctx); + Ok((0, Vec::with_capacity(0))) + } + + fn update( + &self, + line: &mut rustyline::line_buffer::LineBuffer, + start: usize, + elected: &str, + cl: &mut rustyline::Changeset, + ) { + let end = line.pos(); + line.replace(start..end, elected, cl); + } +} + +struct ReplCandidate {} +impl Candidate for ReplCandidate { + fn display(&self) -> &str { + "" + } + + fn replacement(&self) -> &str { + "" + } +} + +impl Highlighter for ReplHelper { + fn highlight_prompt<'b, 's: 'b, 'p: 'b>( + &'s self, + prompt: &'p str, + _default: bool, + ) -> Cow<'b, str> { + Cow::Borrowed(prompt) + } + fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> { + Cow::Owned("\x1b[1m".to_owned() + hint + "\x1b[m") + } + fn highlight<'l>(&self, line: &'l str, _pos: usize) -> Cow<'l, str> { + Cow::Borrowed(line) + } + fn highlight_char(&self, _line: &str, _pos: usize, _forced: bool) -> bool { + false + } +} |