aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
committermetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
commit0c4cb405f9b166398a2bf7e128c47fa56dfa2d71 (patch)
tree06db763a47dcfa9c975f3d797d8ee2534d37b1be /src
parent1c27a83409a7f51c5d07098cb6ca65bcee870d9c (diff)
downloadisda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.bz2
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.zst
enqueue works
Diffstat (limited to 'src')
-rw-r--r--src/main.rs5
-rw-r--r--src/ui.rs0
-rw-r--r--src/webui.rs63
-rw-r--r--src/worker_ws.rs15
4 files changed, 79 insertions, 4 deletions
diff --git a/src/main.rs b/src/main.rs
index a08ae6b..5096ae8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-pub mod ui;
+pub mod webui;
pub mod worker_ws;
use anyhow::Result;
@@ -14,6 +14,7 @@ use tokio::{
net::TcpListener,
sync::{RwLock, mpsc::Sender},
};
+use webui::webui;
use worker_ws::{WorkerID, WorkerResponse, worker_websocket};
struct Worker {
@@ -40,7 +41,7 @@ async fn main() -> Result<()> {
let mut state = State::default();
state.load().await?;
let router = Router::new()
- .route("/", get(async || "Hello world!"))
+ .route("/", get(webui))
.route("/worker_ws", get(worker_websocket))
.with_state(Arc::new(RwLock::new(state)));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
diff --git a/src/ui.rs b/src/ui.rs
deleted file mode 100644
index e69de29..0000000
--- a/src/ui.rs
+++ /dev/null
diff --git a/src/webui.rs b/src/webui.rs
new file mode 100644
index 0000000..9e8b9c3
--- /dev/null
+++ b/src/webui.rs
@@ -0,0 +1,63 @@
+use crate::State;
+use axum::extract::State as S;
+use axum::response::Html;
+use serde_json::{Map, Value};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
+ let g = state.read().await;
+
+ let doc = markup::new! {
+ html {
+ head {
+ meta[charset="UTF-8"];
+ title { "" }
+ }
+ body {
+ section {
+ h2 { "Workers"}
+ ul { @for (id, w) in &g.workers {
+ li { div {
+ h3 { @w.name }
+ span { "ID: " @id } " "
+ @if w.accept > 0 {
+ span { "Accepting Jobs (" @w.accept ")" }
+ } else {
+ span { "Idle" }
+ }
+ }}
+ }}
+ }
+ section {
+ h2 { "Queued" }
+ ul { @for key in &g.queue {
+ li { @key }
+ }}
+ }
+ section {
+ h2 { "Loading" }
+ ul { @for key in &g.loading {
+ li { @key }
+ }}
+ }
+ section {
+ h2 { "Completed" }
+ ul { @for key in &g.complete {
+ li { @key }
+ }}
+ }
+ }
+ }
+ };
+ Html(doc.to_string())
+}
+
+markup::define!(
+ Task<'a>(key: &'a str, data: &'a Map<String, Value>) {
+ div.task {
+ h3 { @data["title"].as_str().unwrap_or(key) }
+ spawn.key { @key }
+ }
+ }
+);
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index f645c11..2d82e4d 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -37,6 +37,7 @@ pub enum WorkerRequest {
key: String,
},
Accept,
+ Save,
_EchoError {
message: String,
@@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
});
+ let state2 = state.clone();
let mut recv_task = spawn(async move {
while let Some(message) = recv.next().await {
if let Ok(m) = message {
@@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
};
- if let Err(e) = state.write().await.handle_worker_message(worker, req) {
+ if let Err(e) = state.write().await.handle_worker_message(worker, req).await
+ {
warn!("error processing request: {e:?}");
state.write().await.send_to_worker(
worker,
@@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
_ = &mut send_task => recv_task.abort(),
_ = &mut recv_task => send_task.abort(),
};
+
+ {
+ let mut g = state2.write().await;
+ g.workers.remove(&worker).unwrap();
+ }
}
impl State {
@@ -135,7 +143,7 @@ impl State {
warn!("worker ws response overflow");
}
}
- pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
+ pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
let worker = self.workers.get_mut(&w).unwrap();
match req {
WorkerRequest::Register { name, sources } => {
@@ -159,6 +167,9 @@ impl State {
WorkerRequest::Accept => {
worker.accept += 1;
}
+ WorkerRequest::Save => {
+ self.save().await?;
+ }
WorkerRequest::_EchoError { message } => {
self.send_to_worker(w, WorkerResponse::Error { message });
}