diff options
author | metamuffin <metamuffin@disroot.org> | 2023-02-24 13:23:58 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-02-24 13:23:58 +0100 |
commit | 3ed98e04da0917e790063549676729c7051d67f7 (patch) | |
tree | 81fbb2c93172a7cbb284aaeed8c98d3f6a7c26b5 | |
parent | c0d504f9ae77f99e5484e92e2e9d3f68561129c5 (diff) | |
download | gnix-3ed98e04da0917e790063549676729c7051d67f7.tar gnix-3ed98e04da0917e790063549676729c7051d67f7.tar.bz2 gnix-3ed98e04da0917e790063549676729c7051d67f7.tar.zst |
static file serving + bugs
-rw-r--r-- | Cargo.lock | 113 | ||||
-rw-r--r-- | Cargo.toml | 27 | ||||
-rw-r--r-- | src/config.rs | 13 | ||||
-rw-r--r-- | src/error.rs | 21 | ||||
-rw-r--r-- | src/files.rs | 193 | ||||
-rw-r--r-- | src/main.rs | 128 | ||||
-rw-r--r-- | src/proxy.rs | 100 |
7 files changed, 465 insertions, 130 deletions
@@ -110,9 +110,20 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" + +[[package]] +name = "futures-macro" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "futures-sink" @@ -122,20 +133,22 @@ checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -145,15 +158,20 @@ dependencies = [ "anyhow", "bytes", "env_logger", + "futures-util", "http-body-util", + "humansize", "hyper", "log", + "markup", + "mime_guess", "rustls", "rustls-pemfile", "serde", "thiserror", "tokio", "tokio-rustls", + "tokio-util", "toml", ] @@ -244,6 +262,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + +[[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -331,6 +358,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] +name = "libm" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" + +[[package]] name = "linux-raw-sys" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -356,12 +389,49 @@ dependencies = [ ] [[package]] +name = "markup" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9196a235d499738d04f6a2466ce2610bf6b84730610efea8bee1b90d028b0d" +dependencies = [ + "itoa", + "markup-proc-macro", +] + +[[package]] +name = "markup-proc-macro" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a927f0e237dcbdd8c1a8ab03c4e1e8b1999804c448ebf06ff3b5512506c8150" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + +[[package]] name = "mio" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -693,9 +763,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.2" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", @@ -747,22 +817,10 @@ checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if", "pin-project-lite", - "tracing-attributes", "tracing-core", ] [[package]] -name = "tracing-attributes" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] name = "tracing-core" version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -778,6 +836,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + +[[package]] name = "unicode-ident" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -790,6 +857,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] name = "want" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4,20 +4,35 @@ version = "0.1.0" edition = "2021" [dependencies] + +# HTTP hyper = { version = "1.0.0-rc.2", features = ["full"] } -tokio = { version = "1.25.0", features = ["full"] } http-body-util = "0.1.0-rc.2" +# TLS rustls-pemfile = "1.0.2" rustls = "0.20.8" tokio-rustls = "0.23.4" -bytes = "1.4.0" -anyhow = "1.0.69" -thiserror = "1.0.38" +# Async stuff +tokio = { version = "1.25.0", features = ["full"] } +tokio-util = { version = "0.7.7", features = ["io"] } +futures-util = "0.3.26" + +# Config +serde = { version = "1.0.152", features = ["derive"] } +toml = "0.7.2" +# Logging env_logger = "0.10.0" log = "0.4.17" -serde = { version = "1.0.152", features = ["derive"] } -toml = "0.7.2" +# Fileserver related +markup = "0.13.1" +humansize = "2.1.3" +mime_guess = "2.0.4" + + +bytes = "1.4.0" +anyhow = "1.0.69" +thiserror = "1.0.38" diff --git a/src/config.rs b/src/config.rs index 210a1e6..58b885a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,8 +22,17 @@ pub struct HttpsConfig { } #[derive(Debug, Serialize, Deserialize)] -pub struct HostConfig { - pub backend: SocketAddr, +#[serde(untagged)] +pub enum HostConfig { + Backend { backend: SocketAddr }, + Files { files: FileserverConfig }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct FileserverConfig { + pub root: PathBuf, + #[serde(default)] + pub index: bool, } impl Config { diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..cbeb6a6 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,21 @@ +#[derive(Debug, thiserror::Error)] +pub enum ServiceError { + #[error("hyper error")] + Hyper(hyper::Error), + #[error("unknown host")] + NoHost, + #[error("can't connect to the backend")] + CantConnect, + #[error("not found")] + NotFound, + #[error("io error: {0}")] + Io(std::io::Error), + #[error("ohh. i didn't expect that this error can be generated.")] + Other, +} + +impl From<std::io::Error> for ServiceError { + fn from(e: std::io::Error) -> Self { + Self::Io(e) + } +} diff --git a/src/files.rs b/src/files.rs new file mode 100644 index 0000000..e092994 --- /dev/null +++ b/src/files.rs @@ -0,0 +1,193 @@ +use crate::{config::FileserverConfig, ServiceError}; +use bytes::{Bytes, BytesMut}; +use futures_util::{future, future::Either, ready, stream, FutureExt, Stream, StreamExt}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use humansize::FormatSizeOptions; +use hyper::{ + body::{Frame, Incoming}, + header::{CONTENT_TYPE, LOCATION}, + http::HeaderValue, + Request, Response, StatusCode, +}; +use log::debug; +use markup::Render; +use std::{fs::Metadata, io, path::Path, pin::Pin, task::Poll}; +use tokio::{ + fs::{read_to_string, File}, + io::AsyncSeekExt, +}; +use tokio_util::io::poll_read_buf; + +pub async fn serve_files( + req: Request<Incoming>, + config: &FileserverConfig, +) -> Result<hyper::Response<BoxBody<Bytes, ServiceError>>, ServiceError> { + let rpath = req.uri().path(); + + let mut path = config.root.clone(); + for seg in rpath.split("/") { + if seg == "" || seg == ".." { + continue; // not ideal + } + path.push(seg) + } + + if !path.exists() { + return Err(ServiceError::NotFound); + } + + if path.is_dir() { + if !config.index { + return Err(ServiceError::NotFound); + } + + if !rpath.ends_with("/") { + let mut r = Response::new(String::new()); + *r.status_mut() = StatusCode::FOUND; + r.headers_mut().insert( + LOCATION, + HeaderValue::from_str(&format!("{}/", rpath)).map_err(|_| ServiceError::Other)?, + ); + return Ok(r.map(|b| b.map_err(|e| match e {}).boxed())); + } + + return index(&path, rpath.to_string()).await.map(|s| { + let mut r = Response::new(s); + r.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("text/html")); + r.map(|b| b.map_err(|e| match e {}).boxed()) + }); + } + + let file = File::open(path.clone()).await?; + + let mut r = Response::new(BoxBody::new(StreamBody::new( + StreamBody::new(file_stream(file, 4096, (0, u64::MAX))) + .map(|e| e.map(|e| Frame::data(e)).map_err(ServiceError::Io)), + ))); + + r.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str( + // no allocation possible here? + &mime_guess::from_path(path) + .first() + .map(|m| m.to_string()) + .unwrap_or("text/plain".to_string()), + ) + .unwrap(), + ); + Ok(r) +} + +// Adapted from warp (https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs). Thanks! +fn file_stream( + mut file: File, + buf_size: usize, + (start, end): (u64, u64), +) -> impl Stream<Item = Result<Bytes, io::Error>> + Send { + use std::io::SeekFrom; + + let seek = async move { + if start != 0 { + file.seek(SeekFrom::Start(start)).await?; + } + Ok(file) + }; + + seek.into_stream() + .map(move |result| { + let mut buf = BytesMut::new(); + let mut len = end - start; + let mut f = match result { + Ok(f) => f, + Err(f) => return Either::Left(stream::once(future::err(f))), + }; + + Either::Right(stream::poll_fn(move |cx| { + if len == 0 { + return Poll::Ready(None); + } + reserve_at_least(&mut buf, buf_size); + + let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) { + Ok(n) => n as u64, + Err(err) => { + debug!("file read error: {}", err); + return Poll::Ready(Some(Err(err))); + } + }; + + if n == 0 { + debug!("file read found EOF before expected length"); + return Poll::Ready(None); + } + + let mut chunk = buf.split().freeze(); + if n > len { + chunk = chunk.split_to(len as usize); + len = 0; + } else { + len -= n; + } + + Poll::Ready(Some(Ok(chunk))) + })) + }) + .flatten() +} + +fn reserve_at_least(buf: &mut BytesMut, cap: usize) { + if buf.capacity() - buf.len() < cap { + buf.reserve(cap); + } +} + +async fn index(path: &Path, rpath: String) -> Result<String, ServiceError> { + let files = path + .read_dir()? + .map(|e| e.and_then(|e| Ok((e.file_name().into_string().unwrap(), e.metadata()?)))) + .collect::<Result<Vec<_>, _>>()?; + if let Ok(indexhtml) = read_to_string(path.join("index.html")).await { + Ok(indexhtml) + } else { + let banner = read_to_string(path.join("index.banner.html")).await.ok(); + let mut s = String::new(); + IndexTemplate { + files, + banner, + path: rpath, + } + .render(&mut s) + .unwrap(); + Ok(s) + } +} + +markup::define! { + IndexTemplate(path: String, banner: Option<String>, files: Vec<(String, Metadata)>) { + @markup::doctype() + html { + head { + title { "Index of " @path } + } + body { + @if let Some(banner) = banner { + @markup::raw(banner) + } else { + h1 { "Index of " @path } + } + hr; + table { + @for (name, meta) in files { tr { + td { @if meta.file_type().is_dir() { "(dir)" } else { "(file)" } } + td { a[href=name] { @name } } + td { @humansize::format_size(meta.len(), FormatSizeOptions::default()) } + } } + } + hr; + footer { sub { "served by gnix" } } + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 2edbe3d..c325e61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,39 +1,35 @@ +#![feature(try_trait_v2)] + pub mod config; +pub mod error; +pub mod files; +pub mod proxy; -use crate::config::Config; +use crate::{ + config::{Config, HostConfig}, + files::serve_files, + proxy::proxy_request, +}; use anyhow::{anyhow, bail, Context, Result}; +use error::ServiceError; use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ body::Incoming, - header::{HOST, UPGRADE}, - http::{ - uri::{PathAndQuery, Scheme}, - HeaderValue, - }, + header::{CONTENT_TYPE, HOST}, + http::HeaderValue, server::conn::http1, service::service_fn, - upgrade::OnUpgrade, - Request, Response, StatusCode, Uri, + Request, Response, StatusCode, }; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use std::{fs::File, io::BufReader, net::SocketAddr, path::Path, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::{TcpListener, TcpStream}, + net::TcpListener, signal::ctrl_c, }; use tokio_rustls::TlsAcceptor; -#[derive(Debug, thiserror::Error)] -enum ServiceError { - #[error("hyper error")] - Hyper(hyper::Error), - #[error("unknown host")] - NoHost, - #[error("can't connect to the backend")] - CantConnect, -} - #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init_from_env("LOG"); @@ -60,7 +56,7 @@ async fn serve_http(config: Arc<Config>) -> Result<()> { let (stream, addr) = listener.accept().await.context("accepting connection")?; debug!("connection from {addr}"); let config = config.clone(); - tokio::spawn(async move { serve_stream(config, stream, addr) }); + tokio::spawn(async move { serve_stream(config, stream, addr).await }); } } async fn serve_https(config: Arc<Config>) -> Result<()> { @@ -114,10 +110,11 @@ pub async fn serve_stream<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>( Ok(r) => Ok(r), Err(ServiceError::Hyper(e)) => Err(e), Err(error) => Ok({ - let mut resp = Response::new(format!( - "the reverse proxy encountered an issue: {error}" - )); + let mut resp = + Response::new(format!("gnix encountered an issue: {error}")); *resp.status_mut() = StatusCode::BAD_REQUEST; + resp.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain")); resp } .map(|b| b.map_err(|e| match e {}).boxed())), @@ -148,34 +145,10 @@ fn load_private_key(path: &Path) -> anyhow::Result<rustls::PrivateKey> { async fn service( config: Arc<Config>, - mut req: Request<Incoming>, + req: Request<Incoming>, addr: SocketAddr, -) -> Result<hyper::Response<BoxBody<bytes::Bytes, hyper::Error>>, ServiceError> { - let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS); +) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> { debug!("{addr} ~> {:?} {}", req.headers().get(HOST), req.uri()); - *req.uri_mut() = Uri::builder() - .path_and_query( - req.uri() - .clone() - .path_and_query() - .cloned() - .unwrap_or(PathAndQuery::from_static("/")), - ) - .build() - .unwrap(); - - req.headers_mut().insert( - "x-forwarded-for", - HeaderValue::from_str(&format!("{addr}")).unwrap(), - ); - req.headers_mut().insert( - "x-forwarded-proto", - if scheme_secure { - HeaderValue::from_static("https") - } else { - HeaderValue::from_static("http") - }, - ); let route = config .hosts @@ -188,59 +161,10 @@ async fn service( )) .ok_or(ServiceError::NoHost)?; - let do_upgrade = req.headers().contains_key(UPGRADE); - let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); - - debug!("\tforwarding to {}", route.backend); - let mut resp = { - let client_stream = TcpStream::connect(&route.backend) - .await - .map_err(|_| ServiceError::CantConnect)?; - - let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) - .await - .map_err(ServiceError::Hyper)?; - tokio::task::spawn(async move { - if let Err(err) = conn.await { - warn!("connection failed: {:?}", err); - } - }); - sender - .send_request(req) - .await - .map_err(ServiceError::Hyper)? - }; - - resp.headers_mut() - .insert("server", HeaderValue::from_static("gnix")); - - if do_upgrade { - let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); - tokio::task::spawn(async move { - debug!("about upgrading connection, sending empty response"); - match ( - on_upgrade_upstream.unwrap().await, - on_upgrade_downstream.unwrap().await, - ) { - (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { - debug!("upgrade successful"); - match tokio::io::copy_bidirectional( - &mut upgraded_downstream, - &mut upgraded_upstream, - ) - .await - { - Ok((from_client, from_server)) => { - debug!("proxy socket terminated: {from_server} sent, {from_client} received") - } - Err(e) => warn!("proxy socket error: {e}"), - } - } - (a, b) => eprintln!("upgrade error: upstream={a:?} downstream={b:?}"), - } - }); + match route { + HostConfig::Backend { backend } => proxy_request(req, addr, backend).await, + HostConfig::Files { files } => serve_files(req, files).await, } - Ok(resp.map(|b| b.boxed())) } pub fn remove_port(s: &str) -> &str { diff --git a/src/proxy.rs b/src/proxy.rs new file mode 100644 index 0000000..65fc5a8 --- /dev/null +++ b/src/proxy.rs @@ -0,0 +1,100 @@ +use crate::ServiceError; +use http_body_util::{combinators::BoxBody, BodyExt}; +use hyper::{ + body::Incoming, + header::UPGRADE, + http::{ + uri::{PathAndQuery, Scheme}, + HeaderValue, + }, + upgrade::OnUpgrade, + Request, Uri, +}; +use log::{debug, error, warn}; +use std::net::SocketAddr; +use tokio::net::TcpStream; + +pub async fn proxy_request( + mut req: Request<Incoming>, + addr: SocketAddr, + backend: &SocketAddr, +) -> Result<hyper::Response<BoxBody<bytes::Bytes, ServiceError>>, ServiceError> { + let scheme_secure = req.uri().scheme() == Some(&Scheme::HTTPS); + *req.uri_mut() = Uri::builder() + .path_and_query( + req.uri() + .clone() + .path_and_query() + .cloned() + .unwrap_or(PathAndQuery::from_static("/")), + ) + .build() + .unwrap(); + + req.headers_mut().insert( + "x-forwarded-for", + HeaderValue::from_str(&format!("{addr}")).unwrap(), + ); + req.headers_mut().insert( + "x-forwarded-proto", + if scheme_secure { + HeaderValue::from_static("https") + } else { + HeaderValue::from_static("http") + }, + ); + + let do_upgrade = req.headers().contains_key(UPGRADE); + let on_upgrade_downstream = req.extensions_mut().remove::<OnUpgrade>(); + + debug!("\tforwarding to {}", backend); + let mut resp = { + let client_stream = TcpStream::connect(backend) + .await + .map_err(|_| ServiceError::CantConnect)?; + + let (mut sender, conn) = hyper::client::conn::http1::handshake(client_stream) + .await + .map_err(ServiceError::Hyper)?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + warn!("connection failed: {:?}", err); + } + }); + sender + .send_request(req) + .await + .map_err(ServiceError::Hyper)? + }; + + resp.headers_mut() + .insert("server", HeaderValue::from_static("gnix")); + + if do_upgrade { + let on_upgrade_upstream = resp.extensions_mut().remove::<OnUpgrade>(); + tokio::task::spawn(async move { + debug!("about upgrading connection, sending empty response"); + match ( + on_upgrade_upstream.unwrap().await, + on_upgrade_downstream.unwrap().await, + ) { + (Ok(mut upgraded_upstream), Ok(mut upgraded_downstream)) => { + debug!("upgrade successful"); + match tokio::io::copy_bidirectional( + &mut upgraded_downstream, + &mut upgraded_upstream, + ) + .await + { + Ok((from_client, from_server)) => { + debug!("proxy socket terminated: {from_server} sent, {from_client} received") + } + Err(e) => warn!("proxy socket error: {e}"), + } + } + (a, b) => error!("upgrade error: upstream={a:?} downstream={b:?}"), + } + }); + } + Ok(resp.map(|b| b.map_err(ServiceError::Hyper).boxed())) +} |