aboutsummaryrefslogtreecommitdiff
path: root/server/src/logic
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/logic')
-rw-r--r--server/src/logic/mod.rs9
-rw-r--r--server/src/logic/playersync.rs109
-rw-r--r--server/src/logic/session.rs208
-rw-r--r--server/src/logic/stream.rs246
-rw-r--r--server/src/logic/userdata.rs96
5 files changed, 668 insertions, 0 deletions
diff --git a/server/src/logic/mod.rs b/server/src/logic/mod.rs
new file mode 100644
index 0000000..745d11b
--- /dev/null
+++ b/server/src/logic/mod.rs
@@ -0,0 +1,9 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+pub mod playersync;
+pub mod session;
+pub mod stream;
+pub mod userdata;
diff --git a/server/src/logic/playersync.rs b/server/src/logic/playersync.rs
new file mode 100644
index 0000000..b4cc51b
--- /dev/null
+++ b/server/src/logic/playersync.rs
@@ -0,0 +1,109 @@
+use anyhow::bail;
+use chashmap::CHashMap;
+use futures::{SinkExt, StreamExt};
+use log::warn;
+use rocket::{get, State};
+use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket};
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast::{self, Sender};
+
+use crate::helper::cors::Cors;
+
+#[derive(Default)]
+pub struct PlayersyncChannels {
+ channels: CHashMap<String, broadcast::Sender<Message>>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "snake_case")]
+pub enum Packet {
+ Time(f64),
+ Playing(bool),
+ Join(String),
+ Leave(String),
+}
+
+#[get("/playersync/<channel>")]
+pub fn r_playersync(
+ ws: WebSocket,
+ state: &State<PlayersyncChannels>,
+ channel: &str,
+) -> Cors<Channel<'static>> {
+ let sender = state
+ .channels
+ .get(&channel.to_owned())
+ .map(|x| x.to_owned())
+ .unwrap_or_else(|| {
+ let ch = broadcast::channel(16).0;
+ state.channels.insert(channel.to_owned(), ch.clone());
+ ch
+ });
+ Cors(ws.channel(move |ws| {
+ Box::pin(async move {
+ let mut state = ClientState {
+ username: "unknown user".into(),
+ };
+ if let Err(e) = handle_socket(&sender, ws, &mut state).await {
+ warn!("streamsync websocket error: {e:?}")
+ }
+ let _ = sender.send(Message::Text(
+ serde_json::to_string(&Packet::Leave(state.username)).unwrap(),
+ ));
+ Ok(())
+ })
+ }))
+}
+
+struct ClientState {
+ username: String,
+}
+
+async fn handle_socket(
+ broadcast: &Sender<Message>,
+ mut ws: DuplexStream,
+ state: &mut ClientState,
+) -> anyhow::Result<()> {
+ let mut sub = broadcast.subscribe();
+ loop {
+ tokio::select! {
+ message = ws.next() => {
+ match handle_packet(broadcast, message,state) {
+ Err(e) => Err(e)?,
+ Ok(true) => return Ok(()),
+ Ok(false) => ()
+ }
+ },
+ message = sub.recv() => {
+ ws.send(message?).await?;
+ }
+ };
+ }
+}
+
+fn handle_packet(
+ broadcast: &Sender<Message>,
+ message: Option<rocket_ws::result::Result<Message>>,
+ state: &mut ClientState,
+) -> anyhow::Result<bool> {
+ let Some(message) = message else {
+ return Ok(true);
+ };
+ let message = message?.into_text()?;
+ let packet: Packet = serde_json::from_str(&message)?;
+
+ let broadcast = |p: Packet| -> anyhow::Result<()> {
+ broadcast.send(Message::Text(serde_json::to_string(&p)?))?;
+ Ok(())
+ };
+
+ match packet {
+ Packet::Join(username) => {
+ broadcast(Packet::Join(username.clone()))?;
+ state.username = username;
+ }
+ Packet::Leave(_) => bail!("illegal packet"),
+ p => broadcast(p)?,
+ };
+
+ Ok(false)
+}
diff --git a/server/src/logic/session.rs b/server/src/logic/session.rs
new file mode 100644
index 0000000..790e070
--- /dev/null
+++ b/server/src/logic/session.rs
@@ -0,0 +1,208 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use crate::ui::error::MyError;
+use aes_gcm_siv::{
+ aead::{generic_array::GenericArray, Aead},
+ KeyInit,
+};
+use anyhow::anyhow;
+use base64::Engine;
+use chrono::{DateTime, Duration, Utc};
+use jellybase::{database::Database, SECRETS};
+use jellycommon::user::{PermissionSet, User};
+use log::warn;
+use rocket::{
+ async_trait,
+ http::Status,
+ outcome::Outcome,
+ request::{self, FromRequest},
+ Request, State,
+};
+use serde::{Deserialize, Serialize};
+use std::sync::LazyLock;
+
+pub struct Session {
+ pub user: User,
+}
+
+pub struct AdminSession(pub Session);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SessionData {
+ username: String,
+ expire: DateTime<Utc>,
+ permissions: PermissionSet,
+}
+
+impl Session {
+ pub async fn from_request_ut(req: &Request<'_>) -> Result<Self, MyError> {
+ let username;
+
+ #[cfg(not(feature = "bypass-auth"))]
+ {
+ let token = req
+ .query_value("session")
+ .map(|e| e.unwrap())
+ .or_else(|| req.query_value("api_key").map(|e| e.unwrap()))
+ .or_else(|| req.headers().get_one("X-MediaBrowser-Token"))
+ .or_else(|| {
+ req.headers()
+ .get_one("Authorization")
+ .and_then(parse_jellyfin_auth)
+ }) // for jellyfin compat
+ .or(req.cookies().get("session").map(|cookie| cookie.value()))
+ .ok_or(anyhow!("not logged in"))?;
+
+ // jellyfin urlescapes the token for *some* requests
+ let token = token.replace("%3D", "=");
+ username = validate(&token)?;
+ };
+
+ #[cfg(feature = "bypass-auth")]
+ {
+ parse_jellyfin_auth("a"); // unused warning is annoying
+ username = "admin".to_string();
+ }
+
+ let db = req.guard::<&State<Database>>().await.unwrap();
+
+ let user = db.get_user(&username)?.ok_or(anyhow!("user not found"))?;
+
+ Ok(Session { user })
+ }
+}
+
+fn parse_jellyfin_auth(h: &str) -> Option<&str> {
+ for tok in h.split(" ") {
+ if let Some(tok) = tok.strip_prefix("Token=\"") {
+ if let Some(tok) = tok.strip_suffix("\"") {
+ return Some(tok);
+ }
+ }
+ }
+ None
+}
+
+#[async_trait]
+impl<'r> FromRequest<'r> for Session {
+ type Error = MyError;
+ async fn from_request<'life0>(
+ request: &'r Request<'life0>,
+ ) -> request::Outcome<Self, Self::Error> {
+ match Session::from_request_ut(request).await {
+ Ok(x) => Outcome::Success(x),
+ Err(e) => {
+ warn!("authentificated route rejected: {e:?}");
+ Outcome::Forward(Status::Unauthorized)
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl<'r> FromRequest<'r> for AdminSession {
+ type Error = MyError;
+ async fn from_request<'life0>(
+ request: &'r Request<'life0>,
+ ) -> request::Outcome<Self, Self::Error> {
+ match Session::from_request_ut(request).await {
+ Ok(x) => {
+ if x.user.admin {
+ Outcome::Success(AdminSession(x))
+ } else {
+ Outcome::Error((
+ Status::Unauthorized,
+ MyError(anyhow!("you are not an admin")),
+ ))
+ }
+ }
+ Err(e) => {
+ warn!("authentificated route rejected: {e:?}");
+ Outcome::Forward(Status::Unauthorized)
+ }
+ }
+ }
+}
+
+static SESSION_KEY: LazyLock<[u8; 32]> = LazyLock::new(|| {
+ if let Some(sk) = &SECRETS.session_key {
+ let r = base64::engine::general_purpose::STANDARD
+ .decode(sk)
+ .expect("key invalid; should be valid base64");
+ r.try_into()
+ .expect("key has the wrong length; should be 32 bytes")
+ } else {
+ warn!("session_key not configured; generating a random one.");
+ [(); 32].map(|_| rand::random())
+ }
+});
+
+pub fn create(username: String, permissions: PermissionSet, expire: Duration) -> String {
+ let session_data = SessionData {
+ expire: Utc::now() + expire,
+ username: username.to_owned(),
+ permissions,
+ };
+ let mut plaintext =
+ bincode::serde::encode_to_vec(&session_data, bincode::config::standard()).unwrap();
+
+ while plaintext.len() % 16 == 0 {
+ plaintext.push(0);
+ }
+
+ let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap();
+ let nonce = [(); 12].map(|_| rand::random());
+ let mut ciphertext = cipher
+ .encrypt(&GenericArray::from(nonce), plaintext.as_slice())
+ .unwrap();
+ ciphertext.extend(nonce);
+
+ base64::engine::general_purpose::URL_SAFE.encode(&ciphertext)
+}
+
+pub fn validate(token: &str) -> anyhow::Result<String> {
+ let ciphertext = base64::engine::general_purpose::URL_SAFE.decode(token)?;
+ let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap();
+ let (ciphertext, nonce) = ciphertext.split_at(ciphertext.len() - 12);
+ let plaintext = cipher
+ .decrypt(nonce.into(), ciphertext)
+ .map_err(|e| anyhow!("decryption failed: {e:?}"))?;
+
+ let (session_data, _): (SessionData, _) =
+ bincode::serde::decode_from_slice(&plaintext, bincode::config::standard())?;
+
+ if session_data.expire < Utc::now() {
+ Err(anyhow!("session expired"))?
+ }
+
+ Ok(session_data.username)
+}
+
+#[test]
+fn test() {
+ jellybase::use_test_config();
+ let tok = create(
+ "blub".to_string(),
+ jellycommon::user::PermissionSet::default(),
+ Duration::days(1),
+ );
+ validate(&tok).unwrap();
+}
+
+#[test]
+fn test_crypto() {
+ jellybase::use_test_config();
+ let nonce = [(); 12].map(|_| rand::random());
+ let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap();
+ let plaintext = b"testing stuff---";
+ let ciphertext = cipher
+ .encrypt(&GenericArray::from(nonce), plaintext.as_slice())
+ .unwrap();
+ let plaintext2 = cipher
+ .decrypt((&nonce).into(), ciphertext.as_slice())
+ .unwrap();
+ assert_eq!(plaintext, plaintext2.as_slice());
+}
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
new file mode 100644
index 0000000..5bba9c2
--- /dev/null
+++ b/server/src/logic/stream.rs
@@ -0,0 +1,246 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use super::session::Session;
+use crate::{database::Database, ui::error::MyError};
+use anyhow::{anyhow, Result};
+use jellybase::{assetfed::AssetInner, federation::Federation};
+use jellycommon::{stream::StreamSpec, TrackSource};
+use jellystream::SMediaInfo;
+use log::{info, warn};
+use rocket::{
+ get, head,
+ http::{Header, Status},
+ request::{self, FromRequest},
+ response::{self, Redirect, Responder},
+ Either, Request, Response, State,
+};
+use std::{
+ collections::{BTreeMap, BTreeSet},
+ ops::Range,
+ sync::Arc,
+};
+use tokio::io::{duplex, DuplexStream};
+
+#[head("/n/<_id>/stream?<spec..>")]
+pub async fn r_stream_head(
+ _sess: Session,
+ _id: &str,
+ spec: BTreeMap<String, String>,
+) -> Result<Either<StreamResponse, Redirect>, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+ let head = jellystream::stream_head(&spec);
+ Ok(Either::Left(StreamResponse {
+ stream: duplex(0).0,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ range: None,
+ }))
+}
+
+#[get("/n/<id>/stream?<spec..>")]
+pub async fn r_stream(
+ _session: Session,
+ _federation: &State<Federation>,
+ db: &State<Database>,
+ id: &str,
+ range: Option<RequestRange>,
+ spec: BTreeMap<String, String>,
+) -> Result<Either<StreamResponse, RedirectResponse>, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+ // TODO perm
+ let node = db
+ .get_node_slug(id)?
+ .ok_or(anyhow!("node does not exist"))?;
+
+ let media = Arc::new(
+ node.media
+ .clone()
+ .ok_or(anyhow!("item does not contain media"))?,
+ );
+
+ // TODO its unclear how requests with multiple tracks should be handled.
+ // if spec.track.len() == 1 {
+ // let ti = spec.track[0];
+ // if let TrackSource::Remote(remote_index) = media.tracks[ti].source {
+ // session
+ // .user
+ // .permissions
+ // .assert(&UserPermission::FederatedContent)?;
+
+ // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti];
+ // let host = track
+ // .federated
+ // .last()
+ // .ok_or(anyhow!("federation inconsistent"))?;
+
+ // let FederationAccount {
+ // password, username, ..
+ // } = SECRETS
+ // .federation
+ // .get(host)
+ // .ok_or(anyhow!("no credentials on the server-side"))?;
+
+ // info!("creating session on {host}");
+ // let instance = federation.get_instance(host)?.to_owned();
+ // let session = instance
+ // .login(CreateSessionParams {
+ // username: username.to_owned(),
+ // password: password.to_owned(),
+ // expire: Some(60),
+ // drop_permissions: Some(HashSet::from_iter([
+ // UserPermission::ManageSelf,
+ // UserPermission::Admin, // in case somebody federated the admin :)))
+ // ])),
+ // })
+ // .await?;
+
+ // let uri = session.stream_url(
+ // node.slug.clone().into(),
+ // &StreamSpec {
+ // track: vec![remote_index],
+ // ..spec
+ // },
+ // );
+ // info!("federation redirect");
+ // return Ok(Either::Right(RedirectResponse(uri)));
+ // }
+ // }
+
+ info!(
+ "stream request (range={})",
+ range
+ .as_ref()
+ .map(|r| r.to_cr_hv())
+ .unwrap_or("none".to_string())
+ );
+
+ let urange = match &range {
+ Some(r) => {
+ let r = r.0.first().unwrap_or(&(None..None));
+ r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize)
+ }
+ None => 0..(isize::MAX as usize),
+ };
+
+ let head = jellystream::stream_head(&spec);
+
+ let mut sources = BTreeSet::new();
+ for t in &media.tracks {
+ if let TrackSource::Local(x) = &t.source {
+ if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? {
+ sources.insert(m.path);
+ }
+ }
+ }
+ let media = Arc::new(SMediaInfo {
+ files: sources,
+ info: node,
+ });
+
+ match jellystream::stream(media, spec, urange).await {
+ Ok(stream) => Ok(Either::Left(StreamResponse {
+ stream,
+ range,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ })),
+ Err(e) => {
+ warn!("stream error: {e}");
+ Err(MyError(e))
+ }
+ }
+}
+
+pub struct RedirectResponse(String);
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for RedirectResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Found);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ b.header(Header::new("location", self.0));
+ Ok(b.finalize())
+ }
+}
+
+pub struct StreamResponse {
+ stream: DuplexStream,
+ advertise_range: bool,
+ content_type: &'static str,
+ range: Option<RequestRange>,
+}
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for StreamResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Ok);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ if self.advertise_range {
+ //* it is very important here to not reply with content range if we didnt advertise.
+ //* mpv requests range but will crash if we dont pretend to not support it.
+ if let Some(range) = self.range {
+ b.status(Status::PartialContent);
+ b.header(Header::new("content-range", range.to_cr_hv()));
+ }
+ b.header(Header::new("accept-ranges", "bytes"));
+ }
+ b.header(Header::new("content-type", self.content_type))
+ .streamed_body(self.stream)
+ .ok()
+ }
+}
+
+#[derive(Debug)]
+pub struct RequestRange(Vec<Range<Option<usize>>>);
+
+impl RequestRange {
+ pub fn to_cr_hv(&self) -> String {
+ assert_eq!(self.0.len(), 1);
+ format!(
+ "bytes {}-{}/*",
+ self.0[0].start.map(|e| e.to_string()).unwrap_or_default(),
+ self.0[0].end.map(|e| e.to_string()).unwrap_or_default()
+ )
+ }
+ pub fn from_hv(s: &str) -> Result<Self> {
+ Ok(Self(
+ s.strip_prefix("bytes=")
+ .ok_or(anyhow!("prefix expected"))?
+ .split(',')
+ .map(|s| {
+ let (l, r) = s
+ .split_once('-')
+ .ok_or(anyhow!("range delimeter missing"))?;
+ let km = |s: &str| {
+ if s.is_empty() {
+ Ok::<_, anyhow::Error>(None)
+ } else {
+ Ok(Some(s.parse()?))
+ }
+ };
+ Ok(km(l)?..km(r)?)
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ))
+ }
+}
+
+#[rocket::async_trait]
+impl<'r> FromRequest<'r> for RequestRange {
+ type Error = anyhow::Error;
+
+ async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
+ match req.headers().get("range").next() {
+ Some(v) => match Self::from_hv(v) {
+ Ok(v) => rocket::outcome::Outcome::Success(v),
+ Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)),
+ },
+ None => rocket::outcome::Outcome::Forward(Status::Ok),
+ }
+ }
+}
diff --git a/server/src/logic/userdata.rs b/server/src/logic/userdata.rs
new file mode 100644
index 0000000..64a136f
--- /dev/null
+++ b/server/src/logic/userdata.rs
@@ -0,0 +1,96 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use crate::{ui::error::MyResult, ui::node::rocket_uri_macro_r_library_node};
+use jellybase::database::Database;
+use jellycommon::{
+ user::{NodeUserData, WatchedState},
+ NodeID,
+};
+use rocket::{
+ form::Form, get, post, response::Redirect, serde::json::Json, FromForm, FromFormField, State,
+ UriDisplayQuery,
+};
+
+use super::session::Session;
+
+#[derive(Debug, FromFormField, UriDisplayQuery)]
+pub enum UrlWatchedState {
+ None,
+ Watched,
+ Pending,
+}
+
+#[get("/n/<id>/userdata")]
+pub fn r_node_userdata(
+ session: Session,
+ db: &State<Database>,
+ id: NodeID,
+) -> MyResult<Json<NodeUserData>> {
+ let u = db
+ .get_node_udata(id, &session.user.name)?
+ .unwrap_or_default();
+ Ok(Json(u))
+}
+
+#[post("/n/<id>/watched?<state>")]
+pub async fn r_node_userdata_watched(
+ session: Session,
+ db: &State<Database>,
+ id: NodeID,
+ state: UrlWatchedState,
+) -> MyResult<Redirect> {
+ // TODO perm
+ db.update_node_udata(id, &session.user.name, |udata| {
+ udata.watched = match state {
+ UrlWatchedState::None => WatchedState::None,
+ UrlWatchedState::Watched => WatchedState::Watched,
+ UrlWatchedState::Pending => WatchedState::Pending,
+ };
+ Ok(())
+ })?;
+ Ok(Redirect::found(rocket::uri!(r_library_node(id))))
+}
+
+#[derive(FromForm)]
+pub struct UpdateRating {
+ #[field(validate = range(-10..=10))]
+ rating: i32,
+}
+
+#[post("/n/<id>/update_rating", data = "<form>")]
+pub async fn r_node_userdata_rating(
+ session: Session,
+ db: &State<Database>,
+ id: NodeID,
+ form: Form<UpdateRating>,
+) -> MyResult<Redirect> {
+ // TODO perm
+ db.update_node_udata(id, &session.user.name, |udata| {
+ udata.rating = form.rating;
+ Ok(())
+ })?;
+ Ok(Redirect::found(rocket::uri!(r_library_node(id))))
+}
+
+#[post("/n/<id>/progress?<t>")]
+pub async fn r_node_userdata_progress(
+ session: Session,
+ db: &State<Database>,
+ id: NodeID,
+ t: f64,
+) -> MyResult<()> {
+ // TODO perm
+ db.update_node_udata(id, &session.user.name, |udata| {
+ udata.watched = match udata.watched {
+ WatchedState::None | WatchedState::Pending | WatchedState::Progress(_) => {
+ WatchedState::Progress(t)
+ }
+ WatchedState::Watched => WatchedState::Watched,
+ };
+ Ok(())
+ })?;
+ Ok(())
+}