diff options
-rw-r--r-- | karld/Cargo.toml | 2 | ||||
-rw-r--r-- | karld/src/demo.rs | 2 | ||||
-rw-r--r-- | karld/src/interface/generic.rs | 35 | ||||
-rw-r--r-- | karld/src/interface/mod.rs | 2 | ||||
-rw-r--r-- | karld/src/interface/stdio.rs | 12 | ||||
-rw-r--r-- | karld/src/interface/tcp.rs | 16 | ||||
-rw-r--r-- | karld/src/interface/unix.rs | 15 | ||||
-rw-r--r-- | karld/src/interface/websocket.rs | 34 | ||||
-rw-r--r-- | karld/src/main.rs | 27 | ||||
-rw-r--r-- | karlgui/src/globals.rs | 7 | ||||
-rw-r--r-- | karlgui/src/views/mod.rs | 2 |
11 files changed, 81 insertions, 73 deletions
diff --git a/karld/Cargo.toml b/karld/Cargo.toml index e5f79bd..e4accc0 100644 --- a/karld/Cargo.toml +++ b/karld/Cargo.toml @@ -18,7 +18,7 @@ lazy_static = "1.4.0" tungstenite = { version = "0.17.3", optional = true } [features] -default = ["unix"] +default = ["full"] full = ["unix", "stdio", "tcp", "websocket"] websocket = ["dep:tungstenite"] unix = [] diff --git a/karld/src/demo.rs b/karld/src/demo.rs index fb5b43a..80eff7d 100644 --- a/karld/src/demo.rs +++ b/karld/src/demo.rs @@ -1,4 +1,4 @@ -use karlcommon::{Task, Schedule, Condition, Property}; +use karlcommon::{Condition, Property, Schedule, Task}; use crate::TASKS; diff --git a/karld/src/interface/generic.rs b/karld/src/interface/generic.rs index 4c35fdf..e9d57b5 100644 --- a/karld/src/interface/generic.rs +++ b/karld/src/interface/generic.rs @@ -1,23 +1,42 @@ -use crate::handle_packet; +use crate::{handle_packet, CLIENTS, CLIENT_ID_COUNTER}; +use crossbeam_channel::{Receiver, Sender}; use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket}; -use log::{debug, error, warn}; +use log::{debug, error, info, warn}; use std::{ io::{self, BufRead, BufReader, ErrorKind, Read, Write}, thread, }; -pub fn generic_handle_connection<ReadStream: Read, WriteStream: Write + Send + 'static>( - id: u32, - rstream: ReadStream, - wstream: WriteStream, -) -> io::Result<()> { - let mut reader = BufReader::new(rstream); +pub fn handle_connection<F, T>(handle_client: F, arg: T) +where + F: FnOnce( + u32, + (Sender<ClientboundPacket>, Receiver<ClientboundPacket>), + T, + ) -> anyhow::Result<()>, +{ + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let (responder, responses) = crossbeam_channel::unbounded(); responder .send(ClientboundPacket::Handshake { version: version!(), }) .unwrap(); + info!("client connected: {id}"); + CLIENTS.write().unwrap().insert(id, responder.clone()); + match handle_client(id, (responder, responses), arg) { + Ok(_) => info!("client ({id}) dropped properly"), + Err(e) => error!("client ({id}) dropped bc error: {e}"), + } + CLIENTS.write().unwrap().remove(&id); +} + +pub fn stream<ReadStream: Read, WriteStream: Write + Send + 'static>( + id: u32, + (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>), + (rstream, wstream): (ReadStream, WriteStream), +) -> anyhow::Result<()> { + let mut reader = BufReader::new(rstream); thread::spawn(move || { let mut wstream = wstream; diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs index 3c465d4..3a0c45e 100644 --- a/karld/src/interface/mod.rs +++ b/karld/src/interface/mod.rs @@ -21,5 +21,7 @@ pub fn start() { #[cfg(not(feature = "websocket"))] #[cfg(not(feature = "unix"))] + #[cfg(not(feature = "tcp"))] + #[cfg(not(feature = "stdio"))] log::warn!("no interfaces enabled, daemon will be inaccesssible") } diff --git a/karld/src/interface/stdio.rs b/karld/src/interface/stdio.rs index b10dfc0..f4dcac7 100644 --- a/karld/src/interface/stdio.rs +++ b/karld/src/interface/stdio.rs @@ -1,13 +1,7 @@ -use log::{error, info}; - -use crate::CLIENT_ID_COUNTER; - -use super::generic::generic_handle_connection; +use super::generic; +use log::info; pub fn run() { - let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); info!("reading packets from stdin"); - if let Err(e) = generic_handle_connection(id, std::io::stdin(), std::io::stdout()) { - error!("{e}"); - } + generic::handle_connection(generic::stream, (std::io::stdin(), std::io::stdout())); } diff --git a/karld/src/interface/tcp.rs b/karld/src/interface/tcp.rs index 296701b..94e961f 100644 --- a/karld/src/interface/tcp.rs +++ b/karld/src/interface/tcp.rs @@ -1,7 +1,6 @@ -use crate::interface::generic::generic_handle_connection; -use crate::CLIENT_ID_COUNTER; +use crate::interface::generic; use karlcommon::interfaces::tcp_addr; -use log::{info, warn}; +use log::info; use std::net::TcpListener; use std::thread; @@ -9,17 +8,10 @@ pub fn run() { info!("binding to socket"); let listener = TcpListener::bind(tcp_addr()).unwrap(); info!("listening."); - loop { - let (stream, addr) = listener.accept().unwrap(); - let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let (stream, _addr) = listener.accept().unwrap(); thread::spawn(move || { - info!("client connected: {:?}", addr); - if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) { - warn!("client dropped: {:?} ({})", addr, err); - } else { - info!("client dropped: {:?}", addr); - } + generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream)) }); } } diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs index 620ff64..9989b0f 100644 --- a/karld/src/interface/unix.rs +++ b/karld/src/interface/unix.rs @@ -1,7 +1,6 @@ -use crate::interface::generic::generic_handle_connection; -use crate::CLIENT_ID_COUNTER; +use crate::interface::generic; use karlcommon::interfaces::unix_path; -use log::{info, warn}; +use log::info; use std::os::unix::net::UnixListener; use std::thread; @@ -15,15 +14,9 @@ pub fn run() { info!("listening."); loop { - let (stream, addr) = listener.accept().unwrap(); - let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let (stream, _addr) = listener.accept().unwrap(); thread::spawn(move || { - info!("client connected: {:?}", addr); - if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) { - warn!("client dropped: {:?} ({})", addr, err); - } else { - info!("client dropped: {:?}", addr); - } + generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream)) }); } } diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs index da7d918..624c28c 100644 --- a/karld/src/interface/websocket.rs +++ b/karld/src/interface/websocket.rs @@ -1,37 +1,29 @@ -use karlcommon::{ServerboundPacket, ProtoError, ClientboundPacket, version}; -use log::{debug, error, info}; +use crate::handle_packet; +use crate::interface::generic; +use crossbeam_channel::{Receiver, Sender}; +use karlcommon::{ClientboundPacket, ProtoError, ServerboundPacket}; +use log::{debug, info}; use std::net::{TcpListener, TcpStream}; use std::thread; use tungstenite::{accept, Message}; -use crate::{handle_packet, CLIENT_ID_COUNTER}; pub fn run() { info!("binding websocket server"); let server = TcpListener::bind("127.0.0.1:9001").unwrap(); info!("listening"); - for stream in server.incoming() { - thread::spawn(move || { - if let Err(e) = handle_connection(stream) { - error!("{e}"); - } - }); + thread::spawn(move || generic::handle_connection(handle_connection, stream)); } } -fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Result<()> { - let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - +fn handle_connection( + id: u32, + (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>), + stream: Result<TcpStream, std::io::Error>, +) -> anyhow::Result<()> { let stream = stream?; stream.set_nonblocking(true)?; let mut websocket = accept(stream)?; - - let (responder, responses) = crossbeam_channel::unbounded(); - responder - .send(ClientboundPacket::Handshake { - version: version!(), - }) - .unwrap(); loop { match websocket.read_message() { @@ -50,14 +42,14 @@ fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Resul Err(tungstenite::Error::ConnectionClosed) => { break Ok(()); } - Err(tungstenite::Error::Io(e)) + Err(tungstenite::Error::Io(e)) if let std::io::ErrorKind::WouldBlock = e.kind() => { // its fine } Err(e) => Err(e)?, } for r in responses.try_iter() { - websocket.write_message(Message::Text( serde_json::to_string(&r)?))?; + websocket.write_message(Message::Text(serde_json::to_string(&r)?))?; } thread::sleep(std::time::Duration::from_millis(50)); // how would you do this properly?? } diff --git a/karld/src/main.rs b/karld/src/main.rs index d5dfd5f..6f92fe5 100644 --- a/karld/src/main.rs +++ b/karld/src/main.rs @@ -47,27 +47,39 @@ fn main() { lazy_static::lazy_static! { static ref TASKS: RwLock<HashMap<u64, Task>> = RwLock::new(HashMap::new()); + static ref CLIENTS: RwLock<HashMap<u32, Sender<ClientboundPacket>>> = RwLock::new(HashMap::new()); static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0); } +fn broadcast_invalidation() { + CLIENTS + .write() + .unwrap() + .values() + .for_each(|r| drop(r.send(ClientboundPacket::InvalidateState))); +} + pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<ClientboundPacket>) { // std::thread::sleep(std::time::Duration::from_millis(75)); // for testing clients with latency match packet { ServerboundPacket::Sync => { - let _ = responder.send(ClientboundPacket::Sync); + drop(responder.send(ClientboundPacket::Sync)); } ServerboundPacket::ListTasks => { - let _ = responder.send(ClientboundPacket::TaskList( + drop(responder.send(ClientboundPacket::TaskList( TASKS.read().unwrap().values().map(|e| e.clone()).collect(), - )); + ))); } ServerboundPacket::UpdateTask(t) => { TASKS.write().unwrap().insert(t.id, t); + broadcast_invalidation(); savestate::save(); } ServerboundPacket::RemoveTask(i) => { - if TASKS.write().unwrap().remove(&i).is_none() { - let _ = responder.send(ClientboundPacket::Error(ProtoError::UnknownTask)); + if TASKS.write().unwrap().remove(&i).is_some() { + broadcast_invalidation(); + } else { + drop(responder.send(ClientboundPacket::Error(ProtoError::UnknownTask))); } savestate::save(); } @@ -78,8 +90,7 @@ pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<C let t = match TASKS.read().unwrap().get(&task).cloned() { Some(t) => t, None => { - let _ = responder.send(ClientboundPacket::Error(ProtoError::UnknownTask)); - return; + return drop(responder.send(ClientboundPacket::Error(ProtoError::UnknownTask))) } }; @@ -113,7 +124,7 @@ pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<C } } } - let _ = responder.send(ClientboundPacket::InstanceList(ocs)); + drop(responder.send(ClientboundPacket::InstanceList(ocs))); } } } diff --git a/karlgui/src/globals.rs b/karlgui/src/globals.rs index fca2e86..f757f2e 100644 --- a/karlgui/src/globals.rs +++ b/karlgui/src/globals.rs @@ -26,7 +26,7 @@ impl Globals { } } pub fn update_network(&mut self) { - for p in self.client.receiver.try_iter() { + while let Ok(p) = self.client.receiver.try_recv() { match p { ClientboundPacket::TaskList(t) => { self.tasks = HashMap::from_iter(t.into_iter().map(|e| (e.id, e))); @@ -48,6 +48,11 @@ impl Globals { warn!("got unknown instance list packet") } } + ClientboundPacket::InvalidateState => { + self.instance_cache.clear(); + self.tasks.clear(); + self.client.send_sync(ServerboundPacket::ListTasks); + } _ => {} } } diff --git a/karlgui/src/views/mod.rs b/karlgui/src/views/mod.rs index 9c6ea7c..4ed65a0 100644 --- a/karlgui/src/views/mod.rs +++ b/karlgui/src/views/mod.rs @@ -1,2 +1,2 @@ -pub mod edit; pub mod calendar; +pub mod edit; |