use crate::{helper::TokioIo, ServiceError, State}; use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ body::Incoming, header::UPGRADE, http::{ uri::{PathAndQuery, Scheme}, HeaderValue, }, upgrade::OnUpgrade, Request, Uri, }; use log::{debug, error, warn}; use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpStream; pub async fn proxy_request( state: &Arc, mut req: Request, addr: SocketAddr, backend: &SocketAddr, ) -> Result>, ServiceError> { #[cfg(feature = "mond")] state.reporting.request_out.inc(); let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS); *req.uri_mut() = Uri::builder() .path_and_query( req.uri() .clone() .path_and_query() .cloned() .unwrap_or(PathAndQuery::from_static("/")), ) .build() .unwrap(); req.headers_mut().insert( "x-forwarded-for", HeaderValue::from_str(&format!("{addr}")).unwrap(), ); req.headers_mut().insert( "x-forwarded-proto", if scheme_secure { HeaderValue::from_static("https") } else { HeaderValue::from_static("http") }, ); let do_upgrade = req.headers().contains_key(UPGRADE); let on_upgrade_downstream = req.extensions_mut().remove::(); let _limit_guard = state.l_outgoing.try_acquire()?; debug!("\tforwarding to {}", backend); let mut resp = { let client_stream = TokioIo( TcpStream::connect(backend) .await .map_err(|_| ServiceError::CantConnect)?, ); let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) .await .map_err(ServiceError::Hyper)?; tokio::task::spawn(async move { if let Err(err) = conn.await { warn!("connection failed: {:?}", err); } }); sender .send_request(req) .await .map_err(ServiceError::Hyper)? }; if do_upgrade { let on_upgrade_upstream = resp.extensions_mut().remove::(); tokio::task::spawn(async move { debug!("about to upgrade connection, sending empty response"); match ( on_upgrade_upstream.unwrap().await, on_upgrade_downstream.unwrap().await, ) { (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { debug!("upgrade successful"); match tokio::io::copy_bidirectional( &mut TokioIo(upgraded_downstream), &mut TokioIo(upgraded_upstream), ) .await { Ok((from_client, from_server)) => { debug!("proxy socket terminated: {from_server} sent, {from_client} received") } Err(e) => warn!("proxy socket error: {e}"), } } (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), } }); } Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) }