aboutsummaryrefslogtreecommitdiff
path: root/src/filters/proxy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/filters/proxy.rs')
-rw-r--r--src/filters/proxy.rs98
1 files changed, 0 insertions, 98 deletions
diff --git a/src/filters/proxy.rs b/src/filters/proxy.rs
deleted file mode 100644
index ce72f65..0000000
--- a/src/filters/proxy.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-use super::{Node, NodeContext, NodeKind, NodeRequest, NodeResponse};
-use crate::{helper::TokioIo, ServiceError};
-use futures::Future;
-use http_body_util::BodyExt;
-use hyper::{http::HeaderValue, upgrade::OnUpgrade, StatusCode};
-use log::{debug, warn};
-use serde::Deserialize;
-use serde_yaml::Value;
-use std::{net::SocketAddr, pin::Pin, sync::Arc};
-use tokio::net::TcpStream;
-
-#[derive(Default)]
-pub struct ProxyKind;
-
-#[derive(Debug, Deserialize)]
-struct Proxy {
- backend: SocketAddr,
-}
-
-impl NodeKind for ProxyKind {
- fn name(&self) -> &'static str {
- "proxy"
- }
- fn instanciate(&self, config: Value) -> anyhow::Result<Arc<dyn Node>> {
- Ok(Arc::new(serde_yaml::from_value::<Proxy>(config)?))
- }
-}
-impl Node for Proxy {
- fn handle<'a>(
- &'a self,
- context: &'a mut NodeContext,
- mut request: NodeRequest,
- ) -> Pin<Box<dyn Future<Output = Result<NodeResponse, ServiceError>> + Send + Sync + 'a>> {
- Box::pin(async move {
- request.headers_mut().insert(
- "x-real-ip",
- HeaderValue::from_str(&format!("{}", context.addr.ip())).unwrap(),
- );
-
- let on_upgrade_downstream = request.extensions_mut().remove::<OnUpgrade>();
-
- let _limit_guard = context.state.l_outgoing.try_acquire()?;
- debug!("\tforwarding to {}", self.backend);
- let mut resp = {
- let client_stream = TokioIo(
- TcpStream::connect(self.backend)
- .await
- .map_err(|_| ServiceError::CantConnect)?,
- );
-
- let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
- .await
- .map_err(ServiceError::Hyper)?;
- tokio::task::spawn(async move {
- if let Err(err) = conn.with_upgrades().await {
- warn!("connection failed: {:?}", err);
- }
- });
- sender
- .send_request(request)
- .await
- .map_err(ServiceError::Hyper)?
- };
-
- if resp.status() == StatusCode::SWITCHING_PROTOCOLS {
- let on_upgrade_upstream = resp
- .extensions_mut()
- .remove::<OnUpgrade>()
- .ok_or(ServiceError::UpgradeFailed)?;
- let on_upgrade_downstream =
- on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?;
- tokio::task::spawn(async move {
- debug!("about to upgrade connection");
- match (on_upgrade_upstream.await, on_upgrade_downstream.await) {
- (Ok(upgraded_upstream), Ok(upgraded_downstream)) => {
- debug!("upgrade successful");
- match tokio::io::copy_bidirectional(
- &mut TokioIo(upgraded_downstream),
- &mut TokioIo(upgraded_upstream),
- )
- .await
- {
- Ok((from_client, from_server)) => {
- debug!("proxy socket terminated: {from_server} sent, {from_client} received")
- }
- Err(e) => warn!("proxy socket error: {e}"),
- }
- }
- (a, b) => warn!("upgrade error: upstream={a:?} downstream={b:?}"),
- }
- });
- }
-
- let resp = resp.map(|b| b.map_err(ServiceError::Hyper).boxed());
- Ok(resp)
- })
- }
-}