aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--karld/Cargo.toml2
-rw-r--r--karld/src/demo.rs2
-rw-r--r--karld/src/interface/generic.rs35
-rw-r--r--karld/src/interface/mod.rs2
-rw-r--r--karld/src/interface/stdio.rs12
-rw-r--r--karld/src/interface/tcp.rs16
-rw-r--r--karld/src/interface/unix.rs15
-rw-r--r--karld/src/interface/websocket.rs34
-rw-r--r--karld/src/main.rs27
-rw-r--r--karlgui/src/globals.rs7
-rw-r--r--karlgui/src/views/mod.rs2
11 files changed, 81 insertions, 73 deletions
diff --git a/karld/Cargo.toml b/karld/Cargo.toml
index e5f79bd..e4accc0 100644
--- a/karld/Cargo.toml
+++ b/karld/Cargo.toml
@@ -18,7 +18,7 @@ lazy_static = "1.4.0"
tungstenite = { version = "0.17.3", optional = true }
[features]
-default = ["unix"]
+default = ["full"]
full = ["unix", "stdio", "tcp", "websocket"]
websocket = ["dep:tungstenite"]
unix = []
diff --git a/karld/src/demo.rs b/karld/src/demo.rs
index fb5b43a..80eff7d 100644
--- a/karld/src/demo.rs
+++ b/karld/src/demo.rs
@@ -1,4 +1,4 @@
-use karlcommon::{Task, Schedule, Condition, Property};
+use karlcommon::{Condition, Property, Schedule, Task};
use crate::TASKS;
diff --git a/karld/src/interface/generic.rs b/karld/src/interface/generic.rs
index 4c35fdf..e9d57b5 100644
--- a/karld/src/interface/generic.rs
+++ b/karld/src/interface/generic.rs
@@ -1,23 +1,42 @@
-use crate::handle_packet;
+use crate::{handle_packet, CLIENTS, CLIENT_ID_COUNTER};
+use crossbeam_channel::{Receiver, Sender};
use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket};
-use log::{debug, error, warn};
+use log::{debug, error, info, warn};
use std::{
io::{self, BufRead, BufReader, ErrorKind, Read, Write},
thread,
};
-pub fn generic_handle_connection<ReadStream: Read, WriteStream: Write + Send + 'static>(
- id: u32,
- rstream: ReadStream,
- wstream: WriteStream,
-) -> io::Result<()> {
- let mut reader = BufReader::new(rstream);
+pub fn handle_connection<F, T>(handle_client: F, arg: T)
+where
+ F: FnOnce(
+ u32,
+ (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ T,
+ ) -> anyhow::Result<()>,
+{
+ let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (responder, responses) = crossbeam_channel::unbounded();
responder
.send(ClientboundPacket::Handshake {
version: version!(),
})
.unwrap();
+ info!("client connected: {id}");
+ CLIENTS.write().unwrap().insert(id, responder.clone());
+ match handle_client(id, (responder, responses), arg) {
+ Ok(_) => info!("client ({id}) dropped properly"),
+ Err(e) => error!("client ({id}) dropped bc error: {e}"),
+ }
+ CLIENTS.write().unwrap().remove(&id);
+}
+
+pub fn stream<ReadStream: Read, WriteStream: Write + Send + 'static>(
+ id: u32,
+ (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ (rstream, wstream): (ReadStream, WriteStream),
+) -> anyhow::Result<()> {
+ let mut reader = BufReader::new(rstream);
thread::spawn(move || {
let mut wstream = wstream;
diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs
index 3c465d4..3a0c45e 100644
--- a/karld/src/interface/mod.rs
+++ b/karld/src/interface/mod.rs
@@ -21,5 +21,7 @@ pub fn start() {
#[cfg(not(feature = "websocket"))]
#[cfg(not(feature = "unix"))]
+ #[cfg(not(feature = "tcp"))]
+ #[cfg(not(feature = "stdio"))]
log::warn!("no interfaces enabled, daemon will be inaccesssible")
}
diff --git a/karld/src/interface/stdio.rs b/karld/src/interface/stdio.rs
index b10dfc0..f4dcac7 100644
--- a/karld/src/interface/stdio.rs
+++ b/karld/src/interface/stdio.rs
@@ -1,13 +1,7 @@
-use log::{error, info};
-
-use crate::CLIENT_ID_COUNTER;
-
-use super::generic::generic_handle_connection;
+use super::generic;
+use log::info;
pub fn run() {
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
info!("reading packets from stdin");
- if let Err(e) = generic_handle_connection(id, std::io::stdin(), std::io::stdout()) {
- error!("{e}");
- }
+ generic::handle_connection(generic::stream, (std::io::stdin(), std::io::stdout()));
}
diff --git a/karld/src/interface/tcp.rs b/karld/src/interface/tcp.rs
index 296701b..94e961f 100644
--- a/karld/src/interface/tcp.rs
+++ b/karld/src/interface/tcp.rs
@@ -1,7 +1,6 @@
-use crate::interface::generic::generic_handle_connection;
-use crate::CLIENT_ID_COUNTER;
+use crate::interface::generic;
use karlcommon::interfaces::tcp_addr;
-use log::{info, warn};
+use log::info;
use std::net::TcpListener;
use std::thread;
@@ -9,17 +8,10 @@ pub fn run() {
info!("binding to socket");
let listener = TcpListener::bind(tcp_addr()).unwrap();
info!("listening.");
-
loop {
- let (stream, addr) = listener.accept().unwrap();
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let (stream, _addr) = listener.accept().unwrap();
thread::spawn(move || {
- info!("client connected: {:?}", addr);
- if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) {
- warn!("client dropped: {:?} ({})", addr, err);
- } else {
- info!("client dropped: {:?}", addr);
- }
+ generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream))
});
}
}
diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs
index 620ff64..9989b0f 100644
--- a/karld/src/interface/unix.rs
+++ b/karld/src/interface/unix.rs
@@ -1,7 +1,6 @@
-use crate::interface::generic::generic_handle_connection;
-use crate::CLIENT_ID_COUNTER;
+use crate::interface::generic;
use karlcommon::interfaces::unix_path;
-use log::{info, warn};
+use log::info;
use std::os::unix::net::UnixListener;
use std::thread;
@@ -15,15 +14,9 @@ pub fn run() {
info!("listening.");
loop {
- let (stream, addr) = listener.accept().unwrap();
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ let (stream, _addr) = listener.accept().unwrap();
thread::spawn(move || {
- info!("client connected: {:?}", addr);
- if let Err(err) = generic_handle_connection(id, stream.try_clone().unwrap(), stream) {
- warn!("client dropped: {:?} ({})", addr, err);
- } else {
- info!("client dropped: {:?}", addr);
- }
+ generic::handle_connection(generic::stream, (stream.try_clone().unwrap(), stream))
});
}
}
diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs
index da7d918..624c28c 100644
--- a/karld/src/interface/websocket.rs
+++ b/karld/src/interface/websocket.rs
@@ -1,37 +1,29 @@
-use karlcommon::{ServerboundPacket, ProtoError, ClientboundPacket, version};
-use log::{debug, error, info};
+use crate::handle_packet;
+use crate::interface::generic;
+use crossbeam_channel::{Receiver, Sender};
+use karlcommon::{ClientboundPacket, ProtoError, ServerboundPacket};
+use log::{debug, info};
use std::net::{TcpListener, TcpStream};
use std::thread;
use tungstenite::{accept, Message};
-use crate::{handle_packet, CLIENT_ID_COUNTER};
pub fn run() {
info!("binding websocket server");
let server = TcpListener::bind("127.0.0.1:9001").unwrap();
info!("listening");
-
for stream in server.incoming() {
- thread::spawn(move || {
- if let Err(e) = handle_connection(stream) {
- error!("{e}");
- }
- });
+ thread::spawn(move || generic::handle_connection(handle_connection, stream));
}
}
-fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Result<()> {
- let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
+fn handle_connection(
+ id: u32,
+ (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
+ stream: Result<TcpStream, std::io::Error>,
+) -> anyhow::Result<()> {
let stream = stream?;
stream.set_nonblocking(true)?;
let mut websocket = accept(stream)?;
-
- let (responder, responses) = crossbeam_channel::unbounded();
- responder
- .send(ClientboundPacket::Handshake {
- version: version!(),
- })
- .unwrap();
loop {
match websocket.read_message() {
@@ -50,14 +42,14 @@ fn handle_connection(stream: Result<TcpStream, std::io::Error>) -> anyhow::Resul
Err(tungstenite::Error::ConnectionClosed) => {
break Ok(());
}
- Err(tungstenite::Error::Io(e))
+ Err(tungstenite::Error::Io(e))
if let std::io::ErrorKind::WouldBlock = e.kind() => {
// its fine
}
Err(e) => Err(e)?,
}
for r in responses.try_iter() {
- websocket.write_message(Message::Text( serde_json::to_string(&r)?))?;
+ websocket.write_message(Message::Text(serde_json::to_string(&r)?))?;
}
thread::sleep(std::time::Duration::from_millis(50)); // how would you do this properly??
}
diff --git a/karld/src/main.rs b/karld/src/main.rs
index d5dfd5f..6f92fe5 100644
--- a/karld/src/main.rs
+++ b/karld/src/main.rs
@@ -47,27 +47,39 @@ fn main() {
lazy_static::lazy_static! {
static ref TASKS: RwLock<HashMap<u64, Task>> = RwLock::new(HashMap::new());
+ static ref CLIENTS: RwLock<HashMap<u32, Sender<ClientboundPacket>>> = RwLock::new(HashMap::new());
static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0);
}
+fn broadcast_invalidation() {
+ CLIENTS
+ .write()
+ .unwrap()
+ .values()
+ .for_each(|r| drop(r.send(ClientboundPacket::InvalidateState)));
+}
+
pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<ClientboundPacket>) {
// std::thread::sleep(std::time::Duration::from_millis(75)); // for testing clients with latency
match packet {
ServerboundPacket::Sync => {
- let _ = responder.send(ClientboundPacket::Sync);
+ drop(responder.send(ClientboundPacket::Sync));
}
ServerboundPacket::ListTasks => {
- let _ = responder.send(ClientboundPacket::TaskList(
+ drop(responder.send(ClientboundPacket::TaskList(
TASKS.read().unwrap().values().map(|e| e.clone()).collect(),
- ));
+ )));
}
ServerboundPacket::UpdateTask(t) => {
TASKS.write().unwrap().insert(t.id, t);
+ broadcast_invalidation();
savestate::save();
}
ServerboundPacket::RemoveTask(i) => {
- if TASKS.write().unwrap().remove(&i).is_none() {
- let _ = responder.send(ClientboundPacket::Error(ProtoError::UnknownTask));
+ if TASKS.write().unwrap().remove(&i).is_some() {
+ broadcast_invalidation();
+ } else {
+ drop(responder.send(ClientboundPacket::Error(ProtoError::UnknownTask)));
}
savestate::save();
}
@@ -78,8 +90,7 @@ pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<C
let t = match TASKS.read().unwrap().get(&task).cloned() {
Some(t) => t,
None => {
- let _ = responder.send(ClientboundPacket::Error(ProtoError::UnknownTask));
- return;
+ return drop(responder.send(ClientboundPacket::Error(ProtoError::UnknownTask)))
}
};
@@ -113,7 +124,7 @@ pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender<C
}
}
}
- let _ = responder.send(ClientboundPacket::InstanceList(ocs));
+ drop(responder.send(ClientboundPacket::InstanceList(ocs)));
}
}
}
diff --git a/karlgui/src/globals.rs b/karlgui/src/globals.rs
index fca2e86..f757f2e 100644
--- a/karlgui/src/globals.rs
+++ b/karlgui/src/globals.rs
@@ -26,7 +26,7 @@ impl Globals {
}
}
pub fn update_network(&mut self) {
- for p in self.client.receiver.try_iter() {
+ while let Ok(p) = self.client.receiver.try_recv() {
match p {
ClientboundPacket::TaskList(t) => {
self.tasks = HashMap::from_iter(t.into_iter().map(|e| (e.id, e)));
@@ -48,6 +48,11 @@ impl Globals {
warn!("got unknown instance list packet")
}
}
+ ClientboundPacket::InvalidateState => {
+ self.instance_cache.clear();
+ self.tasks.clear();
+ self.client.send_sync(ServerboundPacket::ListTasks);
+ }
_ => {}
}
}
diff --git a/karlgui/src/views/mod.rs b/karlgui/src/views/mod.rs
index 9c6ea7c..4ed65a0 100644
--- a/karlgui/src/views/mod.rs
+++ b/karlgui/src/views/mod.rs
@@ -1,2 +1,2 @@
-pub mod edit;
pub mod calendar;
+pub mod edit;