diff options
author | metamuffin <metamuffin@disroot.org> | 2023-02-12 00:04:40 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-02-12 00:04:40 +0100 |
commit | 7bbc6e3f1b0af552d04d331699e192d8fe3fffc0 (patch) | |
tree | 406c6b29201a865366e74cd5133303af83afe694 | |
parent | 1284ac8ac8ab0797b908fd9cc8db8b682bc4373f (diff) | |
download | gnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar gnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar.bz2 gnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar.zst |
blub
-rw-r--r-- | src/config.rs | 4 | ||||
-rw-r--r-- | src/main.rs | 136 |
2 files changed, 108 insertions, 32 deletions
diff --git a/src/config.rs b/src/config.rs index 527e159..6fc01d8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,7 +23,9 @@ pub struct HttpsConfig { } #[derive(Debug, Serialize, Deserialize)] -pub struct HostConfig {} +pub struct HostConfig { + pub backend: SocketAddr, +} impl Config { pub fn load() -> anyhow::Result<Config> { diff --git a/src/main.rs b/src/main.rs index 583a069..8b4d4d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,8 +4,11 @@ use crate::config::Config; use anyhow::{bail, Context, Result}; use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty}; -use hyper::{body::Incoming, server::conn::http1, service::service_fn, Method, Request, Response}; -use log::{debug, error, info}; +use hyper::{ + body::Incoming, header::UPGRADE, http::uri::PathAndQuery, server::conn::http1, + service::service_fn, upgrade::OnUpgrade, Method, Request, Response, StatusCode, Uri, +}; +use log::{debug, error, info, warn}; use std::{fs::File, io::BufReader, path::Path, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -74,11 +77,26 @@ pub fn serve_stream<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>( stream: T, ) { tokio::task::spawn(async move { - if let Err(err) = http1::Builder::new() - .serve_connection(stream, service_fn(move |req| service(config.clone(), req))) - .await - { - error!("{:?}", err); + let conn = http1::Builder::new() + .serve_connection( + stream, + service_fn(move |req| { + let config = config.clone(); + async move { + match service(config, req).await { + Ok(r) => Ok(r), + Err(ServiceError::Hyper(e)) => Err(e), + Err(other) => Ok(Response::new(format!( + "the reverse proxy encountered an error: {other:?}" + )) + .map(|b| b.map_err(|e| match e {}).boxed())), + } + } + }), + ) + .with_upgrades(); + if let Err(err) = conn.await { + error!("error: {:?}", err); } }); } @@ -98,42 +116,98 @@ fn load_private_key(path: &Path) -> anyhow::Result<rustls::PrivateKey> { Ok(rustls::PrivateKey(keys[0].clone())) } +#[derive(Debug)] +enum ServiceError { + Hyper(hyper::Error), + NoHost, + CantConnect, +} + async fn service( config: Arc<Config>, mut req: Request<Incoming>, -) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, hyper::Error> { - let uri_string = format!( - "http://127.0.0.1:8080{}", - req.uri() - .path_and_query() - .map(|x| x.as_str()) - .unwrap_or("/") - ); - let uri = uri_string.parse().unwrap(); - *req.uri_mut() = uri; +) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, ServiceError> { + *req.uri_mut() = Uri::builder() + .scheme("http") + .authority("backend") + .path_and_query( + req.uri() + .clone() + .path_and_query() + .cloned() + .unwrap_or(PathAndQuery::from_static("/")), + ) + .build() + .unwrap(); - let host = req.uri().host().expect("uri has no host"); - let port = req.uri().port_u16().unwrap_or(80); - let addr = format!("{}:{}", host, port); + let route = config + .hosts + .get(remove_port( + &req.headers() + .get("host") + .and_then(|e| e.to_str().ok()) + .map(String::from) + .unwrap_or(String::from("")), + )) + .ok_or(ServiceError::NoHost)?; - let client_stream = TcpStream::connect(addr).await.unwrap(); + let upgrade_header = req.headers().get(UPGRADE).cloned(); + let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); - if req.method() == Method::CONNECT { - Ok(Response::new(empty())) - } else { - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream).await?; + let mut resp = { + let client_stream = TcpStream::connect(&route.backend) + .await + .map_err(|_| ServiceError::CantConnect)?; + + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; tokio::task::spawn(async move { if let Err(err) = conn.await { - println!("Connection failed: {:?}", err); + warn!("connection failed: {:?}", err); } }); - let resp = sender.send_request(req).await?; + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; + + if let Some(proto) = upgrade_header { + let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); + tokio::task::spawn(async move { + debug!("about upgrading connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut upgraded_downstream, + &mut upgraded_upstream, + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), + } + } + (a, b) => eprintln!("upgrade error: upstream={a:?} downstream={b:?}"), + } + }); + + let mut resp = Response::new(Empty::<Bytes>::new()); + *resp.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + resp.headers_mut().insert(UPGRADE, proto); + Ok(resp.map(|b| b.map_err(|e| match e {}).boxed())) + } else { Ok(resp.map(|b| b.boxed())) } } -fn empty() -> BoxBody<Bytes, hyper::Error> { - Empty::<Bytes>::new() - .map_err(|never| match never {}) - .boxed() +pub fn remove_port(s: &str) -> &str { + s.split_once(":").map(|(s, _)| s).unwrap_or(s) } |