diff options
author | metamuffin <yvchraiqi@protonmail.com> | 2022-08-17 22:04:32 +0200 |
---|---|---|
committer | metamuffin <yvchraiqi@protonmail.com> | 2022-08-17 22:04:32 +0200 |
commit | 4cad1fc66422f84ade7ee29f86f5a43738c065f4 (patch) | |
tree | 815e44d69cc34d42669c866d043f7d09d3172290 | |
parent | e87219d47764f82d073f464a23aa18a6f7d0c9d3 (diff) | |
download | karlender-4cad1fc66422f84ade7ee29f86f5a43738c065f4.tar karlender-4cad1fc66422f84ade7ee29f86f5a43738c065f4.tar.bz2 karlender-4cad1fc66422f84ade7ee29f86f5a43738c065f4.tar.zst |
add more interfaces
-rw-r--r-- | karlcommon/src/lib.rs | 3 | ||||
-rw-r--r-- | karld/Cargo.toml | 4 | ||||
-rw-r--r-- | karld/src/interface/generic.rs | 61 | ||||
-rw-r--r-- | karld/src/interface/mod.rs | 10 | ||||
-rw-r--r-- | karld/src/interface/stdio.rs | 13 | ||||
-rw-r--r-- | karld/src/interface/tcp.rs | 24 | ||||
-rw-r--r-- | karld/src/interface/unix.rs | 63 | ||||
-rw-r--r-- | karld/src/interface/websocket.rs | 9 |
8 files changed, 127 insertions, 60 deletions
diff --git a/karlcommon/src/lib.rs b/karlcommon/src/lib.rs index e361423..bbc0ef7 100644 --- a/karlcommon/src/lib.rs +++ b/karlcommon/src/lib.rs @@ -35,4 +35,7 @@ pub mod interfaces { pub fn websocket_addr() -> SocketAddr { SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 18752) } + pub fn tcp_addr() -> SocketAddr { + SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 18751) + } } diff --git a/karld/Cargo.toml b/karld/Cargo.toml index c8b400c..845e2ad 100644 --- a/karld/Cargo.toml +++ b/karld/Cargo.toml @@ -18,6 +18,8 @@ lazy_static = "1.4.0" tungstenite = { version = "0.17.3", optional = true } [features] -default = ["unix"] +default = ["unix", "stdio", "tcp", "websocket"] websocket = ["dep:tungstenite"] unix = [] +stdio = [] +tcp = [] diff --git a/karld/src/interface/generic.rs b/karld/src/interface/generic.rs new file mode 100644 index 0000000..4c35fdf --- /dev/null +++ b/karld/src/interface/generic.rs @@ -0,0 +1,61 @@ +use crate::handle_packet; +use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket}; +use log::{debug, error, warn}; +use std::{ + io::{self, BufRead, BufReader, ErrorKind, Read, Write}, + thread, +}; + +pub fn generic_handle_connection<ReadStream: Read, WriteStream: Write + Send + 'static>( + id: u32, + rstream: ReadStream, + wstream: WriteStream, +) -> io::Result<()> { + let mut reader = BufReader::new(rstream); + let (responder, responses) = crossbeam_channel::unbounded(); + responder + .send(ClientboundPacket::Handshake { + version: version!(), + }) + .unwrap(); + + thread::spawn(move || { + let mut wstream = wstream; + for m in responses { + debug!("{id} -> {m:?}"); + match wstream + .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) + .map_err(|e| e.kind()) + { + Ok(_) => (), + Err(ErrorKind::BrokenPipe) => break, + Err(e) => error!("network error: {:?}", e), + } + } + }); + { + let mut buf = String::new(); + loop { + if reader.read_line(&mut buf)? == 0 { + break Ok(()); + }; + match serde_json::from_str::<ServerboundPacket>(buf.as_str()) { + Ok(packet) => { + debug!("{id} <- {packet:?}"); + handle_packet(id, packet, responder.clone()); + } + Err(err) => { + warn!("client error: {:?}", &err); + responder + .send(ClientboundPacket::Error(ProtoError::FormatError(format!( + "{}", + &err + )))) + .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? + } + } + + buf.clear(); + } + } +} diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs index 6122a8c..597524a 100644 --- a/karld/src/interface/mod.rs +++ b/karld/src/interface/mod.rs @@ -2,12 +2,22 @@ mod unix; #[cfg(feature = "websocket")] mod websocket; +#[cfg(feature = "stdio")] +mod stdio; +#[cfg(feature = "tcp")] +mod tcp; + +pub mod generic; pub fn start() { #[cfg(feature = "unix")] std::thread::spawn(|| unix::run()); #[cfg(feature = "websocket")] std::thread::spawn(|| websocket::run()); + #[cfg(feature = "stdio")] + std::thread::spawn(|| stdio::run()); + #[cfg(feature = "tcp")] + std::thread::spawn(|| tcp::run()); #[cfg(not(feature = "websocket"))] #[cfg(not(feature = "unix"))] diff --git a/karld/src/interface/stdio.rs b/karld/src/interface/stdio.rs new file mode 100644 index 0000000..b10dfc0 --- /dev/null +++ b/karld/src/interface/stdio.rs @@ -0,0 +1,13 @@ +use log::{error, info}; + +use crate::CLIENT_ID_COUNTER; + +use super::generic::generic_handle_connection; + +pub fn run() { + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + info!("reading packets from stdin"); + if let Err(e) = generic_handle_connection(id, std::io::stdin(), std::io::stdout()) { + error!("{e}"); + } +} diff --git a/karld/src/interface/tcp.rs b/karld/src/interface/tcp.rs new file mode 100644 index 0000000..2225f4a --- /dev/null +++ b/karld/src/interface/tcp.rs @@ -0,0 +1,24 @@ +use crate::interface::generic::generic_handle_connection; +use crate::CLIENT_ID_COUNTER; +use karlcommon::interfaces::tcp_addr; +use log::{info, warn}; +use std::net::TcpListener; +use std::thread; + +pub fn run() { + let listener = TcpListener::bind(tcp_addr()).unwrap(); + info!("listening."); + + loop { + let (stream, addr) = listener.accept().unwrap(); + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + thread::spawn(move || { + info!("client connected: {:?}", addr); + if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) { + warn!("client dropped: {:?} ({})", addr, err); + } else { + info!("client dropped: {:?}", addr); + } + }); + } +} diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs index 324b336..350be4f 100644 --- a/karld/src/interface/unix.rs +++ b/karld/src/interface/unix.rs @@ -1,11 +1,8 @@ -use crate::{handle_packet, CLIENT_ID_COUNTER}; -use karlcommon::{ - interfaces::unix_path, version, ClientboundPacket, ProtoError, ServerboundPacket, -}; -use log::{debug, error, info, warn}; -use std::io; -use std::io::{BufRead, BufReader, ErrorKind, Write}; -use std::os::unix::net::{UnixListener, UnixStream}; +use crate::interface::generic::generic_handle_connection; +use crate::CLIENT_ID_COUNTER; +use karlcommon::interfaces::unix_path; +use log::{info, warn}; +use std::os::unix::net::UnixListener; use std::thread; pub fn run() { @@ -21,7 +18,7 @@ pub fn run() { let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); thread::spawn(move || { info!("client connected: {:?}", addr); - if let Err(err) = handle_connection(id, stream) { + if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) { warn!("client dropped: {:?} ({})", addr, err); } else { info!("client dropped: {:?}", addr); @@ -29,51 +26,3 @@ pub fn run() { }); } } - -fn handle_connection(id: u32, mut stream: UnixStream) -> io::Result<()> { - let mut reader = BufReader::new(stream.try_clone()?); - let (responder, responses) = crossbeam_channel::unbounded(); - responder - .send(ClientboundPacket::Handshake { - version: version!(), - }) - .unwrap(); - thread::spawn(move || { - for m in responses { - debug!("{id} -> {m:?}"); - match stream - .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) - .map_err(|e| e.kind()) - { - Ok(_) => (), - Err(ErrorKind::BrokenPipe) => break, - Err(e) => error!("network error: {:?}", e), - } - } - }); - { - let mut buf = String::new(); - loop { - if reader.read_line(&mut buf)? == 0 { - break Ok(()); - }; - match serde_json::from_str::<ServerboundPacket>(buf.as_str()) { - Ok(packet) => { - debug!("{id} <- {packet:?}"); - handle_packet(id, packet, responder.clone()); - } - Err(err) => { - warn!("client error: {:?}", &err); - responder - .send(ClientboundPacket::Error(ProtoError::FormatError(format!( - "{}", - &err - )))) - .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? - } - } - - buf.clear(); - } - } -} diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs index e3207b6..e983e06 100644 --- a/karld/src/interface/websocket.rs +++ b/karld/src/interface/websocket.rs @@ -1,9 +1,8 @@ -use karlcommon::{ServerboundPacket, ProtoError}; +use karlcommon::{ServerboundPacket, ProtoError, ClientboundPacket, version}; use log::{debug, error, info}; use std::net::{TcpListener, TcpStream}; use std::thread; use tungstenite::{accept, Message}; - use crate::{handle_packet, CLIENT_ID_COUNTER}; pub fn run() { @@ -28,6 +27,12 @@ fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Resul let mut websocket = accept(stream)?; let (responder, responses) = crossbeam_channel::unbounded(); + responder + .send(ClientboundPacket::Handshake { + version: version!(), + }) + .unwrap(); + loop { match websocket.read_message() { Ok(Message::Text(t)) => { |