pub mod config; use anyhow::bail; use azalea_protocol::{ packets::{ handshake::{client_intention_packet::ClientIntentionPacket, ServerboundHandshakePacket}, login::ServerboundLoginPacket, ConnectionProtocol, }, read::read_packet, write::write_packet, }; use bytes::BytesMut; use config::Config; use inotify::{EventMask, Inotify, WatchMask}; use log::{error, info, warn}; use std::{ fs::read_to_string, sync::{Arc, RwLock}, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, }, }; #[tokio::main] async fn main() { env_logger::builder() .filter_level(log::LevelFilter::Info) .parse_env("LOG") .init(); let config = Arc::new(RwLock::new(Arc::new( serde_yaml::from_str::(&read_to_string("proxy.yaml").unwrap()).unwrap(), ))); { let config = config.clone(); std::thread::spawn(move || { let mut inotify = Inotify::init().unwrap(); inotify .add_watch( ".", WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE, ) .unwrap(); let mut buffer = [0u8; 4096]; loop { let events = inotify .read_events_blocking(&mut buffer) .expect("Failed to read inotify events"); for event in events { if event.mask.contains(EventMask::MODIFY) { info!("reloading config"); match serde_yaml::from_str::(&read_to_string("proxy.yaml").unwrap()) { Ok(conf) => *config.write().unwrap() = Arc::new(conf), Err(e) => error!("config has errors: {e}"), } } } } }); } let listener = TcpListener::bind(config.read().unwrap().bind) .await .unwrap(); info!("listening"); loop { match listener.accept().await { Ok((sock, addr)) => { info!("connected: {addr}"); let config = config.read().unwrap().clone(); tokio::spawn(async move { match handle_client(config, sock).await { Ok(()) => info!("disconnected: {addr}"), Err(err) => warn!("error: ({addr}) {err}"), } }); } Err(e) => error!("{}", e), } } } async fn handle_client(config: Arc, sock: TcpStream) -> Result<(), anyhow::Error> { let mut buf = BytesMut::new(); sock.set_nodelay(true)?; let (mut downstream_reader, downstream_writer) = sock.into_split(); let handshake = read_packet::( &mut downstream_reader, &mut buf, None, &mut None, ) .await?; let upstream_handshake = match handshake { ServerboundHandshakePacket::ClientIntention(p) => { info!( "new client (version={}, intent={:?})", p.protocol_version, p.intention ); if p.protocol_version != config.protocol { bail!("protocol version unsupported") } match p.intention { ConnectionProtocol::Status => { handle_status_intent(config, downstream_writer, downstream_reader).await?; return Ok(()); } ConnectionProtocol::Login => {} _ => bail!("unsupported intent"), } p } }; let login = read_packet::(&mut downstream_reader, &mut buf, None, &mut None) .await?; let upstream_login = match login { ServerboundLoginPacket::Hello(mut p) => { info!("client hello (username={:?})", p.username); let profile = config .whitelist .iter() .find(|e| e.token.as_ref().map_or(false, |e| e == &p.username)); match profile { Some(profile) => { info!("login as {:?}", profile.username); p.username = profile.username.clone(); } None => bail!("no profile found, disconnecting client"), } p } ServerboundLoginPacket::Key(_) => bail!("key not supported"), ServerboundLoginPacket::CustomQuery(_) => bail!("custom query not supported"), }; let upstream = TcpStream::connect(config.backend).await?; let (upstream_reader, mut upstream_writer) = upstream.into_split(); write_packet( &ServerboundHandshakePacket::ClientIntention(upstream_handshake), &mut upstream_writer, None, &mut None, ) .await?; write_packet::( &ServerboundLoginPacket::Hello(upstream_login), &mut upstream_writer, None, &mut None, ) .await?; let task_res = tokio::spawn(async move { connect(downstream_writer, upstream_reader).await }); let res = connect(upstream_writer, downstream_reader).await; task_res.abort(); res?; if let Ok(r) = task_res.await { r?; } Ok(()) } async fn connect(mut writer: OwnedWriteHalf, mut reader: OwnedReadHalf) -> anyhow::Result<()> { let mut buf = [0; 1024]; loop { let size = reader.read(&mut buf).await?; if size == 0 { break Ok(()); } writer.write_all(&buf[..size]).await?; } } async fn handle_status_intent( config: Arc, writer: OwnedWriteHalf, reader: OwnedReadHalf, ) -> anyhow::Result<()> { // let mut buf = BytesMut::new(); let upstream = TcpStream::connect(config.backend).await?; upstream.set_nodelay(true)?; let (upstream_reader, mut upstream_writer) = upstream.into_split(); write_packet( &ServerboundHandshakePacket::ClientIntention(ClientIntentionPacket { protocol_version: config.protocol, hostname: config.backend.ip().to_string(), port: config.backend.port(), intention: ConnectionProtocol::Status, }), &mut upstream_writer, None, &mut None, ) .await?; let task_res = tokio::spawn(async move { connect(writer, upstream_reader).await }); let res = connect(upstream_writer, reader).await; task_res.abort(); res?; if let Ok(r) = task_res.await { r?; } return Ok(()); } // loop { // let req = read_packet::(&mut reader, &mut buf, None, &mut None) // .await?; // info!("{req:?}"); // match req { // ServerboundStatusPacket::StatusRequest(..) => { // write_packet( // &ClientboundStatusPacket::StatusResponse(ClientboundStatusResponsePacket { // description: azalea_chat::component::Component::Text( // legacy_color_code_to_text_component("blub"), // ), // favicon: None, // players: Players { // max: 10, // online: 0, // sample: vec![], // }, // version: Version { // name: azalea_chat::component::Component::Text( // legacy_color_code_to_text_component("blub"), // ), // protocol: 760, // }, // }), // &mut writer, // None, // &mut None, // ) // .await?; // } // ServerboundStatusPacket::PingRequest(p) => { // write_packet( // &ClientboundStatusPacket::PongResponse(ClientboundPongResponsePacket { // time: p.time, // }), // &mut writer, // None, // &mut None, // ) // .await?; // } // } // } // Ok(()) // for _ in 0..3 { // let a = read_packet::( // &mut upstream_reader, // &mut buf, // None, // &mut None, // ) // .await?; // debug!("login {a:?}"); // write_packet(&a, &mut downstream_writer, None, &mut None).await?; // } // tokio::spawn(async move { // let mut buf = BytesMut::new(); // loop { // let a = read_packet::( // &mut upstream_reader, // &mut buf, // None, // &mut None, // ) // .await // .unwrap(); // debug!("downstream {a:?}"); // write_packet(&a, &mut downstream_writer, None, &mut None) // .await // .unwrap(); // } // }); // loop { // let a = read_packet::( // &mut downstream_reader, // &mut buf, // None, // &mut None, // ) // .await?; // debug!("upstream {a:?}"); // write_packet(&a, &mut upstream_writer, None, &mut None).await?; // }