use crate::{helper::TokioIo, ServiceError, State}; use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{body::Incoming, http::HeaderValue, upgrade::OnUpgrade, Request, StatusCode}; use log::{debug, warn}; use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpStream; pub async fn proxy_request( state: &Arc, mut req: Request, addr: SocketAddr, backend: &SocketAddr, ) -> Result>, ServiceError> { #[cfg(feature = "mond")] state.reporting.request_out.inc(); //? Do we know what this did? // *req.uri_mut() = Uri::builder() // .path_and_query( // req.uri() // .clone() // .path_and_query() // .cloned() // .unwrap_or(PathAndQuery::from_static("/")), // ) // .build() // .unwrap(); req.headers_mut().insert( "x-real-ip", HeaderValue::from_str(&format!("{}", addr.ip())).unwrap(), ); let on_upgrade_downstream = req.extensions_mut().remove::(); let _limit_guard = state.l_outgoing.try_acquire()?; debug!("\tforwarding to {}", backend); let mut resp = { let client_stream = TokioIo( TcpStream::connect(backend) .await .map_err(|_| ServiceError::CantConnect)?, ); let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) .await .map_err(ServiceError::Hyper)?; tokio::task::spawn(async move { if let Err(err) = conn.with_upgrades().await { warn!("connection failed: {:?}", err); } }); sender .send_request(req) .await .map_err(ServiceError::Hyper)? }; if resp.status() == StatusCode::SWITCHING_PROTOCOLS { let on_upgrade_upstream = resp .extensions_mut() .remove::() .ok_or(ServiceError::UpgradeFailed)?; let on_upgrade_downstream = on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?; tokio::task::spawn(async move { debug!("about to upgrade connection"); match (on_upgrade_upstream.await, on_upgrade_downstream.await) { (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { debug!("upgrade successful"); match tokio::io::copy_bidirectional( &mut TokioIo(upgraded_downstream), &mut TokioIo(upgraded_upstream), ) .await { Ok((from_client, from_server)) => { debug!("proxy socket terminated: {from_server} sent, {from_client} received") } Err(e) => warn!("proxy socket error: {e}"), } } (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"), } }); } Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) }