aboutsummaryrefslogtreecommitdiff
path: root/src/worker_ws.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker_ws.rs')
-rw-r--r--src/worker_ws.rs35
1 files changed, 29 insertions, 6 deletions
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index 7b76476..5f0e47c 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -32,9 +32,13 @@ pub enum WorkerRequest {
},
Enqueue {
key: String,
+ #[serde(default)]
+ ignore_complete: bool,
},
Complete {
key: String,
+ #[serde(default)]
+ force: bool,
},
Accept,
Save,
@@ -162,16 +166,35 @@ impl State {
let m = self.metadata.entry(key).or_default();
m.extend(data);
}
- WorkerRequest::Enqueue { key } => {
- self.queue.insert(key);
- self.dispatch_work();
+ WorkerRequest::Enqueue {
+ key,
+ ignore_complete,
+ } => {
+ if ignore_complete {
+ if !self.loading.contains(&key) {
+ self.complete.remove(&key);
+ self.queue.insert(key);
+ self.dispatch_work();
+ }
+ } else {
+ if !(self.complete.contains(&key) || self.loading.contains(&key)) {
+ self.queue.insert(key);
+ self.dispatch_work();
+ }
+ }
}
- WorkerRequest::Complete { key } => {
- if worker.assigned_tasks.remove(&key) {
+ WorkerRequest::Complete { key, force } => {
+ if force {
+ self.queue.remove(&key);
self.loading.remove(&key);
self.complete.insert(key);
} else {
- bail!("task was not assigned")
+ if worker.assigned_tasks.remove(&key) {
+ self.loading.remove(&key);
+ self.complete.insert(key);
+ } else {
+ bail!("task was not assigned")
+ }
}
}
WorkerRequest::Accept => {