summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-02-24 13:23:58 +0100
committermetamuffin <metamuffin@disroot.org>2023-02-24 13:23:58 +0100
commit3ed98e04da0917e790063549676729c7051d67f7 (patch)
tree81fbb2c93172a7cbb284aaeed8c98d3f6a7c26b5
parentc0d504f9ae77f99e5484e92e2e9d3f68561129c5 (diff)
downloadgnix-3ed98e04da0917e790063549676729c7051d67f7.tar
gnix-3ed98e04da0917e790063549676729c7051d67f7.tar.bz2
gnix-3ed98e04da0917e790063549676729c7051d67f7.tar.zst
static file serving + bugs
-rw-r--r--Cargo.lock113
-rw-r--r--Cargo.toml27
-rw-r--r--src/config.rs13
-rw-r--r--src/error.rs21
-rw-r--r--src/files.rs193
-rw-r--r--src/main.rs128
-rw-r--r--src/proxy.rs100
7 files changed, 465 insertions, 130 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 357ae03..97815d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,9 +110,20 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.21"
+version = "0.3.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
[[package]]
name = "futures-sink"
@@ -122,20 +133,22 @@ checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
-version = "0.3.21"
+version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366"
[[package]]
name = "futures-util"
-version = "0.3.21"
+version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1"
dependencies = [
"futures-core",
+ "futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
+ "slab",
]
[[package]]
@@ -145,15 +158,20 @@ dependencies = [
"anyhow",
"bytes",
"env_logger",
+ "futures-util",
"http-body-util",
+ "humansize",
"hyper",
"log",
+ "markup",
+ "mime_guess",
"rustls",
"rustls-pemfile",
"serde",
"thiserror",
"tokio",
"tokio-rustls",
+ "tokio-util",
"toml",
]
@@ -244,6 +262,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
+name = "humansize"
+version = "2.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
+dependencies = [
+ "libm",
+]
+
+[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -331,6 +358,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
[[package]]
+name = "libm"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
+
+[[package]]
name = "linux-raw-sys"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -356,12 +389,49 @@ dependencies = [
]
[[package]]
+name = "markup"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd9196a235d499738d04f6a2466ce2610bf6b84730610efea8bee1b90d028b0d"
+dependencies = [
+ "itoa",
+ "markup-proc-macro",
+]
+
+[[package]]
+name = "markup-proc-macro"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a927f0e237dcbdd8c1a8ab03c4e1e8b1999804c448ebf06ff3b5512506c8150"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
+[[package]]
+name = "mime_guess"
+version = "2.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
+dependencies = [
+ "mime",
+ "unicase",
+]
+
+[[package]]
name = "mio"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -693,9 +763,9 @@ dependencies = [
[[package]]
name = "tokio-util"
-version = "0.7.2"
+version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c"
+checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
dependencies = [
"bytes",
"futures-core",
@@ -747,22 +817,10 @@ checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
dependencies = [
"cfg-if",
"pin-project-lite",
- "tracing-attributes",
"tracing-core",
]
[[package]]
-name = "tracing-attributes"
-version = "0.1.21"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
name = "tracing-core"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -778,6 +836,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
+name = "unicase"
+version = "2.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
+dependencies = [
+ "version_check",
+]
+
+[[package]]
name = "unicode-ident"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -790,6 +857,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
+name = "version_check"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
+
+[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index f3bfafb..c900bd7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,20 +4,35 @@ version = "0.1.0"
edition = "2021"
[dependencies]
+
+# HTTP
hyper = { version = "1.0.0-rc.2", features = ["full"] }
-tokio = { version = "1.25.0", features = ["full"] }
http-body-util = "0.1.0-rc.2"
+# TLS
rustls-pemfile = "1.0.2"
rustls = "0.20.8"
tokio-rustls = "0.23.4"
-bytes = "1.4.0"
-anyhow = "1.0.69"
-thiserror = "1.0.38"
+# Async stuff
+tokio = { version = "1.25.0", features = ["full"] }
+tokio-util = { version = "0.7.7", features = ["io"] }
+futures-util = "0.3.26"
+
+# Config
+serde = { version = "1.0.152", features = ["derive"] }
+toml = "0.7.2"
+# Logging
env_logger = "0.10.0"
log = "0.4.17"
-serde = { version = "1.0.152", features = ["derive"] }
-toml = "0.7.2"
+# Fileserver related
+markup = "0.13.1"
+humansize = "2.1.3"
+mime_guess = "2.0.4"
+
+
+bytes = "1.4.0"
+anyhow = "1.0.69"
+thiserror = "1.0.38"
diff --git a/src/config.rs b/src/config.rs
index 210a1e6..58b885a 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -22,8 +22,17 @@ pub struct HttpsConfig {
}
#[derive(Debug, Serialize, Deserialize)]
-pub struct HostConfig {
- pub backend: SocketAddr,
+#[serde(untagged)]
+pub enum HostConfig {
+ Backend { backend: SocketAddr },
+ Files { files: FileserverConfig },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FileserverConfig {
+ pub root: PathBuf,
+ #[serde(default)]
+ pub index: bool,
}
impl Config {
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..cbeb6a6
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,21 @@
+#[derive(Debug, thiserror::Error)]
+pub enum ServiceError {
+ #[error("hyper error")]
+ Hyper(hyper::Error),
+ #[error("unknown host")]
+ NoHost,
+ #[error("can't connect to the backend")]
+ CantConnect,
+ #[error("not found")]
+ NotFound,
+ #[error("io error: {0}")]
+ Io(std::io::Error),
+ #[error("ohh. i didn't expect that this error can be generated.")]
+ Other,
+}
+
+impl From<std::io::Error> for ServiceError {
+ fn from(e: std::io::Error) -> Self {
+ Self::Io(e)
+ }
+}
diff --git a/src/files.rs b/src/files.rs
new file mode 100644
index 0000000..e092994
--- /dev/null
+++ b/src/files.rs
@@ -0,0 +1,193 @@
+use crate::{config::FileserverConfig, ServiceError};
+use bytes::{Bytes, BytesMut};
+use futures_util::{future, future::Either, ready, stream, FutureExt, Stream, StreamExt};
+use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
+use humansize::FormatSizeOptions;
+use hyper::{
+ body::{Frame, Incoming},
+ header::{CONTENT_TYPE, LOCATION},
+ http::HeaderValue,
+ Request, Response, StatusCode,
+};
+use log::debug;
+use markup::Render;
+use std::{fs::Metadata, io, path::Path, pin::Pin, task::Poll};
+use tokio::{
+ fs::{read_to_string, File},
+ io::AsyncSeekExt,
+};
+use tokio_util::io::poll_read_buf;
+
+pub async fn serve_files(
+ req: Request<Incoming>,
+ config: &FileserverConfig,
+) -> Result<hyper::Response<BoxBody<Bytes, ServiceError>>, ServiceError> {
+ let rpath = req.uri().path();
+
+ let mut path = config.root.clone();
+ for seg in rpath.split("/") {
+ if seg == "" || seg == ".." {
+ continue; // not ideal
+ }
+ path.push(seg)
+ }
+
+ if !path.exists() {
+ return Err(ServiceError::NotFound);
+ }
+
+ if path.is_dir() {
+ if !config.index {
+ return Err(ServiceError::NotFound);
+ }
+
+ if !rpath.ends_with("/") {
+ let mut r = Response::new(String::new());
+ *r.status_mut() = StatusCode::FOUND;
+ r.headers_mut().insert(
+ LOCATION,
+ HeaderValue::from_str(&format!("{}/", rpath)).map_err(|_| ServiceError::Other)?,
+ );
+ return Ok(r.map(|b| b.map_err(|e| match e {}).boxed()));
+ }
+
+ return index(&path, rpath.to_string()).await.map(|s| {
+ let mut r = Response::new(s);
+ r.headers_mut()
+ .insert(CONTENT_TYPE, HeaderValue::from_static("text/html"));
+ r.map(|b| b.map_err(|e| match e {}).boxed())
+ });
+ }
+
+ let file = File::open(path.clone()).await?;
+
+ let mut r = Response::new(BoxBody::new(StreamBody::new(
+ StreamBody::new(file_stream(file, 4096, (0, u64::MAX)))
+ .map(|e| e.map(|e| Frame::data(e)).map_err(ServiceError::Io)),
+ )));
+
+ r.headers_mut().insert(
+ CONTENT_TYPE,
+ HeaderValue::from_str(
+ // no allocation possible here?
+ &mime_guess::from_path(path)
+ .first()
+ .map(|m| m.to_string())
+ .unwrap_or("text/plain".to_string()),
+ )
+ .unwrap(),
+ );
+ Ok(r)
+}
+
+// Adapted from warp (https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs). Thanks!
+fn file_stream(
+ mut file: File,
+ buf_size: usize,
+ (start, end): (u64, u64),
+) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
+ use std::io::SeekFrom;
+
+ let seek = async move {
+ if start != 0 {
+ file.seek(SeekFrom::Start(start)).await?;
+ }
+ Ok(file)
+ };
+
+ seek.into_stream()
+ .map(move |result| {
+ let mut buf = BytesMut::new();
+ let mut len = end - start;
+ let mut f = match result {
+ Ok(f) => f,
+ Err(f) => return Either::Left(stream::once(future::err(f))),
+ };
+
+ Either::Right(stream::poll_fn(move |cx| {
+ if len == 0 {
+ return Poll::Ready(None);
+ }
+ reserve_at_least(&mut buf, buf_size);
+
+ let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
+ Ok(n) => n as u64,
+ Err(err) => {
+ debug!("file read error: {}", err);
+ return Poll::Ready(Some(Err(err)));
+ }
+ };
+
+ if n == 0 {
+ debug!("file read found EOF before expected length");
+ return Poll::Ready(None);
+ }
+
+ let mut chunk = buf.split().freeze();
+ if n > len {
+ chunk = chunk.split_to(len as usize);
+ len = 0;
+ } else {
+ len -= n;
+ }
+
+ Poll::Ready(Some(Ok(chunk)))
+ }))
+ })
+ .flatten()
+}
+
+fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
+ if buf.capacity() - buf.len() < cap {
+ buf.reserve(cap);
+ }
+}
+
+async fn index(path: &Path, rpath: String) -> Result<String, ServiceError> {
+ let files = path
+ .read_dir()?
+ .map(|e| e.and_then(|e| Ok((e.file_name().into_string().unwrap(), e.metadata()?))))
+ .collect::<Result<Vec<_>, _>>()?;
+ if let Ok(indexhtml) = read_to_string(path.join("index.html")).await {
+ Ok(indexhtml)
+ } else {
+ let banner = read_to_string(path.join("index.banner.html")).await.ok();
+ let mut s = String::new();
+ IndexTemplate {
+ files,
+ banner,
+ path: rpath,
+ }
+ .render(&mut s)
+ .unwrap();
+ Ok(s)
+ }
+}
+
+markup::define! {
+ IndexTemplate(path: String, banner: Option<String>, files: Vec<(String, Metadata)>) {
+ @markup::doctype()
+ html {
+ head {
+ title { "Index of " @path }
+ }
+ body {
+ @if let Some(banner) = banner {
+ @markup::raw(banner)
+ } else {
+ h1 { "Index of " @path }
+ }
+ hr;
+ table {
+ @for (name, meta) in files { tr {
+ td { @if meta.file_type().is_dir() { "(dir)" } else { "(file)" } }
+ td { a[href=name] { @name } }
+ td { @humansize::format_size(meta.len(), FormatSizeOptions::default()) }
+ } }
+ }
+ hr;
+ footer { sub { "served by gnix" } }
+ }
+ }
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 2edbe3d..c325e61 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,39 +1,35 @@
+#![feature(try_trait_v2)]
+
pub mod config;
+pub mod error;
+pub mod files;
+pub mod proxy;
-use crate::config::Config;
+use crate::{
+ config::{Config, HostConfig},
+ files::serve_files,
+ proxy::proxy_request,
+};
use anyhow::{anyhow, bail, Context, Result};
+use error::ServiceError;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{
body::Incoming,
- header::{HOST, UPGRADE},
- http::{
- uri::{PathAndQuery, Scheme},
- HeaderValue,
- },
+ header::{CONTENT_TYPE, HOST},
+ http::HeaderValue,
server::conn::http1,
service::service_fn,
- upgrade::OnUpgrade,
- Request, Response, StatusCode, Uri,
+ Request, Response, StatusCode,
};
-use log::{debug, error, info, warn};
+use log::{debug, info, warn};
use std::{fs::File, io::BufReader, net::SocketAddr, path::Path, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncWrite},
- net::{TcpListener, TcpStream},
+ net::TcpListener,
signal::ctrl_c,
};
use tokio_rustls::TlsAcceptor;
-#[derive(Debug, thiserror::Error)]
-enum ServiceError {
- #[error("hyper error")]
- Hyper(hyper::Error),
- #[error("unknown host")]
- NoHost,
- #[error("can't connect to the backend")]
- CantConnect,
-}
-
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init_from_env("LOG");
@@ -60,7 +56,7 @@ async fn serve_http(config: Arc<Config>) -> Result<()> {
let (stream, addr) = listener.accept().await.context("accepting connection")?;
debug!("connection from {addr}");
let config = config.clone();
- tokio::spawn(async move { serve_stream(config, stream, addr) });
+ tokio::spawn(async move { serve_stream(config, stream, addr).await });
}
}
async fn serve_https(config: Arc<Config>) -> Result<()> {
@@ -114,10 +110,11 @@ pub async fn serve_stream<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
Ok(r) => Ok(r),
Err(ServiceError::Hyper(e)) => Err(e),
Err(error) => Ok({
- let mut resp = Response::new(format!(
- "the reverse proxy encountered an issue: {error}"
- ));
+ let mut resp =
+ Response::new(format!("gnix encountered an issue: {error}"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
+ resp.headers_mut()
+ .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
resp
}
.map(|b| b.map_err(|e| match e {}).boxed())),
@@ -148,34 +145,10 @@ fn load_private_key(path: &Path) -> anyhow::Result<rustls::PrivateKey> {
async fn service(
config: Arc<Config>,
- mut req: Request<Incoming>,
+ req: Request<Incoming>,
addr: SocketAddr,
-) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, ServiceError> {
- let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS);
+) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> {
debug!("{addr} ~> {:?} {}", req.headers().get(HOST), req.uri());
- *req.uri_mut() = Uri::builder()
- .path_and_query(
- req.uri()
- .clone()
- .path_and_query()
- .cloned()
- .unwrap_or(PathAndQuery::from_static("/")),
- )
- .build()
- .unwrap();
-
- req.headers_mut().insert(
- "x-forwarded-for",
- HeaderValue::from_str(&format!("{addr}")).unwrap(),
- );
- req.headers_mut().insert(
- "x-forwarded-proto",
- if scheme_secure {
- HeaderValue::from_static("https")
- } else {
- HeaderValue::from_static("http")
- },
- );
let route = config
.hosts
@@ -188,59 +161,10 @@ async fn service(
))
.ok_or(ServiceError::NoHost)?;
- let do_upgrade = req.headers().contains_key(UPGRADE);
- let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
-
- debug!("\tforwarding to {}", route.backend);
- let mut resp = {
- let client_stream = TcpStream::connect(&route.backend)
- .await
- .map_err(|_| ServiceError::CantConnect)?;
-
- let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
- .await
- .map_err(ServiceError::Hyper)?;
- tokio::task::spawn(async move {
- if let Err(err) = conn.await {
- warn!("connection failed: {:?}", err);
- }
- });
- sender
- .send_request(req)
- .await
- .map_err(ServiceError::Hyper)?
- };
-
- resp.headers_mut()
- .insert("server", HeaderValue::from_static("gnix"));
-
- if do_upgrade {
- let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
- tokio::task::spawn(async move {
- debug!("about upgrading connection, sending empty response");
- match (
- on_upgrade_upstream.unwrap().await,
- on_upgrade_downstream.unwrap().await,
- ) {
- (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => {
- debug!("upgrade successful");
- match tokio::io::copy_bidirectional(
- &mut upgraded_downstream,
- &mut upgraded_upstream,
- )
- .await
- {
- Ok((from_client, from_server)) => {
- debug!("proxy socket terminated: {from_server} sent, {from_client} received")
- }
- Err(e) => warn!("proxy socket error: {e}"),
- }
- }
- (a, b) => eprintln!("upgrade error: upstream={a:?} downstream={b:?}"),
- }
- });
+ match route {
+ HostConfig::Backend { backend } => proxy_request(req, addr, backend).await,
+ HostConfig::Files { files } => serve_files(req, files).await,
}
- Ok(resp.map(|b| b.boxed()))
}
pub fn remove_port(s: &str) -> &str {
diff --git a/src/proxy.rs b/src/proxy.rs
new file mode 100644
index 0000000..65fc5a8
--- /dev/null
+++ b/src/proxy.rs
@@ -0,0 +1,100 @@
+use crate::ServiceError;
+use http_body_util::{combinators::BoxBody, BodyExt};
+use hyper::{
+ body::Incoming,
+ header::UPGRADE,
+ http::{
+ uri::{PathAndQuery, Scheme},
+ HeaderValue,
+ },
+ upgrade::OnUpgrade,
+ Request, Uri,
+};
+use log::{debug, error, warn};
+use std::net::SocketAddr;
+use tokio::net::TcpStream;
+
+pub async fn proxy_request(
+ mut req: Request<Incoming>,
+ addr: SocketAddr,
+ backend: &SocketAddr,
+) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> {
+ let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS);
+ *req.uri_mut() = Uri::builder()
+ .path_and_query(
+ req.uri()
+ .clone()
+ .path_and_query()
+ .cloned()
+ .unwrap_or(PathAndQuery::from_static("/")),
+ )
+ .build()
+ .unwrap();
+
+ req.headers_mut().insert(
+ "x-forwarded-for",
+ HeaderValue::from_str(&format!("{addr}")).unwrap(),
+ );
+ req.headers_mut().insert(
+ "x-forwarded-proto",
+ if scheme_secure {
+ HeaderValue::from_static("https")
+ } else {
+ HeaderValue::from_static("http")
+ },
+ );
+
+ let do_upgrade = req.headers().contains_key(UPGRADE);
+ let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>();
+
+ debug!("\tforwarding to {}", backend);
+ let mut resp = {
+ let client_stream = TcpStream::connect(backend)
+ .await
+ .map_err(|_| ServiceError::CantConnect)?;
+
+ let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream)
+ .await
+ .map_err(ServiceError::Hyper)?;
+ tokio::task::spawn(async move {
+ if let Err(err) = conn.await {
+ warn!("connection failed: {:?}", err);
+ }
+ });
+ sender
+ .send_request(req)
+ .await
+ .map_err(ServiceError::Hyper)?
+ };
+
+ resp.headers_mut()
+ .insert("server", HeaderValue::from_static("gnix"));
+
+ if do_upgrade {
+ let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>();
+ tokio::task::spawn(async move {
+ debug!("about upgrading connection, sending empty response");
+ match (
+ on_upgrade_upstream.unwrap().await,
+ on_upgrade_downstream.unwrap().await,
+ ) {
+ (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => {
+ debug!("upgrade successful");
+ match tokio::io::copy_bidirectional(
+ &mut upgraded_downstream,
+ &mut upgraded_upstream,
+ )
+ .await
+ {
+ Ok((from_client, from_server)) => {
+ debug!("proxy socket terminated: {from_server} sent, {from_client} received")
+ }
+ Err(e) => warn!("proxy socket error: {e}"),
+ }
+ }
+ (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"),
+ }
+ });
+ }
+ Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed()))
+}