diff options
Diffstat (limited to 'src/worker_ws.rs')
-rw-r--r-- | src/worker_ws.rs | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/src/worker_ws.rs b/src/worker_ws.rs index f645c11..2d82e4d 100644 --- a/src/worker_ws.rs +++ b/src/worker_ws.rs @@ -37,6 +37,7 @@ pub enum WorkerRequest { key: String, }, Accept, + Save, _EchoError { message: String, @@ -93,6 +94,7 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { } } }); + let state2 = state.clone(); let mut recv_task = spawn(async move { while let Some(message) = recv.next().await { if let Ok(m) = message { @@ -107,7 +109,8 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { } } }; - if let Err(e) = state.write().await.handle_worker_message(worker, req) { + if let Err(e) = state.write().await.handle_worker_message(worker, req).await + { warn!("error processing request: {e:?}"); state.write().await.send_to_worker( worker, @@ -127,6 +130,11 @@ async fn worker_websocket_inner(ws: WebSocket, state: Arc<RwLock<State>>) { _ = &mut send_task => recv_task.abort(), _ = &mut recv_task => send_task.abort(), }; + + { + let mut g = state2.write().await; + g.workers.remove(&worker).unwrap(); + } } impl State { @@ -135,7 +143,7 @@ impl State { warn!("worker ws response overflow"); } } - pub fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { + pub async fn handle_worker_message(&mut self, w: WorkerID, req: WorkerRequest) -> Result<()> { let worker = self.workers.get_mut(&w).unwrap(); match req { WorkerRequest::Register { name, sources } => { @@ -159,6 +167,9 @@ impl State { WorkerRequest::Accept => { worker.accept += 1; } + WorkerRequest::Save => { + self.save().await?; + } WorkerRequest::_EchoError { message } => { self.send_to_worker(w, WorkerResponse::Error { message }); } |