aboutsummaryrefslogtreecommitdiff
path: root/src/worker_ws.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker_ws.rs')
-rw-r--r--src/worker_ws.rs57
1 files changed, 57 insertions, 0 deletions
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index 9bd90ca..bf0bf7f 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -40,6 +40,12 @@ pub enum WorkerRequest {
#[serde(default)]
force: bool,
},
+ Query {
+ cookie: String,
+ state: Option<TaskState>,
+ kind: Option<String>,
+ data: Map<String, Value>,
+ },
Accept,
Save,
@@ -59,11 +65,23 @@ pub enum WorkerResponse {
Config {
config: Value,
},
+ QueryResponse {
+ cookie: String,
+ keys: Vec<String>,
+ },
Error {
message: String,
},
}
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum TaskState {
+ Queue,
+ Loading,
+ Complete,
+}
+
pub(crate) async fn worker_websocket(
ws: WebSocketUpgrade,
S(state): S<Arc<RwLock<State>>>,
@@ -226,6 +244,45 @@ impl State {
self.send_webui_worker_update(w);
self.dispatch_work();
}
+ WorkerRequest::Query {
+ cookie,
+ state,
+ kind,
+ data,
+ } => {
+ let mut output = Vec::new();
+ let kind_prefix = kind.map(|k| format!("{k}:"));
+ for (bin, set) in [
+ (TaskState::Queue, &self.queue),
+ (TaskState::Loading, &self.loading),
+ (TaskState::Complete, &self.complete),
+ ] {
+ if state.map_or(true, |s| s == bin) {
+ 'taskloop: for key in set {
+ if kind_prefix.as_ref().map_or(true, |p| key.starts_with(p)) {
+ if !data.is_empty() {
+ let Some(cdata) = self.metadata.get(key) else {
+ continue 'taskloop;
+ };
+ for (k, v) in &data {
+ if cdata.get(k) != Some(v) {
+ continue 'taskloop;
+ }
+ }
+ }
+ output.push(key.to_owned())
+ }
+ }
+ }
+ }
+ self.send_to_worker(
+ w,
+ WorkerResponse::QueryResponse {
+ cookie,
+ keys: output,
+ },
+ );
+ }
WorkerRequest::Save => {
self.save().await?;
}