diff options
author | metamuffin <metamuffin@disroot.org> | 2023-12-07 14:35:48 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-12-07 14:35:48 +0100 |
commit | 6566cbb3f25aa8b1247c259b5e546910b6044f93 (patch) | |
tree | e94dd775fc1fd90b4ea7b272d871e71118f102f6 /src/filters/proxy.rs | |
parent | ab0d780062bff88d4fbcdd2c91ad5352c0d6279f (diff) | |
download | gnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar gnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar.bz2 gnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar.zst |
move some files around and add horrible access log
Diffstat (limited to 'src/filters/proxy.rs')
-rw-r--r-- | src/filters/proxy.rs | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs new file mode 100644 index 0000000..40ebf17 --- /dev/null +++ b/src/filters/proxy.rs @@ -0,0 +1,104 @@ +use crate::{helper::TokioIo, ServiceError, State}; +use http_body_util::{combinators::BoxBody, BodyExt}; +use hyper::{ + body::Incoming, + header::UPGRADE, + http::{ + uri::{PathAndQuery, Scheme}, + HeaderValue, + }, + upgrade::OnUpgrade, + Request, Uri, +}; +use log::{debug, error, warn}; +use std::{net::SocketAddr, sync::Arc}; +use tokio::net::TcpStream; + +pub async fn proxy_request( + state: &Arc<State>, + mut req: Request<Incoming>, + addr: SocketAddr, + backend: &SocketAddr, +) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> { + #[cfg(feature = "mond")] + state.reporting.request_out.inc(); + + let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS); + *req.uri_mut() = Uri::builder() + .path_and_query( + req.uri() + .clone() + .path_and_query() + .cloned() + .unwrap_or(PathAndQuery::from_static("/")), + ) + .build() + .unwrap(); + + req.headers_mut().insert( + "x-forwarded-for", + HeaderValue::from_str(&format!("{addr}")).unwrap(), + ); + req.headers_mut().insert( + "x-forwarded-proto", + if scheme_secure { + HeaderValue::from_static("https") + } else { + HeaderValue::from_static("http") + }, + ); + + let do_upgrade = req.headers().contains_key(UPGRADE); + let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); + + let _limit_guard = state.l_outgoing.try_acquire()?; + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TokioIo( + TcpStream::connect(backend) + .await + .map_err(|_| ServiceError::CantConnect)?, + ); + + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; + + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); + tokio::task::spawn(async move { + debug!("about to upgrade connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut TokioIo(upgraded_downstream), + &mut TokioIo(upgraded_upstream), + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), + } + } + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), + } + }); + } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) +} |