aboutsummaryrefslogtreecommitdiff
path: root/src/filters/proxy.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-12-07 14:35:48 +0100
committermetamuffin <metamuffin@disroot.org>2023-12-07 14:35:48 +0100
commit6566cbb3f25aa8b1247c259b5e546910b6044f93 (patch)
treee94dd775fc1fd90b4ea7b272d871e71118f102f6 /src/filters/proxy.rs
parentab0d780062bff88d4fbcdd2c91ad5352c0d6279f (diff)
downloadgnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar
gnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar.bz2
gnix-6566cbb3f25aa8b1247c259b5e546910b6044f93.tar.zst
move some files around and add horrible access log
Diffstat (limited to 'src/filters/proxy.rs')
-rw-r--r--src/filters/proxy.rs104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs
new file mode 100644
index 0000000..40ebf17
--- /dev/null
+++ b/src/filters/proxy.rs
@@ -0,0 +1,104 @@
+use crate::{helper::TokioIo, ServiceError, State};
+use http_body_util::{combinators::BoxBody, BodyExt};
+use hyper::{
+ body::Incoming,
+ header::UPGRADE,
+ http::{
+ uri::{PathAndQuery, Scheme},
+ HeaderValue,
+ },
+ upgrade::OnUpgrade,
+ Request, Uri,
+};
+use log::{debug, error, warn};
+use std::{net::SocketAddr, sync::Arc};
+use tokio::net::TcpStream;
+
+pub async fn proxy_request(
+ state: &Arc<State>,
+ mut req: Request<Incoming>,
+ addr: SocketAddr,
+ backend: &SocketAddr,
+) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> {
+ #[cfg(feature = "mond")]
+ state.reporting.request_out.inc();
+
+ let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS);
+ *req.uri_mut() = Uri::builder()
+ .path_and_query(
+ req.uri()
+ .clone()
+ .path_and_query()
+ .cloned()
+ .unwrap_or(PathAndQuery::from_static("/")),
+ )
+ .build()
+ .unwrap();
+
+ req.headers_mut().insert(
+ "x-forwarded-for",
+ HeaderValue::from_str(&format!("{addr}")).unwrap(),
+ );
+ req.headers_mut().insert(
+ "x-forwarded-proto",
+ if scheme_secure {
+ HeaderValue::from_static("https")
+ } else {
+ HeaderValue::from_static("http")
+ },
+ );
+
+ let do_upgrade = req.headers().contains_key(UPGRADE);
+ let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
+
+ let _limit_guard = state.l_outgoing.try_acquire()?;
+ debug!("\tforwarding to {}", backend);
+ let mut resp = {
+ let client_stream = TokioIo(
+ TcpStream::connect(backend)
+ .await
+ .map_err(|_| ServiceError::CantConnect)?,
+ );
+
+ let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
+ .await
+ .map_err(ServiceError::Hyper)?;
+ tokio::task::spawn(async move {
+ if let Err(err) = conn.await {
+ warn!("connection failed: {:?}", err);
+ }
+ });
+ sender
+ .send_request(req)
+ .await
+ .map_err(ServiceError::Hyper)?
+ };
+
+ if do_upgrade {
+ let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
+ tokio::task::spawn(async move {
+ debug!("about to upgrade connection, sending empty response");
+ match (
+ on_upgrade_upstream.unwrap().await,
+ on_upgrade_downstream.unwrap().await,
+ ) {
+ (Ok(upgraded_upstream), Ok(upgraded_downstream)) => {
+ debug!("upgrade successful");
+ match tokio::io::copy_bidirectional(
+ &mut TokioIo(upgraded_downstream),
+ &mut TokioIo(upgraded_upstream),
+ )
+ .await
+ {
+ Ok((from_client, from_server)) => {
+ debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ }
+ Err(e) => warn!("proxy socket error: {e}"),
+ }
+ }
+ (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"),
+ }
+ });
+ }
+ Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
+}