diff options
author | metamuffin <metamuffin@disroot.org> | 2023-08-28 15:02:14 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-08-28 15:02:14 +0200 |
commit | 186bf476aeab0ff0838d1ae26a9dbcb2e40a8eb5 (patch) | |
tree | 384ed6e8faaacd77b1a5f4f11a251ee228f1e927 /src/proxy.rs | |
parent | 2bc557bbddb01b535dd8512fe3aadb0d4091a42d (diff) | |
download | gnix-186bf476aeab0ff0838d1ae26a9dbcb2e40a8eb5.tar gnix-186bf476aeab0ff0838d1ae26a9dbcb2e40a8eb5.tar.bz2 gnix-186bf476aeab0ff0838d1ae26a9dbcb2e40a8eb5.tar.zst |
what i invented here already existed: semaphore
Diffstat (limited to 'src/proxy.rs')
-rw-r--r-- | src/proxy.rs | 91 |
1 files changed, 44 insertions, 47 deletions
diff --git a/src/proxy.rs b/src/proxy.rs index 10d7e3d..d38de4d 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -48,57 +48,54 @@ pub async fn proxy_request( let do_upgrade = req.headers().contains_key(UPGRADE); let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); - if let Some(_limit_guard) = state.l_outgoing.obtain() { - debug!("\tforwarding to {}", backend); - let mut resp = { - let client_stream = TokioIo( - TcpStream::connect(backend) - .await - .map_err(|_| ServiceError::CantConnect)?, - ); - - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) - .await - .map_err(ServiceError::Hyper)?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - warn!("connection failed: {:?}", err); - } - }); - sender - .send_request(req) + let _limit_guard = state.l_outgoing.try_acquire()?; + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TokioIo( + TcpStream::connect(backend) .await - .map_err(ServiceError::Hyper)? - }; + .map_err(|_| ServiceError::CantConnect)?, + ); + + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; - if do_upgrade { - let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); - tokio::task::spawn(async move { - debug!("about upgrading connection, sending empty response"); - match ( - on_upgrade_upstream.unwrap().await, - on_upgrade_downstream.unwrap().await, - ) { - (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { - debug!("upgrade successful"); - match tokio::io::copy_bidirectional( - &mut TokioIo(upgraded_downstream), - &mut TokioIo(upgraded_upstream), - ) - .await - { - Ok((from_client, from_server)) => { - debug!("proxy socket terminated: {from_server} sent, {from_client} received") - } - Err(e) => warn!("proxy socket error: {e}"), + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); + tokio::task::spawn(async move { + debug!("about upgrading connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut TokioIo(upgraded_downstream), + &mut TokioIo(upgraded_upstream), + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") } + Err(e) => warn!("proxy socket error: {e}"), } - (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), } - }); - } - Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) - } else { - Err(ServiceError::Limit) + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), + } + }); } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) } |