From 6566cbb3f25aa8b1247c259b5e546910b6044f93 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 7 Dec 2023 14:35:48 +0100 Subject: move some files around and add horrible access log --- src/filters/proxy.rs | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 src/filters/proxy.rs (limited to 'src/filters/proxy.rs') diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs new file mode 100644 index 0000000..40ebf17 --- /dev/null +++ b/src/filters/proxy.rs @@ -0,0 +1,104 @@ +use crate::{helper::TokioIo, ServiceError, State}; +use http_body_util::{combinators::BoxBody, BodyExt}; +use hyper::{ + body::Incoming, + header::UPGRADE, + http::{ + uri::{PathAndQuery, Scheme}, + HeaderValue, + }, + upgrade::OnUpgrade, + Request, Uri, +}; +use log::{debug, error, warn}; +use std::{net::SocketAddr, sync::Arc}; +use tokio::net::TcpStream; + +pub async fn proxy_request( + state: &Arc, + mut req: Request, + addr: SocketAddr, + backend: &SocketAddr, +) -> Result>, ServiceError> { + #[cfg(feature = "mond")] + state.reporting.request_out.inc(); + + let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS); + *req.uri_mut() = Uri::builder() + .path_and_query( + req.uri() + .clone() + .path_and_query() + .cloned() + .unwrap_or(PathAndQuery::from_static("/")), + ) + .build() + .unwrap(); + + req.headers_mut().insert( + "x-forwarded-for", + HeaderValue::from_str(&format!("{addr}")).unwrap(), + ); + req.headers_mut().insert( + "x-forwarded-proto", + if scheme_secure { + HeaderValue::from_static("https") + } else { + HeaderValue::from_static("http") + }, + ); + + let do_upgrade = req.headers().contains_key(UPGRADE); + let on_upgrade_downstream = req.extensions_mut().remove::(); + + let _limit_guard = state.l_outgoing.try_acquire()?; + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TokioIo( + TcpStream::connect(backend) + .await + .map_err(|_| ServiceError::CantConnect)?, + ); + + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; + + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::(); + tokio::task::spawn(async move { + debug!("about to upgrade connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut TokioIo(upgraded_downstream), + &mut TokioIo(upgraded_upstream), + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), + } + } + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), + } + }); + } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) +} -- cgit v1.2.3-70-g09d2