From 0c4cb405f9b166398a2bf7e128c47fa56dfa2d71 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sat, 17 May 2025 18:27:00 +0200 Subject: enqueue works --- src/main.rs | 5 +++-- src/ui.rs | 0 src/webui.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/worker_ws.rs | 15 ++++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) delete mode 100644 src/ui.rs create mode 100644 src/webui.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index a08ae6b..5096ae8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -pub mod ui; +pub mod webui; pub mod worker_ws; use anyhow::Result; @@ -14,6 +14,7 @@ use tokio::{ net::TcpListener, sync::{RwLock, mpsc::Sender}, }; +use webui::webui; use worker_ws::{WorkerID, WorkerResponse, worker_websocket}; struct Worker { @@ -40,7 +41,7 @@ async fn main() -> Result<()> { let mut state = State::default(); state.load().await?; let router = Router::new() - .route("/", get(async || "Hello world!")) + .route("/", get(webui)) .route("/worker_ws", get(worker_websocket)) .with_state(Arc::new(RwLock::new(state))); let listener = TcpListener::bind("127.0.0.1:8080").await?; diff --git a/src/ui.rs b/src/ui.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/webui.rs b/src/webui.rs new file mode 100644 index 0000000..9e8b9c3 --- /dev/null +++ b/src/webui.rs @@ -0,0 +1,63 @@ +use crate::State; +use axum::extract::State as S; +use axum::response::Html; +use serde_json::{Map, Value}; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub(crate) async fn webui(S(state): S>>) -> Html { + let g = state.read().await; + + let doc = markup::new! { + html { + head { + meta[charset="UTF-8"]; + title { "" } + } + body { + section { + h2 { "Workers"} + ul { @for (id, w) in &g.workers { + li { div { + h3 { @w.name } + span { "ID: " @id } " " + @if w.accept > 0 { + span { "Accepting Jobs (" @w.accept ")" } + } else { + span { "Idle" } + } + }} + }} + } + section { + h2 { "Queued" } + ul { @for key in &g.queue { + li { @key } + }} + } + section { + h2 { "Loading" } + ul { @for key in &g.loading { + li { @key } + }} + } + section { + h2 { "Completed" } + ul { @for key in &g.complete { + li { @key } + }} + } + } + } + }; + Html(doc.to_string()) +} + +markup::define!( + Task<'a>(key: &'a str, data: &'a Map) { + div.task { + h3 { @data["title"].as_str().unwrap_or(key) } + spawn.key { @key } + } + } +); diff --git a/src/worker_ws.rs b/src/worker_ws.rs index f645c11..2d82e4d 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -37,6 +37,7 @@ pub enum WorkerRequest { key: String, }, Accept, + Save, _EchoError { message: String, @@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc>) { } } }); + let state2 = state.clone(); let mut recv_task = spawn(async move { while let Some(message) = recv.next().await { if let Ok(m) = message { @@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc>) { } } }; - if let Err(e) = state.write().await.handle_worker_message(worker, req) { + if let Err(e) = state.write().await.handle_worker_message(worker, req).await + { warn!("error processing request: {e:?}"); state.write().await.send_to_worker( worker, @@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc>) { _ = &mut send_task => recv_task.abort(), _ = &mut recv_task => send_task.abort(), }; + + { + let mut g = state2.write().await; + g.workers.remove(&worker).unwrap(); + } } impl State { @@ -135,7 +143,7 @@ impl State { warn!("worker ws response overflow"); } } - pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { + pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { let worker = self.workers.get_mut(&w).unwrap(); match req { WorkerRequest::Register { name, sources } => { @@ -159,6 +167,9 @@ impl State { WorkerRequest::Accept => { worker.accept += 1; } + WorkerRequest::Save => { + self.save().await?; + } WorkerRequest::_EchoError { message } => { self.send_to_worker(w, WorkerResponse::Error { message }); } -- cgit v1.2.3-70-g09d2