aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
committermetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
commit0c4cb405f9b166398a2bf7e128c47fa56dfa2d71 (patch)
tree06db763a47dcfa9c975f3d797d8ee2534d37b1be
parent1c27a83409a7f51c5d07098cb6ca65bcee870d9c (diff)
downloadisda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.bz2
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.zst
enqueue works
-rw-r--r--scripts/enqueue.ts35
-rw-r--r--scripts/ytdlp_flatten.ts2
-rw-r--r--src/main.rs5
-rw-r--r--src/ui.rs0
-rw-r--r--src/webui.rs63
-rw-r--r--src/worker_ws.rs15
6 files changed, 116 insertions, 4 deletions
diff --git a/scripts/enqueue.ts b/scripts/enqueue.ts
new file mode 100644
index 0000000..2c2871d
--- /dev/null
+++ b/scripts/enqueue.ts
@@ -0,0 +1,35 @@
+
+const file = await Deno.readTextFile(Deno.args[1])
+const note_filter = Deno.args.length >= 3 ? Deno.args[2] : ""
+
+const ws = new WebSocket(Deno.args[0])
+
+function run_enqueue() {
+ let kind = "http"
+ for (const line of file.split("\n")) {
+ if (!line.trim().length) continue
+ else if (line.startsWith("[") && line.endsWith("]"))
+ kind = line.substring(1, line.length - 1)
+ else {
+ const [name, rest] = line.split("=", 2)
+ const [id, note] = rest.split(";", 2)
+ if (note_filter.length && note != note_filter) continue
+ const key = `${kind}:${id}`;
+ ws.send(JSON.stringify({ t: "metadata", key, data: { output: name, title: name } }))
+ ws.send(JSON.stringify({ t: "enqueue", key }))
+ }
+ }
+ ws.send(JSON.stringify({ t: "save" }))
+ console.log("done");
+}
+
+ws.onerror = () => console.error("ws error")
+ws.onclose = () => console.error("ws closed")
+ws.onopen = () => {
+ console.log("ws open");
+ ws.send(JSON.stringify({ t: "register", name: "enqueuer", sources: [] }))
+ run_enqueue()
+}
+ws.onmessage = ev => {
+ console.log(ev.data);
+}
diff --git a/scripts/ytdlp_flatten.ts b/scripts/ytdlp_flatten.ts
index f2eaa65..6a3eb5e 100644
--- a/scripts/ytdlp_flatten.ts
+++ b/scripts/ytdlp_flatten.ts
@@ -5,7 +5,9 @@ const ws = new WebSocket(Deno.args[0])
ws.onerror = () => console.error("ws error")
ws.onclose = () => console.error("ws closed")
ws.onopen = () => {
+ console.log("ws open");
ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", sources: ["ytdlp-flatten"] }))
+ ws.send(JSON.stringify({ t: "accept" }))
}
ws.onmessage = ev => {
console.log(ev.data);
diff --git a/src/main.rs b/src/main.rs
index a08ae6b..5096ae8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-pub mod ui;
+pub mod webui;
pub mod worker_ws;
use anyhow::Result;
@@ -14,6 +14,7 @@ use tokio::{
net::TcpListener,
sync::{RwLock, mpsc::Sender},
};
+use webui::webui;
use worker_ws::{WorkerID, WorkerResponse, worker_websocket};
struct Worker {
@@ -40,7 +41,7 @@ async fn main() -> Result<()> {
let mut state = State::default();
state.load().await?;
let router = Router::new()
- .route("/", get(async || "Hello world!"))
+ .route("/", get(webui))
.route("/worker_ws", get(worker_websocket))
.with_state(Arc::new(RwLock::new(state)));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
diff --git a/src/ui.rs b/src/ui.rs
deleted file mode 100644
index e69de29..0000000
--- a/src/ui.rs
+++ /dev/null
diff --git a/src/webui.rs b/src/webui.rs
new file mode 100644
index 0000000..9e8b9c3
--- /dev/null
+++ b/src/webui.rs
@@ -0,0 +1,63 @@
+use crate::State;
+use axum::extract::State as S;
+use axum::response::Html;
+use serde_json::{Map, Value};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
+ let g = state.read().await;
+
+ let doc = markup::new! {
+ html {
+ head {
+ meta[charset="UTF-8"];
+ title { "" }
+ }
+ body {
+ section {
+ h2 { "Workers"}
+ ul { @for (id, w) in &g.workers {
+ li { div {
+ h3 { @w.name }
+ span { "ID: " @id } " "
+ @if w.accept > 0 {
+ span { "Accepting Jobs (" @w.accept ")" }
+ } else {
+ span { "Idle" }
+ }
+ }}
+ }}
+ }
+ section {
+ h2 { "Queued" }
+ ul { @for key in &g.queue {
+ li { @key }
+ }}
+ }
+ section {
+ h2 { "Loading" }
+ ul { @for key in &g.loading {
+ li { @key }
+ }}
+ }
+ section {
+ h2 { "Completed" }
+ ul { @for key in &g.complete {
+ li { @key }
+ }}
+ }
+ }
+ }
+ };
+ Html(doc.to_string())
+}
+
+markup::define!(
+ Task<'a>(key: &'a str, data: &'a Map<String, Value>) {
+ div.task {
+ h3 { @data["title"].as_str().unwrap_or(key) }
+ spawn.key { @key }
+ }
+ }
+);
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index f645c11..2d82e4d 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -37,6 +37,7 @@ pub enum WorkerRequest {
key: String,
},
Accept,
+ Save,
_EchoError {
message: String,
@@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
});
+ let state2 = state.clone();
let mut recv_task = spawn(async move {
while let Some(message) = recv.next().await {
if let Ok(m) = message {
@@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
};
- if let Err(e) = state.write().await.handle_worker_message(worker, req) {
+ if let Err(e) = state.write().await.handle_worker_message(worker, req).await
+ {
warn!("error processing request: {e:?}");
state.write().await.send_to_worker(
worker,
@@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
_ = &mut send_task => recv_task.abort(),
_ = &mut recv_task => send_task.abort(),
};
+
+ {
+ let mut g = state2.write().await;
+ g.workers.remove(&worker).unwrap();
+ }
}
impl State {
@@ -135,7 +143,7 @@ impl State {
warn!("worker ws response overflow");
}
}
- pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
+ pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
let worker = self.workers.get_mut(&w).unwrap();
match req {
WorkerRequest::Register { name, sources } => {
@@ -159,6 +167,9 @@ impl State {
WorkerRequest::Accept => {
worker.accept += 1;
}
+ WorkerRequest::Save => {
+ self.save().await?;
+ }
WorkerRequest::_EchoError { message } => {
self.send_to_worker(w, WorkerResponse::Error { message });
}