diff options
Diffstat (limited to 'karld/src/interface/websocket.rs')
-rw-r--r-- | karld/src/interface/websocket.rs | 85 |
1 files changed, 49 insertions, 36 deletions
diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs index b69de20..e3207b6 100644 --- a/karld/src/interface/websocket.rs +++ b/karld/src/interface/websocket.rs @@ -1,46 +1,59 @@ +use karlcommon::{ServerboundPacket, ProtoError}; +use log::{debug, error, info}; +use std::net::{TcpListener, TcpStream}; use std::thread; -use websocket::sync::Server; -use websocket::OwnedMessage; +use tungstenite::{accept, Message}; + +use crate::{handle_packet, CLIENT_ID_COUNTER}; pub fn run() { - let server = Server::bind("127.0.0.1:2794").unwrap(); + info!("binding websocket server"); + let server = TcpListener::bind("127.0.0.1:9001").unwrap(); + info!("listening"); - for request in server.filter_map(Result::ok) { - // Spawn a new thread for each connection. - thread::spawn(|| { - if !request.protocols().contains(&"rust-websocket".to_string()) { - request.reject().unwrap(); - return; + for stream in server.incoming() { + thread::spawn(move || { + if let Err(e) = handle_connection(stream) { + error!("{e}"); } + }); + } +} - let mut client = request.use_protocol("rust-websocket").accept().unwrap(); - - let ip = client.peer_addr().unwrap(); - - println!("Connection from {}", ip); - - let message = OwnedMessage::Text("Hello".to_string()); - client.send_message(&message).unwrap(); - - let (mut receiver, mut sender) = client.split().unwrap(); - - for message in receiver.incoming_messages() { - let message = message.unwrap(); - - match message { - OwnedMessage::Close(_) => { - let message = OwnedMessage::Close(None); - sender.send_message(&message).unwrap(); - println!("Client {} disconnected", ip); - return; - } - OwnedMessage::Ping(ping) => { - let message = OwnedMessage::Pong(ping); - sender.send_message(&message).unwrap(); - } - _ => sender.send_message(&message).unwrap(), +fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Result<()> { + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let stream = stream?; + stream.set_nonblocking(true)?; + let mut websocket = accept(stream)?; + + let (responder, responses) = crossbeam_channel::unbounded(); + loop { + match websocket.read_message() { + Ok(Message::Text(t)) => { + debug!("<- {t:?}"); + match serde_json::from_str::<ServerboundPacket>(&t) { + Ok(packet) => { + handle_packet(id, packet, responder.clone()) + }, + Err(e) => { + responder.send(karlcommon::ClientboundPacket::Error(ProtoError::FormatError(format!("{e}"))))? + }, } } - }); + Ok(_) => (), + Err(tungstenite::Error::ConnectionClosed) => { + break Ok(()); + } + Err(tungstenite::Error::Io(e)) + if let std::io::ErrorKind::WouldBlock = e.kind() => { + // its fine^ + } + Err(e) => Err(e)?, + } + for r in responses.try_iter() { + websocket.write_message(Message::Text( serde_json::to_string(&r)?))?; + } + thread::sleep(std::time::Duration::from_millis(50)); // how would you do this properly?? } } |