diff options
author | metamuffin <metamuffin@disroot.org> | 2024-05-29 16:37:44 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-05-29 16:37:44 +0200 |
commit | 886a18e0c67624d0882f04c7f6659bcfee6b4d8d (patch) | |
tree | 32a5389076b199c4e06fa10ce6b54d165d5466c5 /src/filters/proxy.rs | |
parent | 6cebab912dcf01bbe225c20ec2e7656f61ba160e (diff) | |
download | gnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar gnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar.bz2 gnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar.zst |
refactor filter system
Diffstat (limited to 'src/filters/proxy.rs')
-rw-r--r-- | src/filters/proxy.rs | 157 |
1 files changed, 84 insertions, 73 deletions
diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs index ad959ad..ce72f65 100644 --- a/src/filters/proxy.rs +++ b/src/filters/proxy.rs @@ -1,87 +1,98 @@ -use crate::{helper::TokioIo, ServiceError, State}; -use http_body_util::{combinators::BoxBody, BodyExt}; -use hyper::{body::Incoming, http::HeaderValue, upgrade::OnUpgrade, Request, StatusCode}; +use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse}; +use crate::{helper::TokioIo, ServiceError}; +use futures::Future; +use http_body_util::BodyExt; +use hyper::{http::HeaderValue, upgrade::OnUpgrade, StatusCode}; use log::{debug, warn}; -use std::{net::SocketAddr, sync::Arc}; +use serde::Deserialize; +use serde_yaml::Value; +use std::{net::SocketAddr, pin::Pin, sync::Arc}; use tokio::net::TcpStream; -pub async fn proxy_request( - state: &Arc<State>, - mut req: Request<Incoming>, - addr: SocketAddr, - backend: &SocketAddr, -) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> { - #[cfg(feature = "mond")] - state.reporting.request_out.inc(); +#[derive(Default)] +pub struct ProxyKind; - //? Do we know what this did? - // *req.uri_mut() = Uri::builder() - // .path_and_query( - // req.uri() - // .clone() - // .path_and_query() - // .cloned() - // .unwrap_or(PathAndQuery::from_static("/")), - // ) - // .build() - // .unwrap(); - - req.headers_mut().insert( - "x-real-ip", - HeaderValue::from_str(&format!("{}", addr.ip())).unwrap(), - ); +#[derive(Debug, Deserialize)] +struct Proxy { + backend: SocketAddr, +} - let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); +impl NodeKind for ProxyKind { + fn name(&self) -> &'static str { + "proxy" + } + fn instanciate(&self, config: Value) -> anyhow::Result<Arc<dyn Node>> { + Ok(Arc::new(serde_yaml::from_value::<Proxy>(config)?)) + } +} +impl Node for Proxy { + fn handle<'a>( + &'a self, + context: &'a mut NodeContext, + mut request: NodeRequest, + ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> { + Box::pin(async move { + request.headers_mut().insert( + "x-real-ip", + HeaderValue::from_str(&format!("{}", context.addr.ip())).unwrap(), + ); - let _limit_guard = state.l_outgoing.try_acquire()?; - debug!("\tforwarding to {}", backend); - let mut resp = { - let client_stream = TokioIo( - TcpStream::connect(backend) - .await - .map_err(|_| ServiceError::CantConnect)?, - ); + let on_upgrade_downstream = request.extensions_mut().remove::<OnUpgrade>(); - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) - .await - .map_err(ServiceError::Hyper)?; - tokio::task::spawn(async move { - if let Err(err) = conn.with_upgrades().await { - warn!("connection failed: {:?}", err); - } - }); - sender - .send_request(req) - .await - .map_err(ServiceError::Hyper)? - }; + let _limit_guard = context.state.l_outgoing.try_acquire()?; + debug!("\tforwarding to {}", self.backend); + let mut resp = { + let client_stream = TokioIo( + TcpStream::connect(self.backend) + .await + .map_err(|_| ServiceError::CantConnect)?, + ); - if resp.status() == StatusCode::SWITCHING_PROTOCOLS { - let on_upgrade_upstream = resp - .extensions_mut() - .remove::<OnUpgrade>() - .ok_or(ServiceError::UpgradeFailed)?; - let on_upgrade_downstream = on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?; - tokio::task::spawn(async move { - debug!("about to upgrade connection"); - match (on_upgrade_upstream.await, on_upgrade_downstream.await) { - (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { - debug!("upgrade successful"); - match tokio::io::copy_bidirectional( - &mut TokioIo(upgraded_downstream), - &mut TokioIo(upgraded_upstream), - ) + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.with_upgrades().await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(request) .await - { - Ok((from_client, from_server)) => { - debug!("proxy socket terminated: {from_server} sent, {from_client} received") + .map_err(ServiceError::Hyper)? + }; + + if resp.status() == StatusCode::SWITCHING_PROTOCOLS { + let on_upgrade_upstream = resp + .extensions_mut() + .remove::<OnUpgrade>() + .ok_or(ServiceError::UpgradeFailed)?; + let on_upgrade_downstream = + on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?; + tokio::task::spawn(async move { + debug!("about to upgrade connection"); + match (on_upgrade_upstream.await, on_upgrade_downstream.await) { + (Ok(upgraded_upstream), Ok(upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut TokioIo(upgraded_downstream), + &mut TokioIo(upgraded_upstream), + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), + } } - Err(e) => warn!("proxy socket error: {e}"), + (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"), } - } - (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"), + }); } - }); + + let resp = resp.map(|b| b.map_err(ServiceError::Hyper).boxed()); + Ok(resp) + }) } - Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) } |