diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/Cargo.toml | 1 | ||||
-rw-r--r-- | server/src/routes/mod.rs | 24 | ||||
-rw-r--r-- | server/src/routes/playersync.rs | 55 | ||||
-rw-r--r-- | server/src/routes/streamsync.rs | 11 |
4 files changed, 76 insertions, 15 deletions
diff --git a/server/Cargo.toml b/server/Cargo.toml index a124876..ca8315c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.8.5" base64 = "0.21.5" chrono = { version = "0.4.31", features = ["serde"] } vte = "0.13.0" +chashmap = "2.2.2" argon2 = "0.5.2" aes-gcm-siv = "0.11.1" diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index e4d05e5..a6a086f 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -10,8 +10,14 @@ use jellybase::{federation::Federation, CONF, SECRETS}; use log::warn; use rand::random; use rocket::{ - catchers, config::SecretKey, fairing::AdHoc, fs::FileServer, get, http::Header, routes, Build, - Config, Rocket, + catchers, + config::SecretKey, + fairing::AdHoc, + fs::FileServer, + get, + http::Header, + response::{self, Responder}, + routes, Build, Config, Request, Rocket, }; use std::fs::File; use stream::r_stream; @@ -37,11 +43,11 @@ use ui::{ }; use userdata::{r_node_userdata, r_player_progress, r_player_watched}; -use self::streamsync::r_streamsync; +use self::playersync::{r_streamsync, PlayersyncChannels}; pub mod api; pub mod stream; -pub mod streamsync; +pub mod playersync; pub mod ui; pub mod userdata; @@ -76,6 +82,7 @@ pub fn build_rocket(database: DataAcid, federation: Federation) -> Rocket<Build> }) .manage(database) .manage(federation) + .manage(PlayersyncChannels::default()) .attach(AdHoc::on_response("set server header", |_req, res| { res.set_header(Header::new("server", "jellything")); Box::pin(async {}) @@ -133,3 +140,12 @@ pub fn build_rocket(database: DataAcid, federation: Federation) -> Rocket<Build> fn r_favicon() -> MyResult<File> { Ok(File::open(CONF.asset_path.join("favicon.ico"))?) } + +pub struct Cors<T>(pub T); +impl<'r, T: Responder<'r, 'static>> Responder<'r, 'static> for Cors<T> { + fn respond_to(self, request: &'r Request<'_>) -> response::Result<'static> { + let mut r = self.0.respond_to(request)?; + r.adjoin_header(Header::new("access-controll-allow-origin", "*")); + Ok(r) + } +} diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs new file mode 100644 index 0000000..2a7d3f6 --- /dev/null +++ b/server/src/routes/playersync.rs @@ -0,0 +1,55 @@ +use super::Cors; +use chashmap::CHashMap; +use futures::{SinkExt, StreamExt}; +use log::warn; +use rocket::{get, State}; +use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use tokio::sync::broadcast::{self, Sender}; + +#[derive(Default)] +pub struct PlayersyncChannels { + channels: CHashMap<String, broadcast::Sender<Message>>, +} + +#[get("/playersync/<channel>")] +pub fn r_streamsync( + ws: WebSocket, + state: &State<PlayersyncChannels>, + channel: &str, +) -> Cors<Channel<'static>> { + let sender = state + .channels + .get(&channel.to_owned()) + .map(|x| x.to_owned()) + .unwrap_or_else(|| { + let ch = broadcast::channel(16).0; + state.channels.insert(channel.to_owned(), ch.clone()); + ch + }); + Cors(ws.channel(move |ws| { + Box::pin(async move { + if let Err(e) = handle_socket(sender, ws).await { + warn!("streamsync websocket error: {e:?}") + } + Ok(()) + }) + })) +} + +async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyhow::Result<()> { + let mut sub = broadcast.subscribe(); + loop { + tokio::select! { + message = ws.next() => { + if let Some(message) = message { + broadcast.send(message?)?; + } else { + return Ok(()) + } + }, + message = sub.recv() => { + ws.send(message?).await?; + } + }; + } +} diff --git a/server/src/routes/streamsync.rs b/server/src/routes/streamsync.rs deleted file mode 100644 index d4a4d7e..0000000 --- a/server/src/routes/streamsync.rs +++ /dev/null @@ -1,11 +0,0 @@ -use rocket::get; -use rocket_ws::{Stream, WebSocket}; - -#[get("/streamsync")] -pub fn r_streamsync(ws: WebSocket) -> Stream!['static] { - Stream! { ws => - for await message in ws { - yield message?; - } - } -} |