diff options
author | metamuffin <metamuffin@disroot.org> | 2025-06-02 22:22:11 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-06-02 22:22:11 +0200 |
commit | 2b56668dc89d61248fffeb75a6c8d1136aa7fa39 (patch) | |
tree | c45810bd75e2792ee51e973d336485dbc669be11 | |
parent | 63b0aa6a244d017a0e3e41766b403c7e5072bd4b (diff) | |
download | isda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar isda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar.bz2 isda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar.zst |
add task query functionallity to worker_ws
-rw-r--r-- | src/webui.rs | 2 | ||||
-rw-r--r-- | src/webui_ws.rs | 15 | ||||
-rw-r--r-- | src/worker_ws.rs | 57 |
3 files changed, 64 insertions, 10 deletions
diff --git a/src/webui.rs b/src/webui.rs index a17f53e..6a80bfe 100644 --- a/src/webui.rs +++ b/src/webui.rs @@ -1,7 +1,7 @@ use crate::{ State, helper::{Css, Javascript}, - webui_ws::TaskState, + worker_ws::TaskState, }; use axum::extract::State as S; use axum::response::Html; diff --git a/src/webui_ws.rs b/src/webui_ws.rs index 6b5821e..57393df 100644 --- a/src/webui_ws.rs +++ b/src/webui_ws.rs @@ -1,4 +1,7 @@ -use crate::{State, webui, worker_ws::WorkerID}; +use crate::{ + State, webui, + worker_ws::{TaskState, WorkerID}, +}; use axum::{ extract::{ State as S, WebSocketUpgrade, @@ -37,20 +40,13 @@ pub enum WebuiEvent { }, } -#[derive(Debug, Serialize, Clone, Copy, PartialEq)] -#[serde(rename_all = "snake_case")] -pub enum TaskState { - Queue, - Loading, - Complete, -} - pub(crate) async fn webui_websocket( ws: WebSocketUpgrade, S(state): S<Arc<RwLock<State>>>, ) -> impl IntoResponse { ws.on_upgrade(|ws| webui_websocket_inner(ws, state)) } + async fn webui_websocket_inner(mut ws: WebSocket, state: Arc<RwLock<State>>) { let mut stream = state.read().await.webui_broadcast.subscribe(); while let Ok(ev) = stream.recv().await { @@ -63,6 +59,7 @@ async fn webui_websocket_inner(mut ws: WebSocket, state: Arc<RwLock<State>>) { } } } + impl State { pub fn send_webui_task_removal(&self, key: &str) { if self.webui_broadcast.receiver_count() > 0 { diff --git a/src/worker_ws.rs b/src/worker_ws.rs index 9bd90ca..bf0bf7f 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -40,6 +40,12 @@ pub enum WorkerRequest { #[serde(default)] force: bool, }, + Query { + cookie: String, + state: Option<TaskState>, + kind: Option<String>, + data: Map<String, Value>, + }, Accept, Save, @@ -59,11 +65,23 @@ pub enum WorkerResponse { Config { config: Value, }, + QueryResponse { + cookie: String, + keys: Vec<String>, + }, Error { message: String, }, } +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum TaskState { + Queue, + Loading, + Complete, +} + pub(crate) async fn worker_websocket( ws: WebSocketUpgrade, S(state): S<Arc<RwLock<State>>>, @@ -226,6 +244,45 @@ impl State { self.send_webui_worker_update(w); self.dispatch_work(); } + WorkerRequest::Query { + cookie, + state, + kind, + data, + } => { + let mut output = Vec::new(); + let kind_prefix = kind.map(|k| format!("{k}:")); + for (bin, set) in [ + (TaskState::Queue, &self.queue), + (TaskState::Loading, &self.loading), + (TaskState::Complete, &self.complete), + ] { + if state.map_or(true, |s| s == bin) { + 'taskloop: for key in set { + if kind_prefix.as_ref().map_or(true, |p| key.starts_with(p)) { + if !data.is_empty() { + let Some(cdata) = self.metadata.get(key) else { + continue 'taskloop; + }; + for (k, v) in &data { + if cdata.get(k) != Some(v) { + continue 'taskloop; + } + } + } + output.push(key.to_owned()) + } + } + } + } + self.send_to_worker( + w, + WorkerResponse::QueryResponse { + cookie, + keys: output, + }, + ); + } WorkerRequest::Save => { self.save().await?; } |