aboutsummaryrefslogtreecommitdiff
path: root/src/filters/proxy.rs
blob: 11ff8c1201b168a70f8f480c371a91c62d65c05b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crate::{helper::TokioIo, ServiceError, State};
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{
    body::Incoming,
    header::UPGRADE,
    http::{uri::PathAndQuery, HeaderValue},
    upgrade::OnUpgrade,
    Request, Uri,
};
use log::{debug, error, warn};
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpStream;

pub async fn proxy_request(
    state: &Arc<State>,
    mut req: Request<Incoming>,
    addr: SocketAddr,
    backend: &SocketAddr,
) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> {
    #[cfg(feature = "mond")]
    state.reporting.request_out.inc();

    *req.uri_mut() = Uri::builder()
        .path_and_query(
            req.uri()
                .clone()
                .path_and_query()
                .cloned()
                .unwrap_or(PathAndQuery::from_static("/")),
        )
        .build()
        .unwrap();

    req.headers_mut().insert(
        "x-real-ip",
        HeaderValue::from_str(&format!("{}", addr.ip())).unwrap(),
    );

    let do_upgrade = req.headers().contains_key(UPGRADE);
    let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();

    let _limit_guard = state.l_outgoing.try_acquire()?;
    debug!("\tforwarding to {}", backend);
    let mut resp = {
        let client_stream = TokioIo(
            TcpStream::connect(backend)
                .await
                .map_err(|_| ServiceError::CantConnect)?,
        );

        let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
            .await
            .map_err(ServiceError::Hyper)?;
        tokio::task::spawn(async move {
            if let Err(err) = conn.await {
                warn!("connection failed: {:?}", err);
            }
        });
        sender
            .send_request(req)
            .await
            .map_err(ServiceError::Hyper)?
    };

    if do_upgrade {
        let on_upgrade_upstream = resp
            .extensions_mut()
            .remove::<OnUpgrade>()
            .ok_or(ServiceError::UpgradeFailed)?;
        let on_upgrade_downstream = on_upgrade_downstream.ok_or(ServiceError::UpgradeFailed)?;
        tokio::task::spawn(async move {
            debug!("about to upgrade connection, sending empty response");
            match (on_upgrade_upstream.await, on_upgrade_downstream.await) {
                (Ok(upgraded_upstream), Ok(upgraded_downstream)) => {
                    debug!("upgrade successful");
                    match tokio::io::copy_bidirectional(
                        &mut TokioIo(upgraded_downstream),
                        &mut TokioIo(upgraded_upstream),
                    )
                    .await
                    {
                        Ok((from_client, from_server)) => {
                            debug!("proxy socket terminated: {from_server} sent, {from_client} received")
                        }
                        Err(e) => warn!("proxy socket error: {e}"),
                    }
                }
                (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"),
            }
        });
    }
    Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
}