aboutsummaryrefslogtreecommitdiff
path: root/src/proxy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/proxy.rs')
-rw-r--r--src/proxy.rs91
1 files changed, 48 insertions, 43 deletions
diff --git a/src/proxy.rs b/src/proxy.rs
index 036bbc1..7070500 100644
--- a/src/proxy.rs
+++ b/src/proxy.rs
@@ -1,4 +1,4 @@
-use crate::ServiceError;
+use crate::{ServiceError, State};
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{
body::Incoming,
@@ -11,10 +11,11 @@ use hyper::{
Request, Uri,
};
use log::{debug, error, warn};
-use std::net::SocketAddr;
+use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpStream;
pub async fn proxy_request(
+ state: &Arc<State>,
mut req: Request<Incoming>,
addr: SocketAddr,
backend: &SocketAddr,
@@ -47,51 +48,55 @@ pub async fn proxy_request(
let do_upgrade = req.headers().contains_key(UPGRADE);
let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
- debug!("\tforwarding to {}", backend);
- let mut resp = {
- let client_stream = TcpStream::connect(backend)
- .await
- .map_err(|_| ServiceError::CantConnect)?;
+ if let Some(_limit_guard) = state.l_outgoing.obtain() {
+ debug!("\tforwarding to {}", backend);
+ let mut resp = {
+ let client_stream = TcpStream::connect(backend)
+ .await
+ .map_err(|_| ServiceError::CantConnect)?;
- let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
- .await
- .map_err(ServiceError::Hyper)?;
- tokio::task::spawn(async move {
- if let Err(err) = conn.await {
- warn!("connection failed: {:?}", err);
- }
- });
- sender
- .send_request(req)
- .await
- .map_err(ServiceError::Hyper)?
- };
+ let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
+ .await
+ .map_err(ServiceError::Hyper)?;
+ tokio::task::spawn(async move {
+ if let Err(err) = conn.await {
+ warn!("connection failed: {:?}", err);
+ }
+ });
+ sender
+ .send_request(req)
+ .await
+ .map_err(ServiceError::Hyper)?
+ };
- if do_upgrade {
- let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
- tokio::task::spawn(async move {
- debug!("about upgrading connection, sending empty response");
- match (
- on_upgrade_upstream.unwrap().await,
- on_upgrade_downstream.unwrap().await,
- ) {
- (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => {
- debug!("upgrade successful");
- match tokio::io::copy_bidirectional(
- &mut upgraded_downstream,
- &mut upgraded_upstream,
- )
- .await
- {
- Ok((from_client, from_server)) => {
- debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ if do_upgrade {
+ let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
+ tokio::task::spawn(async move {
+ debug!("about upgrading connection, sending empty response");
+ match (
+ on_upgrade_upstream.unwrap().await,
+ on_upgrade_downstream.unwrap().await,
+ ) {
+ (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => {
+ debug!("upgrade successful");
+ match tokio::io::copy_bidirectional(
+ &mut upgraded_downstream,
+ &mut upgraded_upstream,
+ )
+ .await
+ {
+ Ok((from_client, from_server)) => {
+ debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ }
+ Err(e) => warn!("proxy socket error: {e}"),
}
- Err(e) => warn!("proxy socket error: {e}"),
}
+ (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"),
}
- (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"),
- }
- });
+ });
+ }
+ Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
+ } else {
+ Err(ServiceError::Limit)
}
- Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
}