diff options
Diffstat (limited to 'karld/src')
-rw-r--r-- | karld/src/interface/mod.rs | 3 | ||||
-rw-r--r-- | karld/src/interface/unix.rs | 10 | ||||
-rw-r--r-- | karld/src/interface/websocket.rs | 85 | ||||
-rw-r--r-- | karld/src/main.rs | 5 |
4 files changed, 60 insertions, 43 deletions
diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs index 38c9895..3e13aa1 100644 --- a/karld/src/interface/mod.rs +++ b/karld/src/interface/mod.rs @@ -1,5 +1,8 @@ use std::thread; + +#[cfg(feature = "unix")] mod unix; +#[cfg(feature = "websocket")] mod websocket; pub fn start() { diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs index 8d038d4..324b336 100644 --- a/karld/src/interface/unix.rs +++ b/karld/src/interface/unix.rs @@ -1,5 +1,7 @@ use crate::{handle_packet, CLIENT_ID_COUNTER}; -use karlcommon::{socket_path, version, ClientboundPacket, ProtoError, ServerboundPacket}; +use karlcommon::{ + interfaces::unix_path, version, ClientboundPacket, ProtoError, ServerboundPacket, +}; use log::{debug, error, info, warn}; use std::io; use std::io::{BufRead, BufReader, ErrorKind, Write}; @@ -7,11 +9,11 @@ use std::os::unix::net::{UnixListener, UnixStream}; use std::thread; pub fn run() { - if socket_path().exists() { + if unix_path().exists() { info!("remove old socket"); - std::fs::remove_file(socket_path()).unwrap(); + std::fs::remove_file(unix_path()).unwrap(); } - let listener = UnixListener::bind(socket_path()).unwrap(); + let listener = UnixListener::bind(unix_path()).unwrap(); info!("listening."); loop { diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs index b69de20..e3207b6 100644 --- a/karld/src/interface/websocket.rs +++ b/karld/src/interface/websocket.rs @@ -1,46 +1,59 @@ +use karlcommon::{ServerboundPacket, ProtoError}; +use log::{debug, error, info}; +use std::net::{TcpListener, TcpStream}; use std::thread; -use websocket::sync::Server; -use websocket::OwnedMessage; +use tungstenite::{accept, Message}; + +use crate::{handle_packet, CLIENT_ID_COUNTER}; pub fn run() { - let server = Server::bind("127.0.0.1:2794").unwrap(); + info!("binding websocket server"); + let server = TcpListener::bind("127.0.0.1:9001").unwrap(); + info!("listening"); - for request in server.filter_map(Result::ok) { - // Spawn a new thread for each connection. - thread::spawn(|| { - if !request.protocols().contains(&"rust-websocket".to_string()) { - request.reject().unwrap(); - return; + for stream in server.incoming() { + thread::spawn(move || { + if let Err(e) = handle_connection(stream) { + error!("{e}"); } + }); + } +} - let mut client = request.use_protocol("rust-websocket").accept().unwrap(); - - let ip = client.peer_addr().unwrap(); - - println!("Connection from {}", ip); - - let message = OwnedMessage::Text("Hello".to_string()); - client.send_message(&message).unwrap(); - - let (mut receiver, mut sender) = client.split().unwrap(); - - for message in receiver.incoming_messages() { - let message = message.unwrap(); - - match message { - OwnedMessage::Close(_) => { - let message = OwnedMessage::Close(None); - sender.send_message(&message).unwrap(); - println!("Client {} disconnected", ip); - return; - } - OwnedMessage::Ping(ping) => { - let message = OwnedMessage::Pong(ping); - sender.send_message(&message).unwrap(); - } - _ => sender.send_message(&message).unwrap(), +fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Result<()> { + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let stream = stream?; + stream.set_nonblocking(true)?; + let mut websocket = accept(stream)?; + + let (responder, responses) = crossbeam_channel::unbounded(); + loop { + match websocket.read_message() { + Ok(Message::Text(t)) => { + debug!("<- {t:?}"); + match serde_json::from_str::<ServerboundPacket>(&t) { + Ok(packet) => { + handle_packet(id, packet, responder.clone()) + }, + Err(e) => { + responder.send(karlcommon::ClientboundPacket::Error(ProtoError::FormatError(format!("{e}"))))? + }, } } - }); + Ok(_) => (), + Err(tungstenite::Error::ConnectionClosed) => { + break Ok(()); + } + Err(tungstenite::Error::Io(e)) + if let std::io::ErrorKind::WouldBlock = e.kind() => { + // its fine^ + } + Err(e) => Err(e)?, + } + for r in responses.try_iter() { + websocket.write_message(Message::Text( serde_json::to_string(&r)?))?; + } + thread::sleep(std::time::Duration::from_millis(50)); // how would you do this properly?? } } diff --git a/karld/src/main.rs b/karld/src/main.rs index 2b95cb7..3e85b04 100644 --- a/karld/src/main.rs +++ b/karld/src/main.rs @@ -1,5 +1,6 @@ #![feature(box_syntax)] #![feature(fs_try_exists)] +#![feature(if_let_guard)] pub mod condition; pub mod demo; @@ -12,9 +13,7 @@ use chrono::NaiveDateTime; use condition::ConditionFind; use crossbeam_channel::Sender; use helper::Overlaps; -use karlcommon::{ - ClientboundPacket, Condition, Property, ProtoError, Schedule, ServerboundPacket, Task, -}; +use karlcommon::{ClientboundPacket, ProtoError, Schedule, ServerboundPacket, Task}; use log::{debug, error, info}; use std::{ collections::HashMap, |