From 15065f924214e00594bbc77ea9b4c7adf53c9b2a Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sat, 17 May 2025 23:05:56 +0200 Subject: flat playlist works --- scripts/enqueue.ts | 6 +++-- scripts/ytdlp_flatten.ts | 55 +++++++++++++++++++++++++++++++++++++--- src/main.rs | 5 ++-- src/style.css | 12 +++++++++ src/webui.rs | 65 +++++++++++++++++++++++++----------------------- src/worker_ws.rs | 58 +++++++++++++++++++++++++++++++++++------- 6 files changed, 153 insertions(+), 48 deletions(-) diff --git a/scripts/enqueue.ts b/scripts/enqueue.ts index 2c2871d..e6e3958 100644 --- a/scripts/enqueue.ts +++ b/scripts/enqueue.ts @@ -27,9 +27,11 @@ ws.onerror = () => console.error("ws error") ws.onclose = () => console.error("ws closed") ws.onopen = () => { console.log("ws open"); - ws.send(JSON.stringify({ t: "register", name: "enqueuer", sources: [] })) + ws.send(JSON.stringify({ t: "register", name: "enqueuer", task_kinds: [] })) run_enqueue() } ws.onmessage = ev => { - console.log(ev.data); + if (typeof ev.data != "string") return + const p = JSON.parse(ev.data) + if (p.t == "error") console.error(`error: ${p.message}`); } diff --git a/scripts/ytdlp_flatten.ts b/scripts/ytdlp_flatten.ts index 6a3eb5e..7cd9da1 100644 --- a/scripts/ytdlp_flatten.ts +++ b/scripts/ytdlp_flatten.ts @@ -1,14 +1,61 @@ - const ws = new WebSocket(Deno.args[0]) + +function key_to_url(key: string): [string, string] { + const [kind, id] = key.split(":", 2) + if (kind == "youtube-channel") return ["youtube", `https://youtube.com/channel/${id}`] + throw new Error("unknown kind"); +} + +async function flat_playlist(url: string, kind: string, output: string) { + console.log(output, url); + const o = await new Deno.Command("yt-dlp", { + args: [ + "--flat-playlist", + "--print-json", + "--match-filter", "availability=public & live_status=not_live", + url + ] + }).output() + const otext = new TextDecoder().decode(o.stdout) + for (const line of otext.split("\n")) { + if (!line.length) continue + const ob = JSON.parse(line) + const key = `${kind}:${ob.id}`; + ws.send(JSON.stringify({ + t: "metadata", + key, + data: { + title: ob.title, + subtitle: `by ${ob.playlist_uploader}; duration ${ob.duration_string}`, + description: ob.description, + thumbnail: ob.thumbnails[0]?.url, + output, + } + })) + ws.send(JSON.stringify({ t: "enqueue", key })) + } + console.log("done"); +} + ws.onerror = () => console.error("ws error") ws.onclose = () => console.error("ws closed") ws.onopen = () => { console.log("ws open"); - ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", sources: ["ytdlp-flatten"] })) + ws.send(JSON.stringify({ t: "register", name: "yt-dlp playlist flattener", task_kinds: ["youtube-channel"] })) ws.send(JSON.stringify({ t: "accept" })) } -ws.onmessage = ev => { - console.log(ev.data); +ws.onmessage = async ev => { + if (typeof ev.data != "string") return + const p = JSON.parse(ev.data) + if (p.t == "error") console.error(`error: ${p.message}`); + if (p.t == "work") { + const [outkind, url] = key_to_url(p.key) + if (!p.data.output) throw new Error("no output"); + await flat_playlist(url, outkind, p.data.output) + ws.send(JSON.stringify({ t: "complete", key: p.key })) + ws.send(JSON.stringify({ t: "save" })) + ws.send(JSON.stringify({ t: "accept" })) + } } diff --git a/src/main.rs b/src/main.rs index 354d303..e0af010 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,12 +21,13 @@ use worker_ws::{WorkerID, WorkerResponse, worker_websocket}; pub struct Worker { accept: usize, name: String, - sources: Vec, + task_kinds: Vec, + assigned_tasks: HashSet, send: Sender, } #[derive(Default)] -struct State { +pub struct State { worker_id_counter: WorkerID, workers: HashMap, diff --git a/src/style.css b/src/style.css index b0baf42..9eb1187 100644 --- a/src/style.css +++ b/src/style.css @@ -1,6 +1,7 @@ :root { background-color: #111111; color: white; + font-family: sans-serif; } .task { @@ -9,6 +10,17 @@ border-radius: 10px; border: 2px solid white; } +.task h3 { + text-overflow: ellipsis; + white-space: nowrap; + overflow: hidden; + width: calc(33dvw - 3em); +} +.task img { + width: 128px; + height: 72px; + float: left; +} .task.queue { border-color: #6e6eff; } diff --git a/src/webui.rs b/src/webui.rs index 9b21ab0..5a66ffc 100644 --- a/src/webui.rs +++ b/src/webui.rs @@ -3,7 +3,7 @@ use axum::extract::State as S; use axum::response::Html; use markup::doctype; use serde_json::{Map, Value}; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use tokio::{fs::read_to_string, sync::RwLock}; pub(crate) async fn webui_style() -> Css { @@ -16,6 +16,8 @@ pub(crate) async fn webui_style() -> Css { pub(crate) async fn webui(S(state): S>>) -> Html { let g = state.read().await; + let default = &Map::new(); + let g = &g; let doc = markup::new! { @doctype() @@ -33,27 +35,9 @@ pub(crate) async fn webui(S(state): S>>) -> Html { }} } section.tasks { - div { - h2 { "Queued" } - p.count { @g.queue.len() " tasks" } - ul { @for key in &g.queue { - li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task queue" } } - }} - } - div { - h2 { "Loading" } - p.count { @g.loading.len() " tasks" } - ul { @for key in &g.loading { - li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task loading" } } - }} - } - div { - h2 { "Completed" } - p.count { @g.complete.len() " tasks" } - ul { @for key in &g.complete { - li { @Task { key, data: g.metadata.get(key).unwrap(), class: "task complete" } } - }} - } + @Taskbin {title: "Queued", class: "task queue", set: &g.queue, default, g } + @Taskbin {title: "Loading", class: "task loading", set: &g.loading, default, g } + @Taskbin {title: "Completed", class: "task complete", set: &g.complete, default, g } } } } @@ -62,28 +46,47 @@ pub(crate) async fn webui(S(state): S>>) -> Html { } markup::define!( + Taskbin<'a>(title: &'a str, class: &'a str, set: &'a HashSet, g: &'a State, default: &'a Map) { + div { + h2 { @title } + p.count { @set.len() " tasks" } + ul { @for key in set.iter().take(128) { + li { @Task { key, data: g.metadata.get(key).unwrap_or(&default), class } } + }} + } + } + Task<'a>(key: &'a str, data: &'a Map, class: &'a str) { + div[class=class] { + // @if let Some(url) = data.get("thumbnail").and_then(Value::as_str) { + // img[src=url, loading="lazy"]; + // } + h3 { @data.get("title").and_then(Value::as_str).unwrap_or(key) } + @if let Some(s) = data.get("subtitle").and_then(Value::as_str) { + span.subtitle { @s } br; + } + span.key { @key } + } + } Worker<'a>(id: u64, w: &'a crate::Worker) { div[class=worker_class(w)] { h3 { @w.name } - span { "ID: " @id } " " - @if w.accept > 0 { - span { "Accepting Jobs (" @w.accept ")" } + span { "ID: " @id } ", " + @if !w.assigned_tasks.is_empty() { + span { "Busy (" @w.assigned_tasks.len() ")" } + } else if w.accept > 0 { + span { "Accepting Tasks (" @w.accept ")" } } else { span { "Idle" } } } } - Task<'a>(key: &'a str, data: &'a Map, class: &'a str) { - div[class=class] { - h3 { @data["title"].as_str().unwrap_or(key) } - span.key { @key } - } - } ); fn worker_class(w: &crate::Worker) -> &'static str { if w.accept > 0 { "worker accepting" + } else if w.assigned_tasks.is_empty() { + "worker idle" } else { "worker busy" } diff --git a/src/worker_ws.rs b/src/worker_ws.rs index 2d82e4d..7b76476 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -8,10 +8,10 @@ use axum::{ response::IntoResponse, }; use futures::{SinkExt, StreamExt}; -use log::warn; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use tokio::{ spawn, sync::{RwLock, mpsc::channel}, @@ -24,7 +24,7 @@ pub type WorkerID = u64; pub enum WorkerRequest { Register { name: String, - sources: Vec, + task_kinds: Vec, }, Metadata { key: String, @@ -77,7 +77,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc>) { send: tx, accept: 0, name: "unknown".to_string(), - sources: vec![], + task_kinds: vec![], + assigned_tasks: HashSet::new(), }, ); id @@ -133,22 +134,29 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc>) { { let mut g = state2.write().await; - g.workers.remove(&worker).unwrap(); + let w = g.workers.remove(&worker).unwrap(); + // recycle incomplete tasks + for key in w.assigned_tasks { + g.loading.remove(&key); + g.queue.insert(key); + } } } impl State { pub fn send_to_worker(&mut self, w: WorkerID, resp: WorkerResponse) { + debug!("{w} -> {resp:?}"); if let Err(_) = self.workers[&w].send.try_send(resp) { warn!("worker ws response overflow"); } } pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { + debug!("{w} <- {req:?}"); let worker = self.workers.get_mut(&w).unwrap(); match req { - WorkerRequest::Register { name, sources } => { + WorkerRequest::Register { name, task_kinds } => { worker.name = name; - worker.sources = sources; + worker.task_kinds = task_kinds; } WorkerRequest::Metadata { key, data } => { let m = self.metadata.entry(key).or_default(); @@ -156,16 +164,19 @@ impl State { } WorkerRequest::Enqueue { key } => { self.queue.insert(key); + self.dispatch_work(); } WorkerRequest::Complete { key } => { - if self.loading.remove(&key) { + if worker.assigned_tasks.remove(&key) { + self.loading.remove(&key); self.complete.insert(key); } else { - bail!("was not loading") + bail!("task was not assigned") } } WorkerRequest::Accept => { worker.accept += 1; + self.dispatch_work(); } WorkerRequest::Save => { self.save().await?; @@ -177,4 +188,33 @@ impl State { Ok(()) } + + pub fn dispatch_work(&mut self) { + let mut to_send = Vec::new(); + for (id, w) in &mut self.workers { + if w.accept >= 1 { + for kind in &w.task_kinds { + let prefix = format!("{kind}:"); + let Some(first) = self.queue.iter().find(|e| e.starts_with(&prefix)).cloned() + else { + continue; + }; + w.accept -= 1; + w.assigned_tasks.insert(first.clone()); + to_send.push((*id, first)); + } + } + } + for (w, key) in to_send { + self.queue.remove(&key); + self.loading.insert(key.clone()); + self.send_to_worker( + w, + WorkerResponse::Work { + data: self.metadata[&key].clone(), + key, + }, + ); + } + } } -- cgit v1.2.3-70-g09d2