diff options
author | metamuffin <metamuffin@disroot.org> | 2025-05-17 18:27:00 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-05-17 18:27:00 +0200 |
commit | 0c4cb405f9b166398a2bf7e128c47fa56dfa2d71 (patch) | |
tree | 06db763a47dcfa9c975f3d797d8ee2534d37b1be | |
parent | 1c27a83409a7f51c5d07098cb6ca65bcee870d9c (diff) | |
download | isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.bz2 isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.zst |
enqueue works
-rw-r--r-- | scripts/enqueue.ts | 35 | ||||
-rw-r--r-- | scripts/ytdlp_flatten.ts | 2 | ||||
-rw-r--r-- | src/main.rs | 5 | ||||
-rw-r--r-- | src/ui.rs | 0 | ||||
-rw-r--r-- | src/webui.rs | 63 | ||||
-rw-r--r-- | src/worker_ws.rs | 15 |
6 files changed, 116 insertions, 4 deletions
diff --git a/scripts/enqueue.ts b/scripts/enqueue.ts new file mode 100644 index 0000000..2c2871d --- /dev/null +++ b/scripts/enqueue.ts @@ -0,0 +1,35 @@ + +const file = await Deno.readTextFile(Deno.args[1]) +const note_filter = Deno.args.length >= 3 ? Deno.args[2] : "" + +const ws = new WebSocket(Deno.args[0]) + +function run_enqueue() { + let kind = "http" + for (const line of file.split("\n")) { + if (!line.trim().length) continue + else if (line.startsWith("[") && line.endsWith("]")) + kind = line.substring(1, line.length - 1) + else { + const [name, rest] = line.split("=", 2) + const [id, note] = rest.split(";", 2) + if (note_filter.length && note != note_filter) continue + const key = `${kind}:${id}`; + ws.send(JSON.stringify({ t: "metadata", key, data: { output: name, title: name } })) + ws.send(JSON.stringify({ t: "enqueue", key })) + } + } + ws.send(JSON.stringify({ t: "save" })) + console.log("done"); +} + +ws.onerror = () => console.error("ws error") +ws.onclose = () => console.error("ws closed") +ws.onopen = () => { + console.log("ws open"); + ws.send(JSON.stringify({ t: "register", name: "enqueuer", sources: [] })) + run_enqueue() +} +ws.onmessage = ev => { + console.log(ev.data); +} diff --git a/scripts/ytdlp_flatten.ts b/scripts/ytdlp_flatten.ts index f2eaa65..6a3eb5e 100644 --- a/scripts/ytdlp_flatten.ts +++ b/scripts/ytdlp_flatten.ts @@ -5,7 +5,9 @@ const ws = new WebSocket(Deno.args[0]) ws.onerror = () => console.error("ws error") ws.onclose = () => console.error("ws closed") ws.onopen = () => { + console.log("ws open"); ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", sources: ["ytdlp-flatten"] })) + ws.send(JSON.stringify({ t: "accept" })) } ws.onmessage = ev => { console.log(ev.data); diff --git a/src/main.rs b/src/main.rs index a08ae6b..5096ae8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -pub mod ui; +pub mod webui; pub mod worker_ws; use anyhow::Result; @@ -14,6 +14,7 @@ use tokio::{ net::TcpListener, sync::{RwLock, mpsc::Sender}, }; +use webui::webui; use worker_ws::{WorkerID, WorkerResponse, worker_websocket}; struct Worker { @@ -40,7 +41,7 @@ async fn main() -> Result<()> { let mut state = State::default(); state.load().await?; let router = Router::new() - .route("/", get(async || "Hello world!")) + .route("/", get(webui)) .route("/worker_ws", get(worker_websocket)) .with_state(Arc::new(RwLock::new(state))); let listener = TcpListener::bind("127.0.0.1:8080").await?; diff --git a/src/ui.rs b/src/ui.rs deleted file mode 100644 index e69de29..0000000 --- a/src/ui.rs +++ /dev/null diff --git a/src/webui.rs b/src/webui.rs new file mode 100644 index 0000000..9e8b9c3 --- /dev/null +++ b/src/webui.rs @@ -0,0 +1,63 @@ +use crate::State; +use axum::extract::State as S; +use axum::response::Html; +use serde_json::{Map, Value}; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> { + let g = state.read().await; + + let doc = markup::new! { + html { + head { + meta[charset="UTF-8"]; + title { "" } + } + body { + section { + h2 { "Workers"} + ul { @for (id, w) in &g.workers { + li { div { + h3 { @w.name } + span { "ID: " @id } " " + @if w.accept > 0 { + span { "Accepting Jobs (" @w.accept ")" } + } else { + span { "Idle" } + } + }} + }} + } + section { + h2 { "Queued" } + ul { @for key in &g.queue { + li { @key } + }} + } + section { + h2 { "Loading" } + ul { @for key in &g.loading { + li { @key } + }} + } + section { + h2 { "Completed" } + ul { @for key in &g.complete { + li { @key } + }} + } + } + } + }; + Html(doc.to_string()) +} + +markup::define!( + Task<'a>(key: &'a str, data: &'a Map<String, Value>) { + div.task { + h3 { @data["title"].as_str().unwrap_or(key) } + spawn.key { @key } + } + } +); diff --git a/src/worker_ws.rs b/src/worker_ws.rs index f645c11..2d82e4d 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -37,6 +37,7 @@ pub enum WorkerRequest { key: String, }, Accept, + Save, _EchoError { message: String, @@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { } } }); + let state2 = state.clone(); let mut recv_task = spawn(async move { while let Some(message) = recv.next().await { if let Ok(m) = message { @@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { } } }; - if let Err(e) = state.write().await.handle_worker_message(worker, req) { + if let Err(e) = state.write().await.handle_worker_message(worker, req).await + { warn!("error processing request: {e:?}"); state.write().await.send_to_worker( worker, @@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { _ = &mut send_task => recv_task.abort(), _ = &mut recv_task => send_task.abort(), }; + + { + let mut g = state2.write().await; + g.workers.remove(&worker).unwrap(); + } } impl State { @@ -135,7 +143,7 @@ impl State { warn!("worker ws response overflow"); } } - pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { + pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { let worker = self.workers.get_mut(&w).unwrap(); match req { WorkerRequest::Register { name, sources } => { @@ -159,6 +167,9 @@ impl State { WorkerRequest::Accept => { worker.accept += 1; } + WorkerRequest::Save => { + self.save().await?; + } WorkerRequest::_EchoError { message } => { self.send_to_worker(w, WorkerResponse::Error { message }); } |