aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/enqueue.ts6
-rw-r--r--scripts/ytdlp_flatten.ts55
-rw-r--r--src/main.rs5
-rw-r--r--src/style.css12
-rw-r--r--src/webui.rs65
-rw-r--r--src/worker_ws.rs58
6 files changed, 153 insertions, 48 deletions
diff --git a/scripts/enqueue.ts b/scripts/enqueue.ts
index 2c2871d..e6e3958 100644
--- a/scripts/enqueue.ts
+++ b/scripts/enqueue.ts
@@ -27,9 +27,11 @@ ws.onerror = () => console.error("ws error")
ws.onclose = () => console.error("ws closed")
ws.onopen = () => {
console.log("ws open");
- ws.send(JSON.stringify({ t: "register", name: "enqueuer", sources: [] }))
+ ws.send(JSON.stringify({ t: "register", name: "enqueuer", task_kinds: [] }))
run_enqueue()
}
ws.onmessage = ev => {
- console.log(ev.data);
+ if (typeof ev.data != "string") return
+ const p = JSON.parse(ev.data)
+ if (p.t == "error") console.error(`error: ${p.message}`);
}
diff --git a/scripts/ytdlp_flatten.ts b/scripts/ytdlp_flatten.ts
index 6a3eb5e..7cd9da1 100644
--- a/scripts/ytdlp_flatten.ts
+++ b/scripts/ytdlp_flatten.ts
@@ -1,14 +1,61 @@
-
const ws = new WebSocket(Deno.args[0])
+
+function key_to_url(key: string): [string, string] {
+ const [kind, id] = key.split(":", 2)
+ if (kind == "youtube-channel") return ["youtube", `https://youtube.com/channel/${id}`]
+ throw new Error("unknown kind");
+}
+
+async function flat_playlist(url: string, kind: string, output: string) {
+ console.log(output, url);
+ const o = await new Deno.Command("yt-dlp", {
+ args: [
+ "--flat-playlist",
+ "--print-json",
+ "--match-filter", "availability=public & live_status=not_live",
+ url
+ ]
+ }).output()
+ const otext = new TextDecoder().decode(o.stdout)
+ for (const line of otext.split("\n")) {
+ if (!line.length) continue
+ const ob = JSON.parse(line)
+ const key = `${kind}:${ob.id}`;
+ ws.send(JSON.stringify({
+ t: "metadata",
+ key,
+ data: {
+ title: ob.title,
+ subtitle: `by ${ob.playlist_uploader}; duration ${ob.duration_string}`,
+ description: ob.description,
+ thumbnail: ob.thumbnails[0]?.url,
+ output,
+ }
+ }))
+ ws.send(JSON.stringify({ t: "enqueue", key }))
+ }
+ console.log("done");
+}
+
ws.onerror = () => console.error("ws error")
ws.onclose = () => console.error("ws closed")
ws.onopen = () => {
console.log("ws open");
- ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", sources: ["ytdlp-flatten"] }))
+ ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", task_kinds: ["youtube-channel"] }))
ws.send(JSON.stringify({ t: "accept" }))
}
-ws.onmessage = ev => {
- console.log(ev.data);
+ws.onmessage = async ev => {
+ if (typeof ev.data != "string") return
+ const p = JSON.parse(ev.data)
+ if (p.t == "error") console.error(`error: ${p.message}`);
+ if (p.t == "work") {
+ const [outkind, url] = key_to_url(p.key)
+ if (!p.data.output) throw new Error("no output");
+ await flat_playlist(url, outkind, p.data.output)
+ ws.send(JSON.stringify({ t: "complete", key: p.key }))
+ ws.send(JSON.stringify({ t: "save" }))
+ ws.send(JSON.stringify({ t: "accept" }))
+ }
}
diff --git a/src/main.rs b/src/main.rs
index 354d303..e0af010 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -21,12 +21,13 @@ use worker_ws::{WorkerID, WorkerResponse, worker_websocket};
pub struct Worker {
accept: usize,
name: String,
- sources: Vec<String>,
+ task_kinds: Vec<String>,
+ assigned_tasks: HashSet<String>,
send: Sender<WorkerResponse>,
}
#[derive(Default)]
-struct State {
+pub struct State {
worker_id_counter: WorkerID,
workers: HashMap<WorkerID, Worker>,
diff --git a/src/style.css b/src/style.css
index b0baf42..9eb1187 100644
--- a/src/style.css
+++ b/src/style.css
@@ -1,6 +1,7 @@
:root {
background-color: #111111;
color: white;
+ font-family: sans-serif;
}
.task {
@@ -9,6 +10,17 @@
border-radius: 10px;
border: 2px solid white;
}
+.task h3 {
+ text-overflow: ellipsis;
+ white-space: nowrap;
+ overflow: hidden;
+ width: calc(33dvw - 3em);
+}
+.task img {
+ width: 128px;
+ height: 72px;
+ float: left;
+}
.task.queue {
border-color: #6e6eff;
}
diff --git a/src/webui.rs b/src/webui.rs
index 9b21ab0..5a66ffc 100644
--- a/src/webui.rs
+++ b/src/webui.rs
@@ -3,7 +3,7 @@ use axum::extract::State as S;
use axum::response::Html;
use markup::doctype;
use serde_json::{Map, Value};
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
use tokio::{fs::read_to_string, sync::RwLock};
pub(crate) async fn webui_style() -> Css<String> {
@@ -16,6 +16,8 @@ pub(crate) async fn webui_style() -> Css<String> {
pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
let g = state.read().await;
+ let default = &Map::new();
+ let g = &g;
let doc = markup::new! {
@doctype()
@@ -33,27 +35,9 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
}}
}
section.tasks {
- div {
- h2 { "Queued" }
- p.count { @g.queue.len() " tasks" }
- ul { @for key in &g.queue {
- li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task queue" } }
- }}
- }
- div {
- h2 { "Loading" }
- p.count { @g.loading.len() " tasks" }
- ul { @for key in &g.loading {
- li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task loading" } }
- }}
- }
- div {
- h2 { "Completed" }
- p.count { @g.complete.len() " tasks" }
- ul { @for key in &g.complete {
- li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task complete" } }
- }}
- }
+ @Taskbin {title: "Queued", class: "task queue", set: &g.queue, default, g }
+ @Taskbin {title: "Loading", class: "task loading", set: &g.loading, default, g }
+ @Taskbin {title: "Completed", class: "task complete", set: &g.complete, default, g }
}
}
}
@@ -62,28 +46,47 @@ pub(crate) async fn webui(S(state): S<Arc<RwLock<State>>>) -> Html<String> {
}
markup::define!(
+ Taskbin<'a>(title: &'a str, class: &'a str, set: &'a HashSet<String>, g: &'a State, default: &'a Map<String, Value>) {
+ div {
+ h2 { @title }
+ p.count { @set.len() " tasks" }
+ ul { @for key in set.iter().take(128) {
+ li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class } }
+ }}
+ }
+ }
+ Task<'a>(key: &'a str, data: &'a Map<String, Value>, class: &'a str) {
+ div[class=class] {
+ // @if let Some(url) = data.get("thumbnail").and_then(Value::as_str) {
+ // img[src=url, loading="lazy"];
+ // }
+ h3 { @data.get("title").and_then(Value::as_str).unwrap_or(key) }
+ @if let Some(s) = data.get("subtitle").and_then(Value::as_str) {
+ span.subtitle { @s } br;
+ }
+ span.key { @key }
+ }
+ }
Worker<'a>(id: u64, w: &'a crate::Worker) {
div[class=worker_class(w)] {
h3 { @w.name }
- span { "ID: " @id } " "
- @if w.accept > 0 {
- span { "Accepting Jobs (" @w.accept ")" }
+ span { "ID: " @id } ", "
+ @if !w.assigned_tasks.is_empty() {
+ span { "Busy (" @w.assigned_tasks.len() ")" }
+ } else if w.accept > 0 {
+ span { "Accepting Tasks (" @w.accept ")" }
} else {
span { "Idle" }
}
}
}
- Task<'a>(key: &'a str, data: &'a Map<String, Value>, class: &'a str) {
- div[class=class] {
- h3 { @data["title"].as_str().unwrap_or(key) }
- span.key { @key }
- }
- }
);
fn worker_class(w: &crate::Worker) -> &'static str {
if w.accept > 0 {
"worker accepting"
+ } else if w.assigned_tasks.is_empty() {
+ "worker idle"
} else {
"worker busy"
}
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index 2d82e4d..7b76476 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -8,10 +8,10 @@ use axum::{
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
-use log::warn;
+use log::{debug, warn};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
use tokio::{
spawn,
sync::{RwLock, mpsc::channel},
@@ -24,7 +24,7 @@ pub type WorkerID = u64;
pub enum WorkerRequest {
Register {
name: String,
- sources: Vec<String>,
+ task_kinds: Vec<String>,
},
Metadata {
key: String,
@@ -77,7 +77,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
send: tx,
accept: 0,
name: "unknown".to_string(),
- sources: vec![],
+ task_kinds: vec![],
+ assigned_tasks: HashSet::new(),
},
);
id
@@ -133,22 +134,29 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
{
let mut g = state2.write().await;
- g.workers.remove(&worker).unwrap();
+ let w = g.workers.remove(&worker).unwrap();
+ // recycle incomplete tasks
+ for key in w.assigned_tasks {
+ g.loading.remove(&key);
+ g.queue.insert(key);
+ }
}
}
impl State {
pub fn send_to_worker(&mut self, w: WorkerID, resp: WorkerResponse) {
+ debug!("{w} -> {resp:?}");
if let Err(_) = self.workers[&w].send.try_send(resp) {
warn!("worker ws response overflow");
}
}
pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
+ debug!("{w} <- {req:?}");
let worker = self.workers.get_mut(&w).unwrap();
match req {
- WorkerRequest::Register { name, sources } => {
+ WorkerRequest::Register { name, task_kinds } => {
worker.name = name;
- worker.sources = sources;
+ worker.task_kinds = task_kinds;
}
WorkerRequest::Metadata { key, data } => {
let m = self.metadata.entry(key).or_default();
@@ -156,16 +164,19 @@ impl State {
}
WorkerRequest::Enqueue { key } => {
self.queue.insert(key);
+ self.dispatch_work();
}
WorkerRequest::Complete { key } => {
- if self.loading.remove(&key) {
+ if worker.assigned_tasks.remove(&key) {
+ self.loading.remove(&key);
self.complete.insert(key);
} else {
- bail!("was not loading")
+ bail!("task was not assigned")
}
}
WorkerRequest::Accept => {
worker.accept += 1;
+ self.dispatch_work();
}
WorkerRequest::Save => {
self.save().await?;
@@ -177,4 +188,33 @@ impl State {
Ok(())
}
+
+ pub fn dispatch_work(&mut self) {
+ let mut to_send = Vec::new();
+ for (id, w) in &mut self.workers {
+ if w.accept >= 1 {
+ for kind in &w.task_kinds {
+ let prefix = format!("{kind}:");
+ let Some(first) = self.queue.iter().find(|e| e.starts_with(&prefix)).cloned()
+ else {
+ continue;
+ };
+ w.accept -= 1;
+ w.assigned_tasks.insert(first.clone());
+ to_send.push((*id, first));
+ }
+ }
+ }
+ for (w, key) in to_send {
+ self.queue.remove(&key);
+ self.loading.insert(key.clone());
+ self.send_to_worker(
+ w,
+ WorkerResponse::Work {
+ data: self.metadata[&key].clone(),
+ key,
+ },
+ );
+ }
+ }
}