aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/complete_from_files.ts1
-rw-r--r--scripts/dummy_worker.ts30
-rw-r--r--src/helper.rs20
-rw-r--r--src/main.rs3
-rw-r--r--src/style.css3
-rw-r--r--src/webui.rs32
-rw-r--r--src/webui_live.js31
-rw-r--r--src/webui_ws.rs10
-rw-r--r--src/worker_ws.rs2
9 files changed, 121 insertions, 11 deletions
diff --git a/scripts/complete_from_files.ts b/scripts/complete_from_files.ts
index 6aec7a2..f5770a2 100644
--- a/scripts/complete_from_files.ts
+++ b/scripts/complete_from_files.ts
@@ -26,6 +26,7 @@ ws.onopen = async () => {
console.log("ws open");
ws.send(JSON.stringify({ t: "register", name: "complete from files", task_kinds: [] }))
await traverse(root)
+ ws.send(JSON.stringify({ t: "save" }))
console.log(`done, ${counter} tasks marked as complete`);
setTimeout(() => Deno.exit(0), 200) // not sure if websockets are flushed since they're instant
}
diff --git a/scripts/dummy_worker.ts b/scripts/dummy_worker.ts
new file mode 100644
index 0000000..00e6ad2
--- /dev/null
+++ b/scripts/dummy_worker.ts
@@ -0,0 +1,30 @@
+
+const ws = new WebSocket(Deno.args[0])
+
+async function do_work(key: string) {
+ let progress = 0
+ while (progress < 1) {
+ await new Promise(r => setTimeout(r, 200))
+ progress += 0.1
+ ws.send(JSON.stringify({ t: "metadata", key, data: { progress } }))
+ }
+ ws.send(JSON.stringify({ t: "complete", key }))
+}
+
+ws.onerror = () => console.error("ws error")
+ws.onclose = () => console.error("ws closed")
+ws.onopen = () => {
+ console.log("ws open");
+ ws.send(JSON.stringify({ t: "register", name: "dummy worker", task_kinds: ["youtube"] }))
+ ws.send(JSON.stringify({ t: "accept" }))
+}
+ws.onmessage = async ev => {
+ if (typeof ev.data != "string") return
+ const p = JSON.parse(ev.data)
+ if (p.t == "error") console.error(`error: ${p.message}`);
+ if (p.t == "work") {
+ if (!p.data.output) throw new Error("no output");
+ await do_work(p.key)
+ ws.send(JSON.stringify({ t: "accept" }))
+ }
+}
diff --git a/src/helper.rs b/src/helper.rs
index 22ccd0d..a9f7cb4 100644
--- a/src/helper.rs
+++ b/src/helper.rs
@@ -19,3 +19,23 @@ where
.into_response()
}
}
+
+#[derive(Clone, Copy, Debug)]
+#[must_use]
+pub struct Javascript<T>(pub T);
+
+impl<T> IntoResponse for Javascript<T>
+where
+ T: IntoResponse,
+{
+ fn into_response(self) -> Response {
+ (
+ [(
+ header::CONTENT_TYPE,
+ HeaderValue::from_static("text/javascript"),
+ )],
+ self.0,
+ )
+ .into_response()
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index f8f4fba..e4a17e8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,7 +23,7 @@ use tokio::{
mpsc::{self},
},
};
-use webui::{webui, webui_style};
+use webui::{webui, webui_script, webui_style};
use webui_ws::{WebuiEvent, webui_websocket};
use worker_ws::{WorkerID, WorkerResponse, worker_websocket};
@@ -54,6 +54,7 @@ async fn main() -> Result<()> {
let router = Router::new()
.route("/", get(webui))
.route("/style.css", get(webui_style))
+ .route("/webui_live.js", get(webui_script))
.route("/webui_ws", get(webui_websocket))
.route("/worker_ws", get(worker_websocket))
.route("/api/queue.json", get(api_queue_json))
diff --git a/src/style.css b/src/style.css
index a309979..5e1ed0b 100644
--- a/src/style.css
+++ b/src/style.css
@@ -26,6 +26,9 @@
}
.task.loading {
border-color: #ffc36e;
+ background-image: linear-gradient(#fff3);
+ background-repeat: no-repeat;
+ background-size: 0%;
}
.task.complete {
border-color: #6eff70;
diff --git a/src/webui.rs b/src/webui.rs
index 5a66ffc..cd7c1de 100644
--- a/src/webui.rs
+++ b/src/webui.rs
@@ -1,4 +1,7 @@
-use crate::{State, helper::Css};
+use crate::{
+ State,
+ helper::{Css, Javascript},
+};
use axum::extract::State as S;
use axum::response::Html;
use markup::doctype;
@@ -13,6 +16,13 @@ pub(crate) async fn webui_style() -> Css<String> {
include_str!("style.css").to_string()
})
}
+pub(crate) async fn webui_script() -> Javascript<String> {
+ Javascript(if cfg!(debug_assertions) {
+ read_to_string("src/webui_live.js").await.unwrap()
+ } else {
+ include_str!("webui_live.js").to_string()
+ })
+}
pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
let g = state.read().await;
@@ -25,19 +35,20 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
head {
meta[charset="UTF-8"];
link[rel="stylesheet", href="/style.css"];
+ script[src="/webui_live.js", defer] {}
title { "Queue-Server" }
}
body {
- section {
+ section.workers {
h2 { "Workers"}
ul { @for (id, w) in &g.workers {
li { @Worker { id: *id, w } }
}}
}
section.tasks {
- @Taskbin {title: "Queued", class: "task queue", set: &g.queue, default, g }
- @Taskbin {title: "Loading", class: "task loading", set: &g.loading, default, g }
- @Taskbin {title: "Completed", class: "task complete", set: &g.complete, default, g }
+ @Taskbin {title: "Queued", state: "queue", set: &g.queue, default, g }
+ @Taskbin {title: "Loading", state: "loading", set: &g.loading, default, g }
+ @Taskbin {title: "Completed", state: "complete", set: &g.complete, default, g }
}
}
}
@@ -46,17 +57,18 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
}
markup::define!(
- Taskbin<'a>(title: &'a str, class: &'a str, set: &'a HashSet<String>, g: &'a State, default: &'a Map<String, Value>) {
- div {
+ Taskbin<'a>(title: &'a str, state: &'a str, set: &'a HashSet<String>, g: &'a State, default: &'a Map<String, Value>) {
+ div[id=format!("bin-{state}")] {
h2 { @title }
p.count { @set.len() " tasks" }
+ @let class = format!("task {state}");
ul { @for key in set.iter().take(128) {
- li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class } }
+ li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class: &class } }
}}
}
}
Task<'a>(key: &'a str, data: &'a Map<String, Value>, class: &'a str) {
- div[class=class] {
+ div[class=class, id=key] {
// @if let Some(url) = data.get("thumbnail").and_then(Value::as_str) {
// img[src=url, loading="lazy"];
// }
@@ -68,7 +80,7 @@ markup::define!(
}
}
Worker<'a>(id: u64, w: &'a crate::Worker) {
- div[class=worker_class(w)] {
+ div[class=worker_class(w), id=format!("worker-{id}")] {
h3 { @w.name }
span { "ID: " @id } ", "
@if !w.assigned_tasks.is_empty() {
diff --git a/src/webui_live.js b/src/webui_live.js
new file mode 100644
index 0000000..12b5fef
--- /dev/null
+++ b/src/webui_live.js
@@ -0,0 +1,31 @@
+/// <reference lib="dom" />
+
+const ws = new WebSocket("/webui_ws")
+ws.onopen = () => console.log("ws open");
+ws.onclose = () => console.warn("ws close");
+ws.onerror = () => console.warn("ws error");
+ws.onmessage = ev => {
+ const u = JSON.parse(ev.data)
+ if (u.t == "update_worker") {
+ const e = document.getElementById(`worker-${u.id}`)
+ if (e) e.outerHTML = u.html
+ else document.getElementById("workers").innerHTML += u.html
+ } else if (u.t == "remove_worker") {
+ document.getElementById(`worker-${u.id}`)?.remove()
+ } else if (u.t == "update_task") {
+ const e = document.getElementById(u.key)
+ const parent_id = u.bin == "queue" ? "bin-queue" : u.bin == "loading" ? "bin-loading" : "bin-complete"
+ const parent = document.querySelector(`#${parent_id} > ul`)
+ if (e && e.parentElement == parent) e.outerHTML = u.html
+ else {
+ e?.remove()
+ parent.innerHTML += u.html
+ }
+ } else if (u.t == "remove_task") {
+ document.getElementById(u.key)?.remove()
+ } else if (u.t == "counters") {
+ document.querySelector("#bin-queue .count").textContent = `${u.queue} tasks`
+ document.querySelector("#bin-loading .count").textContent = `${u.loading} tasks`
+ document.querySelector("#bin-complete .count").textContent = `${u.complete} tasks`
+ }
+}
diff --git a/src/webui_ws.rs b/src/webui_ws.rs
index a1fd348..bfa18b0 100644
--- a/src/webui_ws.rs
+++ b/src/webui_ws.rs
@@ -15,6 +15,11 @@ use tokio::sync::RwLock;
#[derive(Debug, Serialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum WebuiEvent {
+ Counters {
+ queue: usize,
+ loading: usize,
+ complete: usize,
+ },
UpdateWorker {
id: WorkerID,
html: String,
@@ -88,6 +93,11 @@ impl State {
key: key.to_owned(),
html: webui::Task { class, data, key }.to_string(),
}));
+ let _ = self.webui_broadcast.send(Arc::new(WebuiEvent::Counters {
+ queue: self.queue.len(),
+ loading: self.loading.len(),
+ complete: self.complete.len(),
+ }));
}
}
pub fn send_webui_worker_removal(&self, id: WorkerID) {
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index 526de82..c98e08d 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -85,6 +85,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
assigned_tasks: HashSet::new(),
},
);
+ g.send_webui_worker_update(id);
id
};
@@ -144,6 +145,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
g.loading.remove(&key);
g.queue.insert(key);
}
+ g.send_webui_worker_removal(worker);
}
}