From 56fb681279b2f2221eef933617d521469c6e6d83 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 6 Apr 2023 20:23:00 +0200 Subject: apply limits --- src/proxy.rs | 91 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 48 insertions(+), 43 deletions(-) (limited to 'src/proxy.rs') diff --git a/src/proxy.rs b/src/proxy.rs index 036bbc1..7070500 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,4 +1,4 @@ -use crate::ServiceError; +use crate::{ServiceError, State}; use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ body::Incoming, @@ -11,10 +11,11 @@ use hyper::{ Request, Uri, }; use log::{debug, error, warn}; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpStream; pub async fn proxy_request( + state: &Arc, mut req: Request, addr: SocketAddr, backend: &SocketAddr, @@ -47,51 +48,55 @@ pub async fn proxy_request( let do_upgrade = req.headers().contains_key(UPGRADE); let on_upgrade_downstream = req.extensions_mut().remove::(); - debug!("\tforwarding to {}", backend); - let mut resp = { - let client_stream = TcpStream::connect(backend) - .await - .map_err(|_| ServiceError::CantConnect)?; + if let Some(_limit_guard) = state.l_outgoing.obtain() { + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TcpStream::connect(backend) + .await + .map_err(|_| ServiceError::CantConnect)?; - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) - .await - .map_err(ServiceError::Hyper)?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - warn!("connection failed: {:?}", err); - } - }); - sender - .send_request(req) - .await - .map_err(ServiceError::Hyper)? - }; + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; - if do_upgrade { - let on_upgrade_upstream = resp.extensions_mut().remove::(); - tokio::task::spawn(async move { - debug!("about upgrading connection, sending empty response"); - match ( - on_upgrade_upstream.unwrap().await, - on_upgrade_downstream.unwrap().await, - ) { - (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { - debug!("upgrade successful"); - match tokio::io::copy_bidirectional( - &mut upgraded_downstream, - &mut upgraded_upstream, - ) - .await - { - Ok((from_client, from_server)) => { - debug!("proxy socket terminated: {from_server} sent, {from_client} received") + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::(); + tokio::task::spawn(async move { + debug!("about upgrading connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut upgraded_downstream, + &mut upgraded_upstream, + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), } - Err(e) => warn!("proxy socket error: {e}"), } + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), } - (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), - } - }); + }); + } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) + } else { + Err(ServiceError::Limit) } - Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) } -- cgit v1.2.3-70-g09d2