aboutsummaryrefslogtreecommitdiff
path: root/src/filters/proxy.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-05-29 16:37:44 +0200
committermetamuffin <metamuffin@disroot.org>2024-05-29 16:37:44 +0200
commit886a18e0c67624d0882f04c7f6659bcfee6b4d8d (patch)
tree32a5389076b199c4e06fa10ce6b54d165d5466c5 /src/filters/proxy.rs
parent6cebab912dcf01bbe225c20ec2e7656f61ba160e (diff)
downloadgnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar
gnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar.bz2
gnix-886a18e0c67624d0882f04c7f6659bcfee6b4d8d.tar.zst
refactor filter system
Diffstat (limited to 'src/filters/proxy.rs')
-rw-r--r--src/filters/proxy.rs157
1 files changed, 84 insertions, 73 deletions
diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs
index ad959ad..ce72f65 100644
--- a/src/filters/proxy.rs
+++ b/src/filters/proxy.rs
@@ -1,87 +1,98 @@
-use crate::{helper::TokioIo, ServiceError, State};
-use http_body_util::{combinators::BoxBody, BodyExt};
-use hyper::{body::Incoming, http::HeaderValue, upgrade::OnUpgrade, Request, StatusCode};
+use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
+use crate::{helper::TokioIo, ServiceError};
+use futures::Future;
+use http_body_util::BodyExt;
+use hyper::{http::HeaderValue, upgrade::OnUpgrade, StatusCode};
use log::{debug, warn};
-use std::{net::SocketAddr, sync::Arc};
+use serde::Deserialize;
+use serde_yaml::Value;
+use std::{net::SocketAddr, pin::Pin, sync::Arc};
use tokio::net::TcpStream;
-pub async fn proxy_request(
- state: &Arc<State>,
- mut req: Request<Incoming>,
- addr: SocketAddr,
- backend: &SocketAddr,
-) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> {
- #[cfg(feature = "mond")]
- state.reporting.request_out.inc();
+#[derive(Default)]
+pub struct ProxyKind;
- //? Do we know what this did?
- // *req.uri_mut() = Uri::builder()
- // .path_and_query(
- // req.uri()
- // .clone()
- // .path_and_query()
- // .cloned()
- // .unwrap_or(PathAndQuery::from_static("/")),
- // )
- // .build()
- // .unwrap();
-
- req.headers_mut().insert(
- "x-real-ip",
- HeaderValue::from_str(&format!("{}", addr.ip())).unwrap(),
- );
+#[derive(Debug, Deserialize)]
+struct Proxy {
+ backend: SocketAddr,
+}
- let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
+impl NodeKind for ProxyKind {
+ fn name(&self) -> &'static str {
+ "proxy"
+ }
+ fn instanciate(&self, config: Value) -> anyhow::Result<Arc<dyn Node>> {
+ Ok(Arc::new(serde_yaml::from_value::<Proxy>(config)?))
+ }
+}
+impl Node for Proxy {
+ fn handle<'a>(
+ &'a self,
+ context: &'a mut NodeContext,
+ mut request: NodeRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
+ Box::pin(async move {
+ request.headers_mut().insert(
+ "x-real-ip",
+ HeaderValue::from_str(&format!("{}", context.addr.ip())).unwrap(),
+ );
- let _limit_guard = state.l_outgoing.try_acquire()?;
- debug!("\tforwarding to {}", backend);
- let mut resp = {
- let client_stream = TokioIo(
- TcpStream::connect(backend)
- .await
- .map_err(|_| ServiceError::CantConnect)?,
- );
+ let on_upgrade_downstream = request.extensions_mut().remove::<OnUpgrade>();
- let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
- .await
- .map_err(ServiceError::Hyper)?;
- tokio::task::spawn(async move {
- if let Err(err) = conn.with_upgrades().await {
- warn!("connection failed: {:?}", err);
- }
- });
- sender
- .send_request(req)
- .await
- .map_err(ServiceError::Hyper)?
- };
+ let _limit_guard = context.state.l_outgoing.try_acquire()?;
+ debug!("\tforwarding to {}", self.backend);
+ let mut resp = {
+ let client_stream = TokioIo(
+ TcpStream::connect(self.backend)
+ .await
+ .map_err(|_| ServiceError::CantConnect)?,
+ );
- if resp.status() == StatusCode::SWITCHING_PROTOCOLS {
- let on_upgrade_upstream = resp
- .extensions_mut()
- .remove::<OnUpgrade>()
- .ok_or(ServiceError::UpgradeFailed)?;
- let on_upgrade_downstream = on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?;
- tokio::task::spawn(async move {
- debug!("about to upgrade connection");
- match (on_upgrade_upstream.await, on_upgrade_downstream.await) {
- (Ok(upgraded_upstream), Ok(upgraded_downstream)) => {
- debug!("upgrade successful");
- match tokio::io::copy_bidirectional(
- &mut TokioIo(upgraded_downstream),
- &mut TokioIo(upgraded_upstream),
- )
+ let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
+ .await
+ .map_err(ServiceError::Hyper)?;
+ tokio::task::spawn(async move {
+ if let Err(err) = conn.with_upgrades().await {
+ warn!("connection failed: {:?}", err);
+ }
+ });
+ sender
+ .send_request(request)
.await
- {
- Ok((from_client, from_server)) => {
- debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ .map_err(ServiceError::Hyper)?
+ };
+
+ if resp.status() == StatusCode::SWITCHING_PROTOCOLS {
+ let on_upgrade_upstream = resp
+ .extensions_mut()
+ .remove::<OnUpgrade>()
+ .ok_or(ServiceError::UpgradeFailed)?;
+ let on_upgrade_downstream =
+ on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?;
+ tokio::task::spawn(async move {
+ debug!("about to upgrade connection");
+ match (on_upgrade_upstream.await, on_upgrade_downstream.await) {
+ (Ok(upgraded_upstream), Ok(upgraded_downstream)) => {
+ debug!("upgrade successful");
+ match tokio::io::copy_bidirectional(
+ &mut TokioIo(upgraded_downstream),
+ &mut TokioIo(upgraded_upstream),
+ )
+ .await
+ {
+ Ok((from_client, from_server)) => {
+ debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ }
+ Err(e) => warn!("proxy socket error: {e}"),
+ }
}
- Err(e) => warn!("proxy socket error: {e}"),
+ (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"),
}
- }
- (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"),
+ });
}
- });
+
+ let resp = resp.map(|b| b.map_err(ServiceError::Hyper).boxed());
+ Ok(resp)
+ })
}
- Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
}