aboutsummaryrefslogtreecommitdiff
path: root/karld/src/interface
diff options
context:
space:
mode:
Diffstat (limited to 'karld/src/interface')
-rw-r--r--karld/src/interface/generic.rs35
-rw-r--r--karld/src/interface/mod.rs2
-rw-r--r--karld/src/interface/stdio.rs12
-rw-r--r--karld/src/interface/tcp.rs16
-rw-r--r--karld/src/interface/unix.rs15
-rw-r--r--karld/src/interface/websocket.rs34
6 files changed, 53 insertions, 61 deletions
diff --git a/karld/src/interface/generic.rs b/karld/src/interface/generic.rs
index 4c35fdf..e9d57b5 100644
--- a/karld/src/interface/generic.rs
+++ b/karld/src/interface/generic.rs
@@ -1,23 +1,42 @@
-use crate::handle_packet;
+use crate::{handle_packet, CLIENTS, CLIENT_ID_COUNTER};
+use crossbeam_channel::{Receiver, Sender};
use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket};
-use log::{debug, error, warn};
+use log::{debug, error, info, warn};
use std::{
io::{self, BufRead, BufReader, ErrorKind, Read, Write},
thread,
};
-pub fn generic_handle_connection<ReadStream: Read, WriteStream: Write + Send + 'static>(
- id: u32,
- rstream: ReadStream,
- wstream: WriteStream,
-) -> io::Result<()> {
- let mut reader = BufReader::new(rstream);
+pub fn handle_connection<F, T>(handle_client: F, arg: T)
+where
+ F: FnOnce(
+ u32,
+ (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ T,
+ ) -> anyhow::Result<()>,
+{
+ let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (responder, responses) = crossbeam_channel::unbounded();
responder
.send(ClientboundPacket::Handshake {
version: version!(),
})
.unwrap();
+ info!("client connected: {id}");
+ CLIENTS.write().unwrap().insert(id, responder.clone());
+ match handle_client(id, (responder, responses), arg) {
+ Ok(_) => info!("client ({id}) dropped properly"),
+ Err(e) => error!("client ({id}) dropped bc error: {e}"),
+ }
+ CLIENTS.write().unwrap().remove(&id);
+}
+
+pub fn stream<ReadStream: Read, WriteStream: Write + Send + 'static>(
+ id: u32,
+ (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ (rstream, wstream): (ReadStream, WriteStream),
+) -> anyhow::Result<()> {
+ let mut reader = BufReader::new(rstream);
thread::spawn(move || {
let mut wstream = wstream;
diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs
index 3c465d4..3a0c45e 100644
--- a/karld/src/interface/mod.rs
+++ b/karld/src/interface/mod.rs
@@ -21,5 +21,7 @@ pub fn start() {
#[cfg(not(feature = "websocket"))]
#[cfg(not(feature = "unix"))]
+ #[cfg(not(feature = "tcp"))]
+ #[cfg(not(feature = "stdio"))]
log::warn!("no interfaces enabled, daemon will be inaccesssible")
}
diff --git a/karld/src/interface/stdio.rs b/karld/src/interface/stdio.rs
index b10dfc0..f4dcac7 100644
--- a/karld/src/interface/stdio.rs
+++ b/karld/src/interface/stdio.rs
@@ -1,13 +1,7 @@
-use log::{error, info};
-
-use crate::CLIENT_ID_COUNTER;
-
-use super::generic::generic_handle_connection;
+use super::generic;
+use log::info;
pub fn run() {
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
info!("reading packets from stdin");
- if let Err(e) = generic_handle_connection(id, std::io::stdin(), std::io::stdout()) {
- error!("{e}");
- }
+ generic::handle_connection(generic::stream, (std::io::stdin(), std::io::stdout()));
}
diff --git a/karld/src/interface/tcp.rs b/karld/src/interface/tcp.rs
index 296701b..94e961f 100644
--- a/karld/src/interface/tcp.rs
+++ b/karld/src/interface/tcp.rs
@@ -1,7 +1,6 @@
-use crate::interface::generic::generic_handle_connection;
-use crate::CLIENT_ID_COUNTER;
+use crate::interface::generic;
use karlcommon::interfaces::tcp_addr;
-use log::{info, warn};
+use log::info;
use std::net::TcpListener;
use std::thread;
@@ -9,17 +8,10 @@ pub fn run() {
info!("binding to socket");
let listener = TcpListener::bind(tcp_addr()).unwrap();
info!("listening.");
-
loop {
- let (stream, addr) = listener.accept().unwrap();
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let (stream, _addr) = listener.accept().unwrap();
thread::spawn(move || {
- info!("client connected: {:?}", addr);
- if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) {
- warn!("client dropped: {:?} ({})", addr, err);
- } else {
- info!("client dropped: {:?}", addr);
- }
+ generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream))
});
}
}
diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs
index 620ff64..9989b0f 100644
--- a/karld/src/interface/unix.rs
+++ b/karld/src/interface/unix.rs
@@ -1,7 +1,6 @@
-use crate::interface::generic::generic_handle_connection;
-use crate::CLIENT_ID_COUNTER;
+use crate::interface::generic;
use karlcommon::interfaces::unix_path;
-use log::{info, warn};
+use log::info;
use std::os::unix::net::UnixListener;
use std::thread;
@@ -15,15 +14,9 @@ pub fn run() {
info!("listening.");
loop {
- let (stream, addr) = listener.accept().unwrap();
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let (stream, _addr) = listener.accept().unwrap();
thread::spawn(move || {
- info!("client connected: {:?}", addr);
- if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) {
- warn!("client dropped: {:?} ({})", addr, err);
- } else {
- info!("client dropped: {:?}", addr);
- }
+ generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream))
});
}
}
diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs
index da7d918..624c28c 100644
--- a/karld/src/interface/websocket.rs
+++ b/karld/src/interface/websocket.rs
@@ -1,37 +1,29 @@
-use karlcommon::{ServerboundPacket, ProtoError, ClientboundPacket, version};
-use log::{debug, error, info};
+use crate::handle_packet;
+use crate::interface::generic;
+use crossbeam_channel::{Receiver, Sender};
+use karlcommon::{ClientboundPacket, ProtoError, ServerboundPacket};
+use log::{debug, info};
use std::net::{TcpListener, TcpStream};
use std::thread;
use tungstenite::{accept, Message};
-use crate::{handle_packet, CLIENT_ID_COUNTER};
pub fn run() {
info!("binding websocket server");
let server = TcpListener::bind("127.0.0.1:9001").unwrap();
info!("listening");
-
for stream in server.incoming() {
- thread::spawn(move || {
- if let Err(e) = handle_connection(stream) {
- error!("{e}");
- }
- });
+ thread::spawn(move || generic::handle_connection(handle_connection, stream));
}
}
-fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Result<()> {
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
+fn handle_connection(
+ id: u32,
+ (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ stream: Result<TcpStream, std::io::Error>,
+) -> anyhow::Result<()> {
let stream = stream?;
stream.set_nonblocking(true)?;
let mut websocket = accept(stream)?;
-
- let (responder, responses) = crossbeam_channel::unbounded();
- responder
- .send(ClientboundPacket::Handshake {
- version: version!(),
- })
- .unwrap();
loop {
match websocket.read_message() {
@@ -50,14 +42,14 @@ fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Resul
Err(tungstenite::Error::ConnectionClosed) => {
break Ok(());
}
- Err(tungstenite::Error::Io(e))
+ Err(tungstenite::Error::Io(e))
if let std::io::ErrorKind::WouldBlock = e.kind() => {
// its fine
}
Err(e) => Err(e)?,
}
for r in responses.try_iter() {
- websocket.write_message(Message::Text( serde_json::to_string(&r)?))?;
+ websocket.write_message(Message::Text(serde_json::to_string(&r)?))?;
}
thread::sleep(std::time::Duration::from_millis(50)); // how would you do this properly??
}