aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock14
-rw-r--r--client-native-lib/Cargo.toml2
-rw-r--r--client-native-lib/src/instance.rs14
-rw-r--r--client-native-rift/Cargo.toml7
-rw-r--r--client-native-rift/src/command.rs137
-rw-r--r--client-native-rift/src/main.rs155
-rw-r--r--client-native-rift/src/repl.rs139
7 files changed, 315 insertions, 153 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ee450df..bd30335 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2721,7 +2721,7 @@ checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "libkeks"
-version = "0.2.3"
+version = "0.3.0"
dependencies = [
"aes-gcm",
"base64 0.22.0",
@@ -3961,6 +3961,7 @@ dependencies = [
"memchr",
"nix 0.28.0",
"radix_trie",
+ "rustyline-derive",
"unicode-segmentation",
"unicode-width",
"utf8parse",
@@ -3968,6 +3969,17 @@ dependencies = [
]
[[package]]
+name = "rustyline-derive"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5af959c8bf6af1aff6d2b463a57f71aae53d1332da58419e30ad8dc7011d951"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.53",
+]
+
+[[package]]
name = "ryu"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/client-native-lib/Cargo.toml b/client-native-lib/Cargo.toml
index ea36db9..152ae2a 100644
--- a/client-native-lib/Cargo.toml
+++ b/client-native-lib/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "libkeks"
-version = "0.2.3"
+version = "0.3.0"
edition = "2021"
[dependencies]
diff --git a/client-native-lib/src/instance.rs b/client-native-lib/src/instance.rs
index 899e48c..476d9bd 100644
--- a/client-native-lib/src/instance.rs
+++ b/client-native-lib/src/instance.rs
@@ -181,11 +181,15 @@ impl Instance {
.await
.insert(res.info().id, res);
}
- pub async fn remove_local_resource(&self, id: String) {
- self.local_resources.write().await.remove(&id);
- for (_pid, peer) in self.peers.read().await.iter() {
- peer.send_relay(RelayMessage::ProvideStop { id: id.clone() })
- .await;
+ pub async fn remove_local_resource(&self, id: String) -> bool {
+ if let Some(_) = self.local_resources.write().await.remove(&id) {
+ for (_pid, peer) in self.peers.read().await.iter() {
+ peer.send_relay(RelayMessage::ProvideStop { id: id.clone() })
+ .await;
+ }
+ true
+ } else {
+ false
}
}
}
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index cc6b92d..3b52b83 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -5,17 +5,14 @@ edition = "2021"
[dependencies]
libkeks = { path = "../client-native-lib" }
-
clap = { version = "4.5.3", features = ["derive"] }
-pretty_env_logger = "0.5.0"
log = { version = "0.4", features = ["kv"] }
-
+rustyline = { version = "14.0.0", features = ["derive"] }
tokio = { version = "1.36", features = ["full"] }
bytes = "1.5.0"
-
+pretty_env_logger = "0.5.0"
indicatif = "0.17.8"
humansize = "2.1.3"
users = "0.11.0"
anyhow = "1.0.81"
shlex = "1.3.0"
-rustyline = "14.0.0"
diff --git a/client-native-rift/src/command.rs b/client-native-rift/src/command.rs
new file mode 100644
index 0000000..56ae788
--- /dev/null
+++ b/client-native-rift/src/command.rs
@@ -0,0 +1,137 @@
+use crate::file::{DownloadHandler, FileSender};
+use crate::port::{ForwardHandler, PortExposer};
+use crate::{Command, State};
+use anyhow::bail;
+use libkeks::{
+ instance::Instance,
+ peer::Peer,
+ protocol::{ChatMesssage, ProvideInfo, RelayMessage},
+};
+use log::{debug, error, info};
+use std::{os::unix::prelude::MetadataExt, sync::Arc};
+use tokio::{fs, net::TcpListener, sync::RwLock};
+
+pub(crate) async fn dispatch_command(
+ inst: &Arc<Instance>,
+ state: &Arc<RwLock<State>>,
+ command: Command,
+) -> anyhow::Result<()> {
+ match command {
+ Command::List => {
+ let peers = inst.peers.read().await;
+ for p in peers.values() {
+ let username = p
+ .username
+ .read()
+ .await
+ .clone()
+ .unwrap_or("<unknown>".to_string());
+ info!("{username}:");
+ for (rid, r) in p.remote_provided.read().await.iter() {
+ info!(
+ "\t{rid:?}: {} {:?}",
+ r.kind,
+ r.label.clone().unwrap_or_default()
+ )
+ }
+ }
+ }
+ Command::Stop { mut ids } => {
+ if ids.is_empty() {
+ ids = inst
+ .local_resources
+ .read()
+ .await
+ .keys()
+ .cloned()
+ .collect::<Vec<_>>();
+ }
+ for id in ids {
+ if !inst.remove_local_resource(id.clone()).await {
+ bail!("service {id:?} not found.")
+ }
+ }
+ }
+ Command::Provide { path, id } => {
+ inst.add_local_resource(Box::new(FileSender {
+ info: ProvideInfo {
+ id: id.unwrap_or("file".to_owned()),
+ kind: "file".to_string(),
+ track_kind: None,
+ label: Some(path.file_name().unwrap().to_str().unwrap().to_string()),
+ size: Some(fs::metadata(&path).await?.size() as usize),
+ },
+ path: path.into(),
+ }))
+ .await;
+ }
+ Command::Download { id, path } => {
+ let (peer, _resource) = find_id(inst, id.clone(), "file").await?;
+ state
+ .write()
+ .await
+ .requested
+ .insert(id.clone(), Box::new(DownloadHandler { path }));
+ peer.request_resource(id).await;
+ }
+ Command::Expose { port, id } => {
+ inst.add_local_resource(Box::new(PortExposer {
+ port,
+ info: ProvideInfo {
+ kind: "port".to_string(),
+ id: id.unwrap_or(format!("p{port}")),
+ track_kind: None,
+ label: Some(format!("port {port}")),
+ size: None,
+ },
+ }))
+ .await;
+ }
+ Command::Forward { id, port } => {
+ let (peer, _resource) = find_id(inst, id.clone(), "port").await?;
+ let state = state.clone();
+ tokio::task::spawn(async move {
+ let Ok(listener) = TcpListener::bind(("127.0.0.1", port.unwrap_or(0))).await else {
+ error!("cannot bind tcp listener");
+ return;
+ };
+ info!("tcp listener bound to {}", listener.local_addr().unwrap());
+ while let Ok((stream, addr)) = listener.accept().await {
+ debug!("new connection from {addr:?}");
+ state.write().await.requested.insert(
+ id.clone(),
+ Box::new(ForwardHandler {
+ stream: Arc::new(RwLock::new(Some(stream))),
+ }),
+ );
+ peer.request_resource(id.clone()).await;
+ }
+ });
+ }
+ Command::Chat { message } => {
+ inst.send_relay(None, RelayMessage::Chat(ChatMesssage::Text(message)))
+ .await;
+ }
+ }
+ Ok(())
+}
+
+async fn find_id(
+ inst: &Arc<Instance>,
+ id: String,
+ kind: &str,
+) -> anyhow::Result<(Arc<Peer>, ProvideInfo)> {
+ let peers = inst.peers.read().await;
+ for peer in peers.values() {
+ for (rid, r) in peer.remote_provided.read().await.iter() {
+ if rid == &id {
+ if r.kind == kind {
+ return Ok((peer.to_owned(), r.to_owned()));
+ } else {
+ bail!("wrong type: expected {kind:?}, found {:?}", r.kind)
+ }
+ }
+ }
+ }
+ bail!("id not found")
+}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index f3db429..b839ee2 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -4,12 +4,13 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
#![allow(clippy::type_complexity)]
+pub mod command;
pub mod file;
pub mod port;
+pub mod repl;
-use anyhow::bail;
+use crate::command::dispatch_command;
use clap::{ColorChoice, Parser};
-use file::{DownloadHandler, FileSender};
use libkeks::{
instance::Instance,
peer::{Peer, TransportChannel},
@@ -18,13 +19,11 @@ use libkeks::{
Config, DynFut, EventHandler,
};
use log::{error, info, trace, warn};
-use port::{ForwardHandler, PortExposer};
-use rustyline::{error::ReadlineError, DefaultEditor};
+use repl::repl;
use std::{
- collections::HashMap, future::Future, os::unix::prelude::MetadataExt, path::PathBuf, pin::Pin,
- process::exit, sync::Arc,
+ collections::HashMap, future::Future, path::PathBuf, pin::Pin, process::exit, sync::Arc,
};
-use tokio::{fs, net::TcpListener, sync::RwLock};
+use tokio::sync::RwLock;
use users::get_current_username;
fn main() {
@@ -64,6 +63,11 @@ pub struct Args {
pub enum Command {
/// List all peers and their services.
List,
+ /// Stop providing a services by ID.
+ Stop {
+ // IDs of the services to stop. If ommited, all services are stopped.
+ ids: Vec<String>,
+ },
/// Provide a file for download to other peers
Provide { path: PathBuf, id: Option<String> },
/// Download another peer's files.
@@ -128,144 +132,13 @@ async fn run() -> anyhow::Result<()> {
};
info!("done");
}
- let mut rl = DefaultEditor::new()?;
- loop {
- match rl.readline("> ") {
- Ok(line) => match shlex::split(&line) {
- Some(tokens) => match Command::try_parse_from(tokens) {
- Ok(command) => match dispatch_command(&inst, &state, command).await {
- Ok(()) => (),
- Err(err) => error!("{err}"),
- },
- Err(err) => err.print().unwrap(),
- },
- None => warn!("fix your quoting"),
- },
- Err(ReadlineError::Eof) => {
- info!("exit");
- break;
- }
- Err(ReadlineError::Interrupted) => {
- info!("interrupted; exiting...");
- break;
- }
- Err(e) => Err(e)?,
- }
- }
+ tokio::task::spawn_blocking(move || repl(inst, state))
+ .await?
+ .await?;
Ok(())
}
-async fn dispatch_command(
- inst: &Arc<Instance>,
- state: &Arc<RwLock<State>>,
- command: Command,
-) -> anyhow::Result<()> {
- match command {
- Command::List => {
- let peers = inst.peers.read().await;
- println!("{} clients available", peers.len());
- for p in peers.values() {
- let username = p
- .username
- .read()
- .await
- .clone()
- .unwrap_or("<unknown>".to_string());
- println!("{username}:");
- for (rid, r) in p.remote_provided.read().await.iter() {
- println!(
- "\t{rid:?}: {} {:?}",
- r.kind,
- r.label.clone().unwrap_or_default()
- )
- }
- }
- }
- Command::Provide { path, id } => {
- inst.add_local_resource(Box::new(FileSender {
- info: ProvideInfo {
- id: id.unwrap_or("file".to_owned()),
- kind: "file".to_string(),
- track_kind: None,
- label: Some(path.file_name().unwrap().to_str().unwrap().to_string()),
- size: Some(fs::metadata(&path).await.unwrap().size() as usize),
- },
- path: path.into(),
- }))
- .await;
- }
- Command::Download { id, path } => {
- let (peer, _resource) = find_id(inst, id.clone(), "file").await?;
- state
- .write()
- .await
- .requested
- .insert(id.clone(), Box::new(DownloadHandler { path }));
- peer.request_resource(id).await;
- }
- Command::Expose { port, id } => {
- inst.add_local_resource(Box::new(PortExposer {
- port,
- info: ProvideInfo {
- kind: "port".to_string(),
- id: id.unwrap_or(format!("p{port}")),
- track_kind: None,
- label: Some(format!("port {port}")),
- size: None,
- },
- }))
- .await;
- }
- Command::Forward { id, port } => {
- let (peer, _resource) = find_id(inst, id.clone(), "port").await?;
- let state = state.clone();
- tokio::task::spawn(async move {
- let Ok(listener) = TcpListener::bind(("127.0.0.1", port.unwrap_or(0))).await else {
- error!("cannot bind tcp listener");
- return;
- };
- info!("tcp listener bound to {}", listener.local_addr().unwrap());
- while let Ok((stream, addr)) = listener.accept().await {
- info!("new connection from {addr:?}");
- state.write().await.requested.insert(
- id.clone(),
- Box::new(ForwardHandler {
- stream: Arc::new(RwLock::new(Some(stream))),
- }),
- );
- peer.request_resource(id.clone()).await;
- }
- });
- }
- Command::Chat { message } => {
- inst.send_relay(None, RelayMessage::Chat(ChatMesssage::Text(message)))
- .await;
- }
- }
- Ok(())
-}
-
-async fn find_id(
- inst: &Arc<Instance>,
- id: String,
- kind: &str,
-) -> anyhow::Result<(Arc<Peer>, ProvideInfo)> {
- let peers = inst.peers.read().await;
- for peer in peers.values() {
- for (rid, r) in peer.remote_provided.read().await.iter() {
- if rid == &id {
- if r.kind == kind {
- return Ok((peer.to_owned(), r.to_owned()));
- } else {
- bail!("wrong type: expected {kind:?}, found {:?}", r.kind)
- }
- }
- }
- }
- bail!("id not found")
-}
-
impl EventHandler for Handler {
fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> {
Box::pin(async move {})
diff --git a/client-native-rift/src/repl.rs b/client-native-rift/src/repl.rs
new file mode 100644
index 0000000..d462221
--- /dev/null
+++ b/client-native-rift/src/repl.rs
@@ -0,0 +1,139 @@
+use crate::{command::dispatch_command, Command, State};
+use clap::Parser;
+use libkeks::instance::Instance;
+use log::{error, info, warn};
+use rustyline::{
+ completion::{Candidate, Completer},
+ config::Configurer,
+ error::ReadlineError,
+ highlight::Highlighter,
+ hint::{Hint, Hinter},
+ validate::{ValidationResult, Validator},
+ Editor, Helper,
+};
+use std::{borrow::Cow, sync::Arc};
+use tokio::sync::RwLock;
+
+pub(crate) async fn repl(inst: Arc<Instance>, state: Arc<RwLock<State>>) -> anyhow::Result<()> {
+ let mut rl = Editor::new()?;
+ rl.set_auto_add_history(true);
+ rl.set_helper(Some(ReplHelper {}));
+ loop {
+ match rl.readline("> ") {
+ Ok(line) => match shlex::split(&line) {
+ Some(tokens) => match Command::try_parse_from(tokens) {
+ Ok(command) => match dispatch_command(&inst, &state, command).await {
+ Ok(()) => (),
+ Err(err) => error!(target: "rift", "{err}"),
+ },
+ Err(err) => err.print().unwrap(),
+ },
+ None => warn!("fix your quoting"),
+ },
+ Err(ReadlineError::Eof) => {
+ info!("exit");
+ break;
+ }
+ Err(ReadlineError::Interrupted) => {
+ info!("interrupted; exiting...");
+ break;
+ }
+ Err(e) => Err(e)?,
+ }
+ }
+ Ok(())
+}
+
+struct ReplHelper {}
+
+impl Helper for ReplHelper {}
+
+impl Validator for ReplHelper {
+ fn validate(
+ &self,
+ ctx: &mut rustyline::validate::ValidationContext,
+ ) -> rustyline::Result<rustyline::validate::ValidationResult> {
+ let _ = ctx;
+ match shlex::split(ctx.input()) {
+ Some(_tokens) => Ok(ValidationResult::Valid(None)),
+ None => Ok(ValidationResult::Invalid(Some(
+ " incorrect quoting".to_string(),
+ ))),
+ }
+ }
+ fn validate_while_typing(&self) -> bool {
+ true
+ }
+}
+
+impl Hinter for ReplHelper {
+ type Hint = ReplHint;
+ fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> {
+ let _ = (line, pos, ctx);
+ None
+ }
+}
+
+struct ReplHint;
+impl Hint for ReplHint {
+ fn display(&self) -> &str {
+ ""
+ }
+ fn completion(&self) -> Option<&str> {
+ None
+ }
+}
+
+impl Completer for ReplHelper {
+ type Candidate = ReplCandidate;
+ fn complete(
+ &self,
+ line: &str,
+ pos: usize,
+ ctx: &rustyline::Context<'_>,
+ ) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
+ let _ = (line, pos, ctx);
+ Ok((0, Vec::with_capacity(0)))
+ }
+
+ fn update(
+ &self,
+ line: &mut rustyline::line_buffer::LineBuffer,
+ start: usize,
+ elected: &str,
+ cl: &mut rustyline::Changeset,
+ ) {
+ let end = line.pos();
+ line.replace(start..end, elected, cl);
+ }
+}
+
+struct ReplCandidate {}
+impl Candidate for ReplCandidate {
+ fn display(&self) -> &str {
+ ""
+ }
+
+ fn replacement(&self) -> &str {
+ ""
+ }
+}
+
+impl Highlighter for ReplHelper {
+ fn highlight_prompt<'b, 's: 'b, 'p: 'b>(
+ &'s self,
+ prompt: &'p str,
+ _default: bool,
+ ) -> Cow<'b, str> {
+ Cow::Borrowed(prompt)
+ }
+ fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
+ Cow::Owned("\x1b[1m".to_owned() + hint + "\x1b[m")
+ }
+ fn highlight<'l>(&self, line: &'l str, _pos: usize) -> Cow<'l, str> {
+ Cow::Borrowed(line)
+ }
+ fn highlight_char(&self, _line: &str, _pos: usize, _forced: bool) -> bool {
+ false
+ }
+}