summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-02-12 00:04:40 +0100
committermetamuffin <metamuffin@disroot.org>2023-02-12 00:04:40 +0100
commit7bbc6e3f1b0af552d04d331699e192d8fe3fffc0 (patch)
tree406c6b29201a865366e74cd5133303af83afe694
parent1284ac8ac8ab0797b908fd9cc8db8b682bc4373f (diff)
downloadgnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar
gnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar.bz2
gnix-7bbc6e3f1b0af552d04d331699e192d8fe3fffc0.tar.zst
blub
-rw-r--r--src/config.rs4
-rw-r--r--src/main.rs136
2 files changed, 108 insertions, 32 deletions
diff --git a/src/config.rs b/src/config.rs
index 527e159..6fc01d8 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -23,7 +23,9 @@ pub struct HttpsConfig {
}
#[derive(Debug, Serialize, Deserialize)]
-pub struct HostConfig {}
+pub struct HostConfig {
+ pub backend: SocketAddr,
+}
impl Config {
pub fn load() -> anyhow::Result<Config> {
diff --git a/src/main.rs b/src/main.rs
index 583a069..8b4d4d5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,8 +4,11 @@ use crate::config::Config;
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
-use hyper::{body::Incoming, server::conn::http1, service::service_fn, Method, Request, Response};
-use log::{debug, error, info};
+use hyper::{
+ body::Incoming, header::UPGRADE, http::uri::PathAndQuery, server::conn::http1,
+ service::service_fn, upgrade::OnUpgrade, Method, Request, Response, StatusCode, Uri,
+};
+use log::{debug, error, info, warn};
use std::{fs::File, io::BufReader, path::Path, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncWrite},
@@ -74,11 +77,26 @@ pub fn serve_stream<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
stream: T,
) {
tokio::task::spawn(async move {
- if let Err(err) = http1::Builder::new()
- .serve_connection(stream, service_fn(move |req| service(config.clone(), req)))
- .await
- {
- error!("{:?}", err);
+ let conn = http1::Builder::new()
+ .serve_connection(
+ stream,
+ service_fn(move |req| {
+ let config = config.clone();
+ async move {
+ match service(config, req).await {
+ Ok(r) => Ok(r),
+ Err(ServiceError::Hyper(e)) => Err(e),
+ Err(other) => Ok(Response::new(format!(
+ "the reverse proxy encountered an error: {other:?}"
+ ))
+ .map(|b| b.map_err(|e| match e {}).boxed())),
+ }
+ }
+ }),
+ )
+ .with_upgrades();
+ if let Err(err) = conn.await {
+ error!("error: {:?}", err);
}
});
}
@@ -98,42 +116,98 @@ fn load_private_key(path: &Path) -> anyhow::Result<rustls::PrivateKey> {
Ok(rustls::PrivateKey(keys[0].clone()))
}
+#[derive(Debug)]
+enum ServiceError {
+ Hyper(hyper::Error),
+ NoHost,
+ CantConnect,
+}
+
async fn service(
config: Arc<Config>,
mut req: Request<Incoming>,
-) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, hyper::Error> {
- let uri_string = format!(
- "http://127.0.0.1:8080{}",
- req.uri()
- .path_and_query()
- .map(|x| x.as_str())
- .unwrap_or("/")
- );
- let uri = uri_string.parse().unwrap();
- *req.uri_mut() = uri;
+) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, ServiceError> {
+ *req.uri_mut() = Uri::builder()
+ .scheme("http")
+ .authority("backend")
+ .path_and_query(
+ req.uri()
+ .clone()
+ .path_and_query()
+ .cloned()
+ .unwrap_or(PathAndQuery::from_static("/")),
+ )
+ .build()
+ .unwrap();
- let host = req.uri().host().expect("uri has no host");
- let port = req.uri().port_u16().unwrap_or(80);
- let addr = format!("{}:{}", host, port);
+ let route = config
+ .hosts
+ .get(remove_port(
+ &req.headers()
+ .get("host")
+ .and_then(|e| e.to_str().ok())
+ .map(String::from)
+ .unwrap_or(String::from("")),
+ ))
+ .ok_or(ServiceError::NoHost)?;
- let client_stream = TcpStream::connect(addr).await.unwrap();
+ let upgrade_header = req.headers().get(UPGRADE).cloned();
+ let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
- if req.method() == Method::CONNECT {
- Ok(Response::new(empty()))
- } else {
- let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream).await?;
+ let mut resp = {
+ let client_stream = TcpStream::connect(&route.backend)
+ .await
+ .map_err(|_| ServiceError::CantConnect)?;
+
+ let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
+ .await
+ .map_err(ServiceError::Hyper)?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
- println!("Connection failed: {:?}", err);
+ warn!("connection failed: {:?}", err);
}
});
- let resp = sender.send_request(req).await?;
+ sender
+ .send_request(req)
+ .await
+ .map_err(ServiceError::Hyper)?
+ };
+
+ if let Some(proto) = upgrade_header {
+ let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
+ tokio::task::spawn(async move {
+ debug!("about upgrading connection, sending empty response");
+ match (
+ on_upgrade_upstream.unwrap().await,
+ on_upgrade_downstream.unwrap().await,
+ ) {
+ (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => {
+ debug!("upgrade successful");
+ match tokio::io::copy_bidirectional(
+ &mut upgraded_downstream,
+ &mut upgraded_upstream,
+ )
+ .await
+ {
+ Ok((from_client, from_server)) => {
+ debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ }
+ Err(e) => warn!("proxy socket error: {e}"),
+ }
+ }
+ (a, b) => eprintln!("upgrade error: upstream={a:?} downstream={b:?}"),
+ }
+ });
+
+ let mut resp = Response::new(Empty::<Bytes>::new());
+ *resp.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ resp.headers_mut().insert(UPGRADE, proto);
+ Ok(resp.map(|b| b.map_err(|e| match e {}).boxed()))
+ } else {
Ok(resp.map(|b| b.boxed()))
}
}
-fn empty() -> BoxBody<Bytes, hyper::Error> {
- Empty::<Bytes>::new()
- .map_err(|never| match never {})
- .boxed()
+pub fn remove_port(s: &str) -> &str {
+ s.split_once(":").map(|(s, _)| s).unwrap_or(s)
}