diff options
author | metamuffin <yvchraiqi@protonmail.com> | 2022-08-17 17:48:42 +0200 |
---|---|---|
committer | metamuffin <yvchraiqi@protonmail.com> | 2022-08-17 17:48:42 +0200 |
commit | 9856b281b429fa3ba13b64f6c6cd99b3d05d1a2f (patch) | |
tree | dc576c31200c1059d658915688cca1fd03ef9424 /karld/src/interface | |
parent | 39e9e403cddbb72439a9b141aac14c434df3a745 (diff) | |
download | karlender-9856b281b429fa3ba13b64f6c6cd99b3d05d1a2f.tar karlender-9856b281b429fa3ba13b64f6c6cd99b3d05d1a2f.tar.bz2 karlender-9856b281b429fa3ba13b64f6c6cd99b3d05d1a2f.tar.zst |
modularize interfaces
Diffstat (limited to 'karld/src/interface')
-rw-r--r-- | karld/src/interface/mod.rs | 10 | ||||
-rw-r--r-- | karld/src/interface/unix.rs | 77 | ||||
-rw-r--r-- | karld/src/interface/websocket.rs | 46 |
3 files changed, 133 insertions, 0 deletions
diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs new file mode 100644 index 0000000..38c9895 --- /dev/null +++ b/karld/src/interface/mod.rs @@ -0,0 +1,10 @@ +use std::thread; +mod unix; +mod websocket; + +pub fn start() { + #[cfg(feature = "unix")] + thread::spawn(|| unix::run()); + #[cfg(feature = "websocket")] + thread::spawn(|| websocket::run()); +} diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs new file mode 100644 index 0000000..8d038d4 --- /dev/null +++ b/karld/src/interface/unix.rs @@ -0,0 +1,77 @@ +use crate::{handle_packet, CLIENT_ID_COUNTER}; +use karlcommon::{socket_path, version, ClientboundPacket, ProtoError, ServerboundPacket}; +use log::{debug, error, info, warn}; +use std::io; +use std::io::{BufRead, BufReader, ErrorKind, Write}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::thread; + +pub fn run() { + if socket_path().exists() { + info!("remove old socket"); + std::fs::remove_file(socket_path()).unwrap(); + } + let listener = UnixListener::bind(socket_path()).unwrap(); + info!("listening."); + + loop { + let (stream, addr) = listener.accept().unwrap(); + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + thread::spawn(move || { + info!("client connected: {:?}", addr); + if let Err(err) = handle_connection(id, stream) { + warn!("client dropped: {:?} ({})", addr, err); + } else { + info!("client dropped: {:?}", addr); + } + }); + } +} + +fn handle_connection(id: u32, mut stream: UnixStream) -> io::Result<()> { + let mut reader = BufReader::new(stream.try_clone()?); + let (responder, responses) = crossbeam_channel::unbounded(); + responder + .send(ClientboundPacket::Handshake { + version: version!(), + }) + .unwrap(); + thread::spawn(move || { + for m in responses { + debug!("{id} -> {m:?}"); + match stream + .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) + .map_err(|e| e.kind()) + { + Ok(_) => (), + Err(ErrorKind::BrokenPipe) => break, + Err(e) => error!("network error: {:?}", e), + } + } + }); + { + let mut buf = String::new(); + loop { + if reader.read_line(&mut buf)? == 0 { + break Ok(()); + }; + match serde_json::from_str::<ServerboundPacket>(buf.as_str()) { + Ok(packet) => { + debug!("{id} <- {packet:?}"); + handle_packet(id, packet, responder.clone()); + } + Err(err) => { + warn!("client error: {:?}", &err); + responder + .send(ClientboundPacket::Error(ProtoError::FormatError(format!( + "{}", + &err + )))) + .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? + } + } + + buf.clear(); + } + } +} diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs new file mode 100644 index 0000000..b69de20 --- /dev/null +++ b/karld/src/interface/websocket.rs @@ -0,0 +1,46 @@ +use std::thread; +use websocket::sync::Server; +use websocket::OwnedMessage; + +pub fn run() { + let server = Server::bind("127.0.0.1:2794").unwrap(); + + for request in server.filter_map(Result::ok) { + // Spawn a new thread for each connection. + thread::spawn(|| { + if !request.protocols().contains(&"rust-websocket".to_string()) { + request.reject().unwrap(); + return; + } + + let mut client = request.use_protocol("rust-websocket").accept().unwrap(); + + let ip = client.peer_addr().unwrap(); + + println!("Connection from {}", ip); + + let message = OwnedMessage::Text("Hello".to_string()); + client.send_message(&message).unwrap(); + + let (mut receiver, mut sender) = client.split().unwrap(); + + for message in receiver.incoming_messages() { + let message = message.unwrap(); + + match message { + OwnedMessage::Close(_) => { + let message = OwnedMessage::Close(None); + sender.send_message(&message).unwrap(); + println!("Client {} disconnected", ip); + return; + } + OwnedMessage::Ping(ping) => { + let message = OwnedMessage::Pong(ping); + sender.send_message(&message).unwrap(); + } + _ => sender.send_message(&message).unwrap(), + } + } + }); + } +} |