summaryrefslogtreecommitdiff
path: root/client-native-rift
diff options
context:
space:
mode:
Diffstat (limited to 'client-native-rift')
-rw-r--r--client-native-rift/Cargo.toml7
-rw-r--r--client-native-rift/src/file.rs109
-rw-r--r--client-native-rift/src/main.rs524
-rw-r--r--client-native-rift/src/port.rs99
4 files changed, 468 insertions, 271 deletions
diff --git a/client-native-rift/Cargo.toml b/client-native-rift/Cargo.toml
index f8d5811..d9a7915 100644
--- a/client-native-rift/Cargo.toml
+++ b/client-native-rift/Cargo.toml
@@ -4,10 +4,10 @@ version = "1.0.0"
edition = "2021"
[dependencies]
-client-native-lib = { path = "../client-native-lib" }
+libkeks = { path = "../client-native-lib" }
clap = { version = "4.4.18", features = ["derive"] }
-env_logger = "0.11.1"
+pretty_env_logger = "0.5.0"
log = "0.4"
tokio = { version = "1.35", features = ["full"] }
@@ -16,3 +16,6 @@ bytes = "1.5.0"
indicatif = "0.17.7"
humansize = "2.1.3"
users = "0.11.0"
+anyhow = "1.0.81"
+shlex = "1.3.0"
+rustyline = "14.0.0"
diff --git a/client-native-rift/src/file.rs b/client-native-rift/src/file.rs
new file mode 100644
index 0000000..bfe32fd
--- /dev/null
+++ b/client-native-rift/src/file.rs
@@ -0,0 +1,109 @@
+use bytes::Bytes;
+use humansize::DECIMAL;
+use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
+use log::{debug, error, info};
+use std::{
+ path::PathBuf,
+ pin::Pin,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+};
+use tokio::{
+ fs::File,
+ io::{AsyncRead, AsyncReadExt},
+ sync::RwLock,
+};
+
+pub struct FileSender {
+ pub path: Arc<PathBuf>,
+ pub info: ProvideInfo,
+}
+
+impl LocalResource for FileSender {
+ fn info(&self) -> ProvideInfo {
+ self.info.clone()
+ }
+
+ fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
+ let id = self.info().id.clone();
+ let total_size = self.info().size.unwrap_or(0);
+ let path = self.path.clone();
+ Box::pin(async move {
+ let channel = peer
+ .peer_connection
+ .create_data_channel(&id, None)
+ .await
+ .unwrap();
+ let pos = Arc::new(AtomicUsize::new(0));
+ let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
+ Arc::new(RwLock::new(None));
+ {
+ let reader = reader.clone();
+ let path = path.clone();
+ channel.on_open(Box::new(move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel open");
+ *reader.write().await = Some(Box::pin(File::open(&*path).await.unwrap()));
+ })
+ }))
+ }
+ {
+ let reader = reader.clone();
+ channel.on_close(Box::new(move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *reader.write().await = None;
+ })
+ }))
+ }
+ {
+ let reader = reader.clone();
+ let pos = pos.clone();
+ let channel2 = channel.clone();
+ channel
+ .on_buffered_amount_low(Box::new(move || {
+ let pos = pos.clone();
+ let reader = reader.clone();
+ let channel = channel2.clone();
+ Box::pin(async move {
+ debug!("buffered amount low");
+ let mut buf = [0u8; 1 << 15];
+ let size = reader
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .read(&mut buf)
+ .await
+ .unwrap();
+ if size == 0 {
+ info!("reached EOF, closing channel");
+ let _ = channel.send_text("end").await;
+ channel.close().await.unwrap();
+ } else {
+ let progress_size = pos.fetch_add(size, Ordering::Relaxed);
+ info!(
+ "sending {size} bytes ({} of {})",
+ humansize::format_size(progress_size, DECIMAL),
+ humansize::format_size(total_size, DECIMAL),
+ );
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ })
+ }))
+ .await;
+ channel.set_buffered_amount_low_threshold(1).await;
+ }
+ channel.on_error(Box::new(move |err| {
+ Box::pin(async move { error!("channel error: {err}") })
+ }))
+ })
+ }
+}
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs
index 0b6327f..57a4947 100644
--- a/client-native-rift/src/main.rs
+++ b/client-native-rift/src/main.rs
@@ -4,36 +4,28 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use bytes::Bytes;
-use clap::{Parser, Subcommand};
-use client_native_lib::{
+pub mod file;
+pub mod port;
+
+use clap::{ColorChoice, Parser};
+use file::FileSender;
+use libkeks::{
instance::Instance,
peer::{Peer, TransportChannel},
protocol::ProvideInfo,
- Config, DynFut, EventHandler, LocalResource,
-};
-use humansize::DECIMAL;
-use log::{debug, error, info, warn};
-use std::{
- os::unix::prelude::MetadataExt,
- pin::Pin,
- process::exit,
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
- },
-};
-use tokio::{
- fs::{self, File},
- io::{stdin, stdout, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
- sync::RwLock,
+ Config, DynFut, EventHandler,
};
+use log::{info, warn};
+use port::PortExposer;
+use rustyline::{error::ReadlineError, DefaultEditor};
+use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc};
+use tokio::{fs, sync::RwLock};
use users::get_current_username;
fn main() {
- env_logger::builder()
+ pretty_env_logger::formatted_builder()
.filter_module("rift", log::LevelFilter::Info)
- .filter_module("client_native_lib", log::LevelFilter::Info)
+ .filter_module("libkeks", log::LevelFilter::Info)
.parse_env("LOG")
.init();
tokio::runtime::Builder::new_multi_thread()
@@ -41,9 +33,11 @@ fn main() {
.build()
.unwrap()
.block_on(run())
+ .unwrap();
}
#[derive(Parser, Clone)]
+/// If no command is provided, rift will enter REPL-mode.
pub struct Args {
/// keks-meet server used for establishing p2p connection
#[clap(long, default_value = "wss://meet.metamuffin.org")]
@@ -53,11 +47,24 @@ pub struct Args {
username: String,
/// pre-shared secret (aka. room name)
secret: String,
- #[clap(subcommand)]
- action: Action,
- /// end after completion of the first transfer
- #[clap(short, long)]
- one_file: bool,
+ // /// Dispatch a single command after startup
+ // #[clap(subcommand)]
+ // command: Option<Command>,
+}
+
+#[derive(Parser, Debug, Clone)]
+#[clap(multicall = true, color = ColorChoice::Always)]
+pub enum Command {
+ /// List all peers and their services.
+ List,
+ /// Provide a file for download to other peers
+ Provide { path: PathBuf, id: Option<String> },
+ /// Download another peer's files.
+ Download { id: String, path: Option<PathBuf> },
+ /// Expose a local TCP port to other peers.
+ Expose { port: u16, id: Option<String> },
+ /// Forward TCP connections to local port to another peer.
+ Forward { id: String, port: Option<u16> },
}
fn get_username() -> String {
@@ -67,79 +74,161 @@ fn get_username() -> String {
.to_owned()
}
-async fn run() {
+async fn run() -> anyhow::Result<()> {
let args = Args::parse();
-
+ let state = Arc::new(RwLock::new(State {
+ requested: Default::default(),
+ }));
let inst = Instance::new(
Config {
signaling_uri: args.signaling_uri.clone(),
username: args.username.clone(),
},
Arc::new(Handler {
- args: Arc::new(args.clone()),
+ state: state.clone(),
}),
)
.await;
inst.join(Some(&args.secret)).await;
- match &args.action {
- Action::Send { filename } => {
- inst.add_local_resource(Box::new(FileSender {
- info: ProvideInfo {
- id: "the-file".to_string(), // we only share a single file so its fine
- kind: "file".to_string(),
- track_kind: None,
- label: Some(filename.clone().unwrap_or("stdin".to_string())),
- size: if let Some(filename) = &filename {
- Some(fs::metadata(filename).await.unwrap().size() as usize)
- } else {
- None
- },
+ inst.spawn_ping().await;
+ tokio::task::spawn(inst.clone().receive_loop());
+
+ let mut rl = DefaultEditor::new()?;
+ loop {
+ match rl.readline("> ") {
+ Ok(line) => match Command::try_parse_from(shlex::split(&line).unwrap()) {
+ Ok(command) => match command {
+ Command::List => {
+ let peers = inst.peers.read().await;
+ println!("{} clients available", peers.len());
+ for p in peers.values() {
+ let username = p
+ .username
+ .read()
+ .await
+ .clone()
+ .unwrap_or("<unknown>".to_string());
+ println!("{username}:");
+ for (rid, r) in p.remote_provided.read().await.iter() {
+ println!(
+ "\t{rid:?}: {} {:?}",
+ r.kind,
+ r.label.clone().unwrap_or_default()
+ )
+ }
+ }
+ }
+ Command::Provide { path, id } => {
+ inst.add_local_resource(Box::new(FileSender {
+ info: ProvideInfo {
+ id: id.unwrap_or("file".to_owned()),
+ kind: "file".to_string(),
+ track_kind: None,
+ label: Some(
+ path.file_name().unwrap().to_str().unwrap().to_string(),
+ ),
+ size: Some(fs::metadata(&path).await.unwrap().size() as usize),
+ },
+ path: path.into(),
+ }))
+ .await;
+ }
+ Command::Download { id, path } => {
+ let peers = inst.peers.read().await;
+ 'outer: for p in peers.values() {
+ for (rid, r) in p.remote_provided.read().await.iter() {
+ if rid == &id {
+ if r.kind == "file" {
+ p.request_resource(id).await;
+ } else {
+ warn!("not a file");
+ }
+ break 'outer;
+ }
+ }
+ }
+ }
+ Command::Expose { port, id } => {
+ inst.add_local_resource(Box::new(PortExposer {
+ port,
+ info: ProvideInfo {
+ kind: "port".to_string(),
+ id: id.unwrap_or(format!("p{port}")),
+ track_kind: None,
+ label: Some(format!("port {port}")),
+ size: None,
+ },
+ }))
+ .await;
+ }
+ Command::Forward { id, port } => {}
},
- reader_factory: args.action,
- }))
- .await;
+ Err(err) => err.print().unwrap(),
+ },
+ Err(ReadlineError::Interrupted) => {
+ info!("interrupted; exiting...");
+ break;
+ }
+ Err(e) => Err(e)?,
}
- _ => (),
}
- inst.spawn_ping().await;
- inst.receive_loop().await;
+ // match &args.action {
+ // Action::Send { filename } => {
+ // inst.add_local_resource(Box::new(FileSender {
+ // info: ProvideInfo {
+ // id: "the-file".to_string(), // we only share a single file so its fine
+ // kind: "file".to_string(),
+ // track_kind: None,
+ // label: Some(filename.clone().unwrap_or("stdin".to_string())),
+ // size: if let Some(filename) = &filename {
+ // Some(fs::metadata(filename).await.unwrap().size() as usize)
+ // } else {
+ // None
+ // },
+ // },
+ // reader_factory: args.action,
+ // }))
+ // .await;
+ // }
+ // _ => (),
+ // }
+ Ok(())
+}
- tokio::signal::ctrl_c().await.unwrap();
- error!("interrupt received, exiting");
+struct State {
+ requested: HashMap<String, Box<dyn RequestHandler>>,
+}
+pub trait RequestHandler: Send + Sync + 'static {
+
}
#[derive(Clone)]
struct Handler {
- args: Arc<Args>,
+ state: Arc<RwLock<State>>,
}
impl EventHandler for Handler {
- fn peer_join(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> {
Box::pin(async move {})
}
- fn peer_leave(&self, _peer: Arc<Peer>) -> client_native_lib::DynFut<()> {
+ fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> {
Box::pin(async move {})
}
- fn resource_added(
- &self,
- peer: Arc<Peer>,
- info: client_native_lib::protocol::ProvideInfo,
- ) -> DynFut<()> {
+ fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> {
let id = info.id.clone();
- let args = self.args.clone();
Box::pin(async move {
- match &args.action {
- Action::Receive { .. } => {
- if info.kind == "file" {
- peer.request_resource(id).await;
- }
- }
- _ => (),
- }
+ // match &args.action {
+ // Action::Receive { .. } => {
+ // if info.kind == "file" {
+ // peer.request_resource(id).await;
+ // }
+ // }
+ // _ => (),
+ // }
})
}
fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> {
@@ -151,211 +240,108 @@ impl EventHandler for Handler {
_peer: Arc<Peer>,
resource: &ProvideInfo,
channel: TransportChannel,
- ) -> client_native_lib::DynFut<()> {
+ ) -> libkeks::DynFut<()> {
let resource = resource.clone();
let s = self.clone();
Box::pin(async move {
- match channel {
- TransportChannel::Track(_) => warn!("wrong type"),
- TransportChannel::DataChannel(dc) => {
- if resource.kind != "file" {
- return error!("we got a non-file resource for some reason…");
- }
- let pos = Arc::new(AtomicUsize::new(0));
- let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
- Arc::new(RwLock::new(None));
- {
- let writer = writer.clone();
- let s = s.clone();
- dc.on_open(Box::new(move || {
- let s = s.clone();
- let writer = writer.clone();
- Box::pin(async move {
- info!("channel opened");
- *writer.write().await = Some(s.args.action.create_writer().await)
- })
- }));
- }
- {
- let writer = writer.clone();
- let args = s.args.clone();
- dc.on_close(Box::new(move || {
- let writer = writer.clone();
- let args = args.clone();
- Box::pin(async move {
- info!("channel closed");
- *writer.write().await = None;
- if args.one_file {
- exit(0);
- }
- })
- }));
- }
- {
- let writer = writer.clone();
- dc.on_message(Box::new(move |mesg| {
- let writer = writer.clone();
- let pos = pos.clone();
- Box::pin(async move {
- // TODO
- if mesg.is_string {
- let s = String::from_utf8((&mesg.data).to_vec()).unwrap();
- if &s == "end" {
- info!("EOF reached")
- }
- } else {
- let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
- info!(
- "recv {:?} ({} of {})",
- mesg.data.len(),
- humansize::format_size(pos, DECIMAL),
- humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
- );
- writer
- .write()
- .await
- .as_mut()
- .unwrap()
- .write_all(&mesg.data)
- .await
- .unwrap();
- }
- })
- }))
- }
- dc.on_error(Box::new(move |err| {
- Box::pin(async move {
- error!("data channel errored: {err}");
- })
- }));
- }
- }
+ // match channel {
+ // TransportChannel::Track(_) => warn!("wrong type"),
+ // TransportChannel::DataChannel(dc) => {
+ // if resource.kind != "file" {
+ // return error!("we got a non-file resource for some reason…");
+ // }
+ // let pos = Arc::new(AtomicUsize::new(0));
+ // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> =
+ // Arc::new(RwLock::new(None));
+ // {
+ // let writer = writer.clone();
+ // let s = s.clone();
+ // dc.on_open(Box::new(move || {
+ // let s = s.clone();
+ // let writer = writer.clone();
+ // Box::pin(async move {
+ // info!("channel opened");
+ // *writer.write().await = Some(s.args.action.create_writer().await)
+ // })
+ // }));
+ // }
+ // {
+ // let writer = writer.clone();
+ // dc.on_close(Box::new(move || {
+ // let writer = writer.clone();
+ // Box::pin(async move {
+ // info!("channel closed");
+ // *writer.write().await = None;
+ // exit(0);
+ // })
+ // }));
+ // }
+ // {
+ // let writer = writer.clone();
+ // dc.on_message(Box::new(move |mesg| {
+ // let writer = writer.clone();
+ // let pos = pos.clone();
+ // Box::pin(async move {
+ // // TODO
+ // if mesg.is_string {
+ // let s = String::from_utf8((&mesg.data).to_vec()).unwrap();
+ // if &s == "end" {
+ // info!("EOF reached")
+ // }
+ // } else {
+ // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed);
+ // info!(
+ // "recv {:?} ({} of {})",
+ // mesg.data.len(),
+ // humansize::format_size(pos, DECIMAL),
+ // humansize::format_size(resource.size.unwrap_or(0), DECIMAL),
+ // );
+ // writer
+ // .write()
+ // .await
+ // .as_mut()
+ // .unwrap()
+ // .write_all(&mesg.data)
+ // .await
+ // .unwrap();
+ // }
+ // })
+ // }))
+ // }
+ // dc.on_error(Box::new(move |err| {
+ // Box::pin(async move {
+ // error!("data channel errored: {err}");
+ // })
+ // }));
+ // }
+ // }
})
}
}
-#[derive(Subcommand, Clone)]
-pub enum Action {
- /// Send a file
- Send { filename: Option<String> },
- /// Receive a file
- Receive { filename: Option<String> },
-}
-
-impl Action {
- pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> {
- match self {
- Action::Receive { filename } => {
- if let Some(filename) = filename {
- Box::pin(File::create(filename).await.unwrap())
- } else {
- Box::pin(stdout())
- }
- }
- _ => unreachable!(),
- }
- }
- pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
- match self {
- Action::Send { filename } => {
- if let Some(filename) = filename {
- Box::pin(File::open(filename).await.unwrap())
- } else {
- Box::pin(stdin())
- }
- }
- _ => unreachable!(),
- }
- }
-}
-
-struct FileSender {
- reader_factory: Action, // TODO use Box<dyn Fn() -> DynFut<dyn AsyncRead + Send + Sync> + Send + Sync>,
- info: ProvideInfo,
-}
-
-impl LocalResource for FileSender {
- fn info(&self) -> ProvideInfo {
- self.info.clone()
- }
-
- fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
- let id = self.info().id.clone();
- let total_size = self.info().size.unwrap_or(0);
- let reader_factory = self.reader_factory.clone();
- Box::pin(async move {
- let channel = peer
- .peer_connection
- .create_data_channel(&id, None)
- .await
- .unwrap();
- let pos = Arc::new(AtomicUsize::new(0));
- let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
- Arc::new(RwLock::new(None));
- {
- let reader = reader.clone();
- let reader_factory = reader_factory.clone();
- channel.on_open(Box::new(move || {
- let reader = reader.clone();
- Box::pin(async move {
- info!("channel open");
- *reader.write().await = Some(reader_factory.create_reader().await);
- })
- }))
- }
- {
- let reader = reader.clone();
- channel.on_close(Box::new(move || {
- let reader = reader.clone();
- Box::pin(async move {
- info!("channel closed");
- *reader.write().await = None;
- })
- }))
- }
- {
- let reader = reader.clone();
- let pos = pos.clone();
- let channel2 = channel.clone();
- channel
- .on_buffered_amount_low(Box::new(move || {
- let pos = pos.clone();
- let reader = reader.clone();
- let channel = channel2.clone();
- Box::pin(async move {
- debug!("buffered amount low");
- let mut buf = [0u8; 1 << 15];
- let size = reader
- .write()
- .await
- .as_mut()
- .unwrap()
- .read(&mut buf)
- .await
- .unwrap();
- if size == 0 {
- info!("reached EOF, closing channel");
- channel.close().await.unwrap();
- } else {
- let progress_size = pos.fetch_add(size, Ordering::Relaxed);
- info!(
- "sending {size} bytes ({} of {})",
- humansize::format_size(progress_size, DECIMAL),
- humansize::format_size(total_size, DECIMAL),
- );
- channel
- .send(&Bytes::copy_from_slice(&buf[..size]))
- .await
- .unwrap();
- }
- })
- }))
- .await;
- channel.set_buffered_amount_low_threshold(1).await;
- }
- channel.on_error(Box::new(move |err| {
- Box::pin(async move { error!("channel error: {err}") })
- }))
- })
- }
-}
+// impl Action {
+// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> {
+// match self {
+// Action::Receive { filename } => {
+// if let Some(filename) = filename {
+// Box::pin(File::create(filename).await.unwrap())
+// } else {
+// Box::pin(stdout())
+// }
+// }
+// _ => unreachable!(),
+// }
+// }
+// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
+// match self {
+// Action::Send { filename } => {
+// if let Some(filename) = filename {
+// Box::pin(File::open(filename).await.unwrap())
+// } else {
+// Box::pin(stdin())
+// }
+// }
+// _ => unreachable!(),
+// }
+// }
+// }
diff --git a/client-native-rift/src/port.rs b/client-native-rift/src/port.rs
new file mode 100644
index 0000000..ae56f2c
--- /dev/null
+++ b/client-native-rift/src/port.rs
@@ -0,0 +1,99 @@
+use bytes::Bytes;
+use libkeks::{peer::Peer, protocol::ProvideInfo, DynFut, LocalResource};
+use log::{debug, error, info, warn};
+use std::{pin::Pin, sync::Arc};
+use tokio::{
+ io::{AsyncRead, AsyncReadExt},
+ net::TcpStream,
+ sync::RwLock,
+};
+
+pub struct PortExposer {
+ pub port: u16,
+ pub info: ProvideInfo,
+}
+
+impl LocalResource for PortExposer {
+ fn info(&self) -> ProvideInfo {
+ self.info.clone()
+ }
+
+ fn on_request(&self, peer: Arc<Peer>) -> DynFut<()> {
+ let id = self.info().id.clone();
+ let port = self.port;
+ Box::pin(async move {
+ let channel = peer
+ .peer_connection
+ .create_data_channel(&id, None)
+ .await
+ .unwrap();
+ let reader: Arc<RwLock<Option<Pin<Box<dyn AsyncRead + Send + Sync>>>>> =
+ Arc::new(RwLock::new(None));
+ {
+ let reader = reader.clone();
+ let channel2 = channel.clone();
+ channel.on_open(Box::new(move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel open");
+ match TcpStream::connect(("127.0.0.1", port)).await {
+ Ok(stream) => {
+ *reader.write().await = Some(Box::pin(stream));
+ }
+ Err(e) => {
+ warn!("upstream connect failed: {e}");
+ channel2.close().await.unwrap();
+ }
+ }
+ })
+ }))
+ }
+ {
+ let reader = reader.clone();
+ channel.on_close(Box::new(move || {
+ let reader = reader.clone();
+ Box::pin(async move {
+ info!("channel closed");
+ *reader.write().await = None;
+ })
+ }))
+ }
+ {
+ let reader = reader.clone();
+ let channel2 = channel.clone();
+ channel
+ .on_buffered_amount_low(Box::new(move || {
+ let reader = reader.clone();
+ let channel = channel2.clone();
+ Box::pin(async move {
+ debug!("buffered amount low");
+ let mut buf = [0u8; 1 << 15];
+ let size = reader
+ .write()
+ .await
+ .as_mut()
+ .unwrap()
+ .read(&mut buf)
+ .await
+ .unwrap();
+ if size == 0 {
+ info!("reached EOF, closing channel");
+ let _ = channel.send_text("end").await;
+ channel.close().await.unwrap();
+ } else {
+ channel
+ .send(&Bytes::copy_from_slice(&buf[..size]))
+ .await
+ .unwrap();
+ }
+ })
+ }))
+ .await;
+ channel.set_buffered_amount_low_threshold(1).await;
+ }
+ channel.on_error(Box::new(move |err| {
+ Box::pin(async move { error!("channel error: {err}") })
+ }))
+ })
+ }
+}