diff options
-rw-r--r-- | scripts/complete_from_files.ts | 1 | ||||
-rw-r--r-- | scripts/dummy_worker.ts | 30 | ||||
-rw-r--r-- | src/helper.rs | 20 | ||||
-rw-r--r-- | src/main.rs | 3 | ||||
-rw-r--r-- | src/style.css | 3 | ||||
-rw-r--r-- | src/webui.rs | 32 | ||||
-rw-r--r-- | src/webui_live.js | 31 | ||||
-rw-r--r-- | src/webui_ws.rs | 10 | ||||
-rw-r--r-- | src/worker_ws.rs | 2 |
9 files changed, 121 insertions, 11 deletions
diff --git a/scripts/complete_from_files.ts b/scripts/complete_from_files.ts index 6aec7a2..f5770a2 100644 --- a/scripts/complete_from_files.ts +++ b/scripts/complete_from_files.ts @@ -26,6 +26,7 @@ ws.onopen = async () => { console.log("ws open"); ws.send(JSON.stringify({ t: "register", name: "complete from files", task_kinds: [] })) await traverse(root) + ws.send(JSON.stringify({ t: "save" })) console.log(`done, ${counter} tasks marked as complete`); setTimeout(() => Deno.exit(0), 200) // not sure if websockets are flushed since they're instant } diff --git a/scripts/dummy_worker.ts b/scripts/dummy_worker.ts new file mode 100644 index 0000000..00e6ad2 --- /dev/null +++ b/scripts/dummy_worker.ts @@ -0,0 +1,30 @@ + +const ws = new WebSocket(Deno.args[0]) + +async function do_work(key: string) { + let progress = 0 + while (progress < 1) { + await new Promise(r => setTimeout(r, 200)) + progress += 0.1 + ws.send(JSON.stringify({ t: "metadata", key, data: { progress } })) + } + ws.send(JSON.stringify({ t: "complete", key })) +} + +ws.onerror = () => console.error("ws error") +ws.onclose = () => console.error("ws closed") +ws.onopen = () => { + console.log("ws open"); + ws.send(JSON.stringify({ t: "register", name: "dummy worker", task_kinds: ["youtube"] })) + ws.send(JSON.stringify({ t: "accept" })) +} +ws.onmessage = async ev => { + if (typeof ev.data != "string") return + const p = JSON.parse(ev.data) + if (p.t == "error") console.error(`error: ${p.message}`); + if (p.t == "work") { + if (!p.data.output) throw new Error("no output"); + await do_work(p.key) + ws.send(JSON.stringify({ t: "accept" })) + } +} diff --git a/src/helper.rs b/src/helper.rs index 22ccd0d..a9f7cb4 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -19,3 +19,23 @@ where .into_response() } } + +#[derive(Clone, Copy, Debug)] +#[must_use] +pub struct Javascript<T>(pub T); + +impl<T> IntoResponse for Javascript<T> +where + T: IntoResponse, +{ + fn into_response(self) -> Response { + ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static("text/javascript"), + )], + self.0, + ) + .into_response() + } +} diff --git a/src/main.rs b/src/main.rs index f8f4fba..e4a17e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use tokio::{ mpsc::{self}, }, }; -use webui::{webui, webui_style}; +use webui::{webui, webui_script, webui_style}; use webui_ws::{WebuiEvent, webui_websocket}; use worker_ws::{WorkerID, WorkerResponse, worker_websocket}; @@ -54,6 +54,7 @@ async fn main() -> Result<()> { let router = Router::new() .route("/", get(webui)) .route("/style.css", get(webui_style)) + .route("/webui_live.js", get(webui_script)) .route("/webui_ws", get(webui_websocket)) .route("/worker_ws", get(worker_websocket)) .route("/api/queue.json", get(api_queue_json)) diff --git a/src/style.css b/src/style.css index a309979..5e1ed0b 100644 --- a/src/style.css +++ b/src/style.css @@ -26,6 +26,9 @@ } .task.loading { border-color: #ffc36e; + background-image: linear-gradient(#fff3); + background-repeat: no-repeat; + background-size: 0%; } .task.complete { border-color: #6eff70; diff --git a/src/webui.rs b/src/webui.rs index 5a66ffc..cd7c1de 100644 --- a/src/webui.rs +++ b/src/webui.rs @@ -1,4 +1,7 @@ -use crate::{State, helper::Css}; +use crate::{ + State, + helper::{Css, Javascript}, +}; use axum::extract::State as S; use axum::response::Html; use markup::doctype; @@ -13,6 +16,13 @@ pub(crate) async fn webui_style() -> Css<String> { include_str!("style.css").to_string() }) } +pub(crate) async fn webui_script() -> Javascript<String> { + Javascript(if cfg!(debug_assertions) { + read_to_string("src/webui_live.js").await.unwrap() + } else { + include_str!("webui_live.js").to_string() + }) +} pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> { let g = state.read().await; @@ -25,19 +35,20 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> { head { meta[charset="UTF-8"]; link[rel="stylesheet", href="/style.css"]; + script[src="/webui_live.js", defer] {} title { "Queue-Server" } } body { - section { + section.workers { h2 { "Workers"} ul { @for (id, w) in &g.workers { li { @Worker { id: *id, w } } }} } section.tasks { - @Taskbin {title: "Queued", class: "task queue", set: &g.queue, default, g } - @Taskbin {title: "Loading", class: "task loading", set: &g.loading, default, g } - @Taskbin {title: "Completed", class: "task complete", set: &g.complete, default, g } + @Taskbin {title: "Queued", state: "queue", set: &g.queue, default, g } + @Taskbin {title: "Loading", state: "loading", set: &g.loading, default, g } + @Taskbin {title: "Completed", state: "complete", set: &g.complete, default, g } } } } @@ -46,17 +57,18 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> { } markup::define!( - Taskbin<'a>(title: &'a str, class: &'a str, set: &'a HashSet<String>, g: &'a State, default: &'a Map<String, Value>) { - div { + Taskbin<'a>(title: &'a str, state: &'a str, set: &'a HashSet<String>, g: &'a State, default: &'a Map<String, Value>) { + div[id=format!("bin-{state}")] { h2 { @title } p.count { @set.len() " tasks" } + @let class = format!("task {state}"); ul { @for key in set.iter().take(128) { - li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class } } + li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class: &class } } }} } } Task<'a>(key: &'a str, data: &'a Map<String, Value>, class: &'a str) { - div[class=class] { + div[class=class, id=key] { // @if let Some(url) = data.get("thumbnail").and_then(Value::as_str) { // img[src=url, loading="lazy"]; // } @@ -68,7 +80,7 @@ markup::define!( } } Worker<'a>(id: u64, w: &'a crate::Worker) { - div[class=worker_class(w)] { + div[class=worker_class(w), id=format!("worker-{id}")] { h3 { @w.name } span { "ID: " @id } ", " @if !w.assigned_tasks.is_empty() { diff --git a/src/webui_live.js b/src/webui_live.js new file mode 100644 index 0000000..12b5fef --- /dev/null +++ b/src/webui_live.js @@ -0,0 +1,31 @@ +/// <reference lib="dom" /> + +const ws = new WebSocket("/webui_ws") +ws.onopen = () => console.log("ws open"); +ws.onclose = () => console.warn("ws close"); +ws.onerror = () => console.warn("ws error"); +ws.onmessage = ev => { + const u = JSON.parse(ev.data) + if (u.t == "update_worker") { + const e = document.getElementById(`worker-${u.id}`) + if (e) e.outerHTML = u.html + else document.getElementById("workers").innerHTML += u.html + } else if (u.t == "remove_worker") { + document.getElementById(`worker-${u.id}`)?.remove() + } else if (u.t == "update_task") { + const e = document.getElementById(u.key) + const parent_id = u.bin == "queue" ? "bin-queue" : u.bin == "loading" ? "bin-loading" : "bin-complete" + const parent = document.querySelector(`#${parent_id} > ul`) + if (e && e.parentElement == parent) e.outerHTML = u.html + else { + e?.remove() + parent.innerHTML += u.html + } + } else if (u.t == "remove_task") { + document.getElementById(u.key)?.remove() + } else if (u.t == "counters") { + document.querySelector("#bin-queue .count").textContent = `${u.queue} tasks` + document.querySelector("#bin-loading .count").textContent = `${u.loading} tasks` + document.querySelector("#bin-complete .count").textContent = `${u.complete} tasks` + } +} diff --git a/src/webui_ws.rs b/src/webui_ws.rs index a1fd348..bfa18b0 100644 --- a/src/webui_ws.rs +++ b/src/webui_ws.rs @@ -15,6 +15,11 @@ use tokio::sync::RwLock; #[derive(Debug, Serialize)] #[serde(tag = "t", rename_all = "snake_case")] pub enum WebuiEvent { + Counters { + queue: usize, + loading: usize, + complete: usize, + }, UpdateWorker { id: WorkerID, html: String, @@ -88,6 +93,11 @@ impl State { key: key.to_owned(), html: webui::Task { class, data, key }.to_string(), })); + let _ = self.webui_broadcast.send(Arc::new(WebuiEvent::Counters { + queue: self.queue.len(), + loading: self.loading.len(), + complete: self.complete.len(), + })); } } pub fn send_webui_worker_removal(&self, id: WorkerID) { diff --git a/src/worker_ws.rs b/src/worker_ws.rs index 526de82..c98e08d 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -85,6 +85,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { assigned_tasks: HashSet::new(), }, ); + g.send_webui_worker_update(id); id }; @@ -144,6 +145,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { g.loading.remove(&key); g.queue.insert(key); } + g.send_webui_worker_removal(worker); } } |