aboutsummaryrefslogtreecommitdiff
path: root/src/worker_ws.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
committermetamuffin <metamuffin@disroot.org>2025-05-17 18:27:00 +0200
commit0c4cb405f9b166398a2bf7e128c47fa56dfa2d71 (patch)
tree06db763a47dcfa9c975f3d797d8ee2534d37b1be /src/worker_ws.rs
parent1c27a83409a7f51c5d07098cb6ca65bcee870d9c (diff)
downloadisda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.bz2
isda-0c4cb405f9b166398a2bf7e128c47fa56dfa2d71.tar.zst
enqueue works
Diffstat (limited to 'src/worker_ws.rs')
-rw-r--r--src/worker_ws.rs15
1 files changed, 13 insertions, 2 deletions
diff --git a/src/worker_ws.rs b/src/worker_ws.rs
index f645c11..2d82e4d 100644
--- a/src/worker_ws.rs
+++ b/src/worker_ws.rs
@@ -37,6 +37,7 @@ pub enum WorkerRequest {
key: String,
},
Accept,
+ Save,
_EchoError {
message: String,
@@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
});
+ let state2 = state.clone();
let mut recv_task = spawn(async move {
while let Some(message) = recv.next().await {
if let Ok(m) = message {
@@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
}
}
};
- if let Err(e) = state.write().await.handle_worker_message(worker, req) {
+ if let Err(e) = state.write().await.handle_worker_message(worker, req).await
+ {
warn!("error processing request: {e:?}");
state.write().await.send_to_worker(
worker,
@@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) {
_ = &mut send_task => recv_task.abort(),
_ = &mut recv_task => send_task.abort(),
};
+
+ {
+ let mut g = state2.write().await;
+ g.workers.remove(&worker).unwrap();
+ }
}
impl State {
@@ -135,7 +143,7 @@ impl State {
warn!("worker ws response overflow");
}
}
- pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
+ pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> {
let worker = self.workers.get_mut(&w).unwrap();
match req {
WorkerRequest::Register { name, sources } => {
@@ -159,6 +167,9 @@ impl State {
WorkerRequest::Accept => {
worker.accept += 1;
}
+ WorkerRequest::Save => {
+ self.save().await?;
+ }
WorkerRequest::_EchoError { message } => {
self.send_to_worker(w, WorkerResponse::Error { message });
}