diff options
Diffstat (limited to 'src/proxy.rs')
-rw-r--r-- | src/proxy.rs | 91 |
1 files changed, 48 insertions, 43 deletions
diff --git a/src/proxy.rs b/src/proxy.rs index 036bbc1..7070500 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,4 +1,4 @@ -use crate::ServiceError; +use crate::{ServiceError, State}; use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ body::Incoming, @@ -11,10 +11,11 @@ use hyper::{ Request, Uri, }; use log::{debug, error, warn}; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpStream; pub async fn proxy_request( + state: &Arc<State>, mut req: Request<Incoming>, addr: SocketAddr, backend: &SocketAddr, @@ -47,51 +48,55 @@ pub async fn proxy_request( let do_upgrade = req.headers().contains_key(UPGRADE); let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); - debug!("\tforwarding to {}", backend); - let mut resp = { - let client_stream = TcpStream::connect(backend) - .await - .map_err(|_| ServiceError::CantConnect)?; + if let Some(_limit_guard) = state.l_outgoing.obtain() { + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TcpStream::connect(backend) + .await + .map_err(|_| ServiceError::CantConnect)?; - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) - .await - .map_err(ServiceError::Hyper)?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - warn!("connection failed: {:?}", err); - } - }); - sender - .send_request(req) - .await - .map_err(ServiceError::Hyper)? - }; + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; - if do_upgrade { - let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); - tokio::task::spawn(async move { - debug!("about upgrading connection, sending empty response"); - match ( - on_upgrade_upstream.unwrap().await, - on_upgrade_downstream.unwrap().await, - ) { - (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { - debug!("upgrade successful"); - match tokio::io::copy_bidirectional( - &mut upgraded_downstream, - &mut upgraded_upstream, - ) - .await - { - Ok((from_client, from_server)) => { - debug!("proxy socket terminated: {from_server} sent, {from_client} received") + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); + tokio::task::spawn(async move { + debug!("about upgrading connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut upgraded_downstream, + &mut upgraded_upstream, + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), } - Err(e) => warn!("proxy socket error: {e}"), } + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), } - (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), - } - }); + }); + } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) + } else { + Err(ServiceError::Limit) } - Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) } |