aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-06-02 22:22:11 +0200
committermetamuffin <metamuffin@disroot.org>2025-06-02 22:22:11 +0200
commit2b56668dc89d61248fffeb75a6c8d1136aa7fa39 (patch)
treec45810bd75e2792ee51e973d336485dbc669be11
parent63b0aa6a244d017a0e3e41766b403c7e5072bd4b (diff)
downloadisda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar
isda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar.bz2
isda-2b56668dc89d61248fffeb75a6c8d1136aa7fa39.tar.zst
add task query functionallity to worker_ws
-rw-r--r--src/webui.rs2
-rw-r--r--src/webui_ws.rs15
-rw-r--r--src/worker_ws.rs57
3 files changed, 64 insertions, 10 deletions
diff --git a/src/webui.rs b/src/webui.rs
index a17f53e..6a80bfe 100644
--- a/src/webui.rs
+++ b/src/webui.rs
@@ -1,7 +1,7 @@
use crate::{
State,
helper::{Css, Javascript},
- webui_ws::TaskState,
+ worker_ws::TaskState,
};
use axum::extract::State as S;
use axum::response::Html;
diff --git a/src/webui_ws.rs b/src/webui_ws.rs
index 6b5821e..57393df 100644
--- a/src/webui_ws.rs
+++ b/src/webui_ws.rs
@@ -1,4 +1,7 @@
-use crate::{State, webui, worker_ws::WorkerID};
+use crate::{
+ State, webui,
+ worker_ws::{TaskState, WorkerID},
+};
use axum::{
extract::{
State as S, WebSocketUpgrade,
@@ -37,20 +40,13 @@ pub enum WebuiEvent {
},
}
-#[derive(Debug, Serialize, Clone, Copy, PartialEq)]
-#[serde(rename_all = "snake_case")]
-pub enum TaskState {
- Queue,
- Loading,
- Complete,
-}
-
pub(crate) async fn webui_websocket(
ws: WebSocketUpgrade,
S(state): S<Arc<RwLock<State>>>,
) -> impl IntoResponse {
ws.on_upgrade(|ws| webui_websocket_inner(ws, state))
}
+
async fn webui_websocket_inner(mut ws: WebSocket, state: Arc<RwLock<State>>) {
let mut stream = state.read().await.webui_broadcast.subscribe();
while let Ok(ev) = stream.recv().await {
@@ -63,6 +59,7 @@ async fn webui_websocket_inner(mut ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
}
+
impl State {
pub fn send_webui_task_removal(&self, key: &str) {
if self.webui_broadcast.receiver_count() > 0 {
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index 9bd90ca..bf0bf7f 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -40,6 +40,12 @@ pub enum WorkerRequest {
#[serde(default)]
force: bool,
},
+ Query {
+ cookie: String,
+ state: Option<TaskState>,
+ kind: Option<String>,
+ data: Map<String, Value>,
+ },
Accept,
Save,
@@ -59,11 +65,23 @@ pub enum WorkerResponse {
Config {
config: Value,
},
+ QueryResponse {
+ cookie: String,
+ keys: Vec<String>,
+ },
Error {
message: String,
},
}
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum TaskState {
+ Queue,
+ Loading,
+ Complete,
+}
+
pub(crate) async fn worker_websocket(
ws: WebSocketUpgrade,
S(state): S<Arc<RwLock<State>>>,
@@ -226,6 +244,45 @@ impl State {
self.send_webui_worker_update(w);
self.dispatch_work();
}
+ WorkerRequest::Query {
+ cookie,
+ state,
+ kind,
+ data,
+ } => {
+ let mut output = Vec::new();
+ let kind_prefix = kind.map(|k| format!("{k}:"));
+ for (bin, set) in [
+ (TaskState::Queue, &self.queue),
+ (TaskState::Loading, &self.loading),
+ (TaskState::Complete, &self.complete),
+ ] {
+ if state.map_or(true, |s| s == bin) {
+ 'taskloop: for key in set {
+ if kind_prefix.as_ref().map_or(true, |p| key.starts_with(p)) {
+ if !data.is_empty() {
+ let Some(cdata) = self.metadata.get(key) else {
+ continue 'taskloop;
+ };
+ for (k, v) in &data {
+ if cdata.get(k) != Some(v) {
+ continue 'taskloop;
+ }
+ }
+ }
+ output.push(key.to_owned())
+ }
+ }
+ }
+ }
+ self.send_to_worker(
+ w,
+ WorkerResponse::QueryResponse {
+ cookie,
+ keys: output,
+ },
+ );
+ }
WorkerRequest::Save => {
self.save().await?;
}