aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/src/routes/playersync.rs65
m---------web/script/jshelper0
-rw-r--r--web/script/player/mod.ts30
-rw-r--r--web/script/player/sync.ts24
4 files changed, 100 insertions, 19 deletions
diff --git a/server/src/routes/playersync.rs b/server/src/routes/playersync.rs
index 2a7d3f6..1ef9d73 100644
--- a/server/src/routes/playersync.rs
+++ b/server/src/routes/playersync.rs
@@ -1,9 +1,11 @@
use super::Cors;
+use anyhow::bail;
use chashmap::CHashMap;
use futures::{SinkExt, StreamExt};
use log::warn;
use rocket::{get, State};
use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket};
+use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::{self, Sender};
#[derive(Default)]
@@ -11,6 +13,15 @@ pub struct PlayersyncChannels {
channels: CHashMap<String, broadcast::Sender<Message>>,
}
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "snake_case")]
+pub enum Packet {
+ Time(f64),
+ Playing(bool),
+ Join(String),
+ Leave(String),
+}
+
#[get("/playersync/<channel>")]
pub fn r_streamsync(
ws: WebSocket,
@@ -28,23 +39,37 @@ pub fn r_streamsync(
});
Cors(ws.channel(move |ws| {
Box::pin(async move {
- if let Err(e) = handle_socket(sender, ws).await {
+ let mut state = ClientState {
+ username: "unknown user".into(),
+ };
+ if let Err(e) = handle_socket(&sender, ws, &mut state).await {
warn!("streamsync websocket error: {e:?}")
}
+ let _ = sender.send(Message::Text(
+ serde_json::to_string(&Packet::Leave(state.username)).unwrap(),
+ ));
Ok(())
})
}))
}
-async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyhow::Result<()> {
+struct ClientState {
+ username: String,
+}
+
+async fn handle_socket(
+ broadcast: &Sender<Message>,
+ mut ws: DuplexStream,
+ state: &mut ClientState,
+) -> anyhow::Result<()> {
let mut sub = broadcast.subscribe();
loop {
tokio::select! {
message = ws.next() => {
- if let Some(message) = message {
- broadcast.send(message?)?;
- } else {
- return Ok(())
+ match handle_packet(broadcast, message,state) {
+ Err(e) => Err(e)?,
+ Ok(true) => return Ok(()),
+ Ok(false) => ()
}
},
message = sub.recv() => {
@@ -53,3 +78,31 @@ async fn handle_socket(broadcast: Sender<Message>, mut ws: DuplexStream) -> anyh
};
}
}
+
+fn handle_packet(
+ broadcast: &Sender<Message>,
+ message: Option<rocket_ws::result::Result<Message>>,
+ state: &mut ClientState,
+) -> anyhow::Result<bool> {
+ let Some(message) = message else {
+ return Ok(true);
+ };
+ let message = message?.into_text()?;
+ let packet: Packet = serde_json::from_str(&message)?;
+
+ let broadcast = |p: Packet| -> anyhow::Result<()> {
+ broadcast.send(Message::Text(serde_json::to_string(&p)?))?;
+ Ok(())
+ };
+
+ match packet {
+ Packet::Join(username) => {
+ broadcast(Packet::Join(username.clone()))?;
+ state.username = username;
+ }
+ Packet::Leave(_) => bail!("illegal packet"),
+ p => broadcast(p)?,
+ };
+
+ Ok(false)
+}
diff --git a/web/script/jshelper b/web/script/jshelper
-Subproject d20a4fcf83818ead0b0775ded507e9e6f0fcc9b
+Subproject 95603bb2d1ff60dea7c9c9b70d2aec0768c7be2
diff --git a/web/script/player/mod.ts b/web/script/player/mod.ts
index 139f23f..2fd9046 100644
--- a/web/script/player/mod.ts
+++ b/web/script/player/mod.ts
@@ -96,7 +96,8 @@ function initialize_player(el: HTMLElement, node_id: string) {
}
const settings_popup = () => {
const button = e("button", "settings", { class: "icon" })
- let channelname: HTMLInputElement;
+ let channel_name: HTMLInputElement;
+ let channel_name_copy: HTMLInputElement;
new Popup(button, popups, () => e("div", { class: "jsp-settings-popup" },
e("h2", "Settings"),
e("div", { class: ["jsp-controlgroup", "jsp-playersync-controls"] },
@@ -106,14 +107,30 @@ function initialize_player(el: HTMLElement, node_id: string) {
e("span", "Sync enabled."),
e("button", "Disable", {
onclick: () => { sync_state.value?.destroy(); sync_state.value = undefined }
- }))
+ }),
+ e("p", "Session ID: ",
+ channel_name_copy = e("input", { type: "text", disabled: true, value: sync.name }),
+ e("button", "content_paste_go", {
+ class: "icon",
+ onclick: () => {
+ logger.log("Session ID copied to clipboard.")
+ navigator.clipboard.writeText(channel_name_copy.value)
+ }
+ })
+ ))
: e("div",
- channelname = e("input", { type: "text", placeholder: "someroom:example.org" }),
- e("button", "Sync!", {
+ channel_name = e("input", { type: "text", placeholder: "someroom:example.org" }),
+ e("button", "Join", {
+ onclick: () => {
+ if (!channel_name.value.length) return
+ sync_state.value?.destroy()
+ sync_state.value = new Playersync(player, logger, channel_name.value)
+ }
+ }), e("br"),
+ e("button", "Create new session", {
onclick: () => {
- if (!channelname.value.length) return
sync_state.value?.destroy()
- sync_state.value = new Playersync(player, logger, channelname.value)
+ sync_state.value = new Playersync(player, logger)
}
}))
)
@@ -192,6 +209,7 @@ function initialize_player(el: HTMLElement, node_id: string) {
player.seek(p * player.duration.value)
})
document.body.addEventListener("keydown", k => {
+ if (k.ctrlKey) return
if (k.code == "Period") player.pause(), player.frame_forward()
if (k.code == "Space") toggle_playing()
else if (k.code == "KeyV") show_stats.value = !show_stats.value
diff --git a/web/script/player/sync.ts b/web/script/player/sync.ts
index a2029ea..6f2b86f 100644
--- a/web/script/player/sync.ts
+++ b/web/script/player/sync.ts
@@ -16,17 +16,22 @@ export class Playersync {
private ws: WebSocket
private on_destroy: (() => void)[] = []
+ public name: string
+
private cancel_pers: undefined | (() => void)
set_pers(s?: string) {
if (this.cancel_pers) this.cancel_pers(), this.cancel_pers = undefined
if (s) this.cancel_pers = this.logger?.log_persistent(s)
}
- constructor(private player: Player, private logger: Logger<string>, private channel_name: string) {
+ constructor(private player: Player, private logger: Logger<string>, private channel_name?: string) {
this.set_pers("Playersync enabling...")
- let [localpart, remotepart] = channel_name.split(":")
+ channel_name ??= Math.random().toString(16).padEnd(5, "0").substring(2).substring(0, 6)
+ let [localpart, remotepart, port] = channel_name.split(":")
if (!remotepart?.length) remotepart = window.location.host
+ if (port) remotepart += ":" + port
+ this.name = localpart + ":" + remotepart
this.ws = new WebSocket(`${window.location.protocol.endsWith("s:") ? "wss" : "ws"}://${remotepart}/playersync/${encodeURIComponent(localpart)}`)
this.on_destroy.push(() => this.ws.close())
@@ -54,25 +59,31 @@ export class Playersync {
if (packet.playing === true) this.player.play()
if (packet.playing === false) this.player.pause()
if (packet.join) this.logger.log(`${packet.join} joined.`)
- if (packet.leave) this.logger.log(`${packet.join} left.`)
+ if (packet.leave) this.logger.log(`${packet.leave} left.`)
}
let cb: () => void
+ const send_time = () => {
+ const time = this.player.video.currentTime
+ if (Math.abs(last_time - time) < 0.01) return
+ this.send({ time: this.player.video.currentTime })
+ }
+
player.video.addEventListener("play", cb = () => {
+ send_time()
this.send({ playing: true })
})
this.on_destroy.push(() => player.video.removeEventListener("play", cb))
player.video.addEventListener("pause", cb = () => {
this.send({ playing: false })
+ send_time()
})
this.on_destroy.push(() => player.video.removeEventListener("pause", cb))
player.video.addEventListener("seeking", cb = () => {
- const time = this.player.video.currentTime
- if (Math.abs(last_time - time) < 0.01) return
- this.send({ time: this.player.video.currentTime })
+ send_time()
})
this.on_destroy.push(() => player.video.removeEventListener("seeking", cb))
}
@@ -81,7 +92,6 @@ export class Playersync {
this.set_pers()
this.logger.log("Playersync disabled.")
this.on_destroy.forEach(f => f())
- this.send({ leave: get_username() })
}
send(p: Packet) {