diff options
author | metamuffin <metamuffin@disroot.org> | 2025-04-27 19:25:11 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-04-27 19:25:11 +0200 |
commit | 11a585b3dbe620dcc8772e713b22f1d9ba80d598 (patch) | |
tree | 44f8d97137412aefc79a2425a489c34fa3e5f6c5 /server/src/logic | |
parent | d871aa7c5bba49ff55170b5d2dac9cd440ae7170 (diff) | |
download | jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.bz2 jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.zst |
move files around
Diffstat (limited to 'server/src/logic')
-rw-r--r-- | server/src/logic/mod.rs | 9 | ||||
-rw-r--r-- | server/src/logic/playersync.rs | 109 | ||||
-rw-r--r-- | server/src/logic/session.rs | 208 | ||||
-rw-r--r-- | server/src/logic/stream.rs | 246 | ||||
-rw-r--r-- | server/src/logic/userdata.rs | 96 |
5 files changed, 668 insertions, 0 deletions
diff --git a/server/src/logic/mod.rs b/server/src/logic/mod.rs new file mode 100644 index 0000000..745d11b --- /dev/null +++ b/server/src/logic/mod.rs @@ -0,0 +1,9 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +pub mod playersync; +pub mod session; +pub mod stream; +pub mod userdata; diff --git a/server/src/logic/playersync.rs b/server/src/logic/playersync.rs new file mode 100644 index 0000000..b4cc51b --- /dev/null +++ b/server/src/logic/playersync.rs @@ -0,0 +1,109 @@ +use anyhow::bail; +use chashmap::CHashMap; +use futures::{SinkExt, StreamExt}; +use log::warn; +use rocket::{get, State}; +use rocket_ws::{stream::DuplexStream, Channel, Message, WebSocket}; +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast::{self, Sender}; + +use crate::helper::cors::Cors; + +#[derive(Default)] +pub struct PlayersyncChannels { + channels: CHashMap<String, broadcast::Sender<Message>>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Packet { + Time(f64), + Playing(bool), + Join(String), + Leave(String), +} + +#[get("/playersync/<channel>")] +pub fn r_playersync( + ws: WebSocket, + state: &State<PlayersyncChannels>, + channel: &str, +) -> Cors<Channel<'static>> { + let sender = state + .channels + .get(&channel.to_owned()) + .map(|x| x.to_owned()) + .unwrap_or_else(|| { + let ch = broadcast::channel(16).0; + state.channels.insert(channel.to_owned(), ch.clone()); + ch + }); + Cors(ws.channel(move |ws| { + Box::pin(async move { + let mut state = ClientState { + username: "unknown user".into(), + }; + if let Err(e) = handle_socket(&sender, ws, &mut state).await { + warn!("streamsync websocket error: {e:?}") + } + let _ = sender.send(Message::Text( + serde_json::to_string(&Packet::Leave(state.username)).unwrap(), + )); + Ok(()) + }) + })) +} + +struct ClientState { + username: String, +} + +async fn handle_socket( + broadcast: &Sender<Message>, + mut ws: DuplexStream, + state: &mut ClientState, +) -> anyhow::Result<()> { + let mut sub = broadcast.subscribe(); + loop { + tokio::select! { + message = ws.next() => { + match handle_packet(broadcast, message,state) { + Err(e) => Err(e)?, + Ok(true) => return Ok(()), + Ok(false) => () + } + }, + message = sub.recv() => { + ws.send(message?).await?; + } + }; + } +} + +fn handle_packet( + broadcast: &Sender<Message>, + message: Option<rocket_ws::result::Result<Message>>, + state: &mut ClientState, +) -> anyhow::Result<bool> { + let Some(message) = message else { + return Ok(true); + }; + let message = message?.into_text()?; + let packet: Packet = serde_json::from_str(&message)?; + + let broadcast = |p: Packet| -> anyhow::Result<()> { + broadcast.send(Message::Text(serde_json::to_string(&p)?))?; + Ok(()) + }; + + match packet { + Packet::Join(username) => { + broadcast(Packet::Join(username.clone()))?; + state.username = username; + } + Packet::Leave(_) => bail!("illegal packet"), + p => broadcast(p)?, + }; + + Ok(false) +} diff --git a/server/src/logic/session.rs b/server/src/logic/session.rs new file mode 100644 index 0000000..790e070 --- /dev/null +++ b/server/src/logic/session.rs @@ -0,0 +1,208 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use crate::ui::error::MyError; +use aes_gcm_siv::{ + aead::{generic_array::GenericArray, Aead}, + KeyInit, +}; +use anyhow::anyhow; +use base64::Engine; +use chrono::{DateTime, Duration, Utc}; +use jellybase::{database::Database, SECRETS}; +use jellycommon::user::{PermissionSet, User}; +use log::warn; +use rocket::{ + async_trait, + http::Status, + outcome::Outcome, + request::{self, FromRequest}, + Request, State, +}; +use serde::{Deserialize, Serialize}; +use std::sync::LazyLock; + +pub struct Session { + pub user: User, +} + +pub struct AdminSession(pub Session); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionData { + username: String, + expire: DateTime<Utc>, + permissions: PermissionSet, +} + +impl Session { + pub async fn from_request_ut(req: &Request<'_>) -> Result<Self, MyError> { + let username; + + #[cfg(not(feature = "bypass-auth"))] + { + let token = req + .query_value("session") + .map(|e| e.unwrap()) + .or_else(|| req.query_value("api_key").map(|e| e.unwrap())) + .or_else(|| req.headers().get_one("X-MediaBrowser-Token")) + .or_else(|| { + req.headers() + .get_one("Authorization") + .and_then(parse_jellyfin_auth) + }) // for jellyfin compat + .or(req.cookies().get("session").map(|cookie| cookie.value())) + .ok_or(anyhow!("not logged in"))?; + + // jellyfin urlescapes the token for *some* requests + let token = token.replace("%3D", "="); + username = validate(&token)?; + }; + + #[cfg(feature = "bypass-auth")] + { + parse_jellyfin_auth("a"); // unused warning is annoying + username = "admin".to_string(); + } + + let db = req.guard::<&State<Database>>().await.unwrap(); + + let user = db.get_user(&username)?.ok_or(anyhow!("user not found"))?; + + Ok(Session { user }) + } +} + +fn parse_jellyfin_auth(h: &str) -> Option<&str> { + for tok in h.split(" ") { + if let Some(tok) = tok.strip_prefix("Token=\"") { + if let Some(tok) = tok.strip_suffix("\"") { + return Some(tok); + } + } + } + None +} + +#[async_trait] +impl<'r> FromRequest<'r> for Session { + type Error = MyError; + async fn from_request<'life0>( + request: &'r Request<'life0>, + ) -> request::Outcome<Self, Self::Error> { + match Session::from_request_ut(request).await { + Ok(x) => Outcome::Success(x), + Err(e) => { + warn!("authentificated route rejected: {e:?}"); + Outcome::Forward(Status::Unauthorized) + } + } + } +} + +#[async_trait] +impl<'r> FromRequest<'r> for AdminSession { + type Error = MyError; + async fn from_request<'life0>( + request: &'r Request<'life0>, + ) -> request::Outcome<Self, Self::Error> { + match Session::from_request_ut(request).await { + Ok(x) => { + if x.user.admin { + Outcome::Success(AdminSession(x)) + } else { + Outcome::Error(( + Status::Unauthorized, + MyError(anyhow!("you are not an admin")), + )) + } + } + Err(e) => { + warn!("authentificated route rejected: {e:?}"); + Outcome::Forward(Status::Unauthorized) + } + } + } +} + +static SESSION_KEY: LazyLock<[u8; 32]> = LazyLock::new(|| { + if let Some(sk) = &SECRETS.session_key { + let r = base64::engine::general_purpose::STANDARD + .decode(sk) + .expect("key invalid; should be valid base64"); + r.try_into() + .expect("key has the wrong length; should be 32 bytes") + } else { + warn!("session_key not configured; generating a random one."); + [(); 32].map(|_| rand::random()) + } +}); + +pub fn create(username: String, permissions: PermissionSet, expire: Duration) -> String { + let session_data = SessionData { + expire: Utc::now() + expire, + username: username.to_owned(), + permissions, + }; + let mut plaintext = + bincode::serde::encode_to_vec(&session_data, bincode::config::standard()).unwrap(); + + while plaintext.len() % 16 == 0 { + plaintext.push(0); + } + + let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap(); + let nonce = [(); 12].map(|_| rand::random()); + let mut ciphertext = cipher + .encrypt(&GenericArray::from(nonce), plaintext.as_slice()) + .unwrap(); + ciphertext.extend(nonce); + + base64::engine::general_purpose::URL_SAFE.encode(&ciphertext) +} + +pub fn validate(token: &str) -> anyhow::Result<String> { + let ciphertext = base64::engine::general_purpose::URL_SAFE.decode(token)?; + let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap(); + let (ciphertext, nonce) = ciphertext.split_at(ciphertext.len() - 12); + let plaintext = cipher + .decrypt(nonce.into(), ciphertext) + .map_err(|e| anyhow!("decryption failed: {e:?}"))?; + + let (session_data, _): (SessionData, _) = + bincode::serde::decode_from_slice(&plaintext, bincode::config::standard())?; + + if session_data.expire < Utc::now() { + Err(anyhow!("session expired"))? + } + + Ok(session_data.username) +} + +#[test] +fn test() { + jellybase::use_test_config(); + let tok = create( + "blub".to_string(), + jellycommon::user::PermissionSet::default(), + Duration::days(1), + ); + validate(&tok).unwrap(); +} + +#[test] +fn test_crypto() { + jellybase::use_test_config(); + let nonce = [(); 12].map(|_| rand::random()); + let cipher = aes_gcm_siv::Aes256GcmSiv::new_from_slice(&*SESSION_KEY).unwrap(); + let plaintext = b"testing stuff---"; + let ciphertext = cipher + .encrypt(&GenericArray::from(nonce), plaintext.as_slice()) + .unwrap(); + let plaintext2 = cipher + .decrypt((&nonce).into(), ciphertext.as_slice()) + .unwrap(); + assert_eq!(plaintext, plaintext2.as_slice()); +} diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs new file mode 100644 index 0000000..5bba9c2 --- /dev/null +++ b/server/src/logic/stream.rs @@ -0,0 +1,246 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use super::session::Session; +use crate::{database::Database, ui::error::MyError}; +use anyhow::{anyhow, Result}; +use jellybase::{assetfed::AssetInner, federation::Federation}; +use jellycommon::{stream::StreamSpec, TrackSource}; +use jellystream::SMediaInfo; +use log::{info, warn}; +use rocket::{ + get, head, + http::{Header, Status}, + request::{self, FromRequest}, + response::{self, Redirect, Responder}, + Either, Request, Response, State, +}; +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::Range, + sync::Arc, +}; +use tokio::io::{duplex, DuplexStream}; + +#[head("/n/<_id>/stream?<spec..>")] +pub async fn r_stream_head( + _sess: Session, + _id: &str, + spec: BTreeMap<String, String>, +) -> Result<Either<StreamResponse, Redirect>, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + let head = jellystream::stream_head(&spec); + Ok(Either::Left(StreamResponse { + stream: duplex(0).0, + advertise_range: head.range_supported, + content_type: head.content_type, + range: None, + })) +} + +#[get("/n/<id>/stream?<spec..>")] +pub async fn r_stream( + _session: Session, + _federation: &State<Federation>, + db: &State<Database>, + id: &str, + range: Option<RequestRange>, + spec: BTreeMap<String, String>, +) -> Result<Either<StreamResponse, RedirectResponse>, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + // TODO perm + let node = db + .get_node_slug(id)? + .ok_or(anyhow!("node does not exist"))?; + + let media = Arc::new( + node.media + .clone() + .ok_or(anyhow!("item does not contain media"))?, + ); + + // TODO its unclear how requests with multiple tracks should be handled. + // if spec.track.len() == 1 { + // let ti = spec.track[0]; + // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { + // session + // .user + // .permissions + // .assert(&UserPermission::FederatedContent)?; + + // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; + // let host = track + // .federated + // .last() + // .ok_or(anyhow!("federation inconsistent"))?; + + // let FederationAccount { + // password, username, .. + // } = SECRETS + // .federation + // .get(host) + // .ok_or(anyhow!("no credentials on the server-side"))?; + + // info!("creating session on {host}"); + // let instance = federation.get_instance(host)?.to_owned(); + // let session = instance + // .login(CreateSessionParams { + // username: username.to_owned(), + // password: password.to_owned(), + // expire: Some(60), + // drop_permissions: Some(HashSet::from_iter([ + // UserPermission::ManageSelf, + // UserPermission::Admin, // in case somebody federated the admin :))) + // ])), + // }) + // .await?; + + // let uri = session.stream_url( + // node.slug.clone().into(), + // &StreamSpec { + // track: vec![remote_index], + // ..spec + // }, + // ); + // info!("federation redirect"); + // return Ok(Either::Right(RedirectResponse(uri))); + // } + // } + + info!( + "stream request (range={})", + range + .as_ref() + .map(|r| r.to_cr_hv()) + .unwrap_or("none".to_string()) + ); + + let urange = match &range { + Some(r) => { + let r = r.0.first().unwrap_or(&(None..None)); + r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) + } + None => 0..(isize::MAX as usize), + }; + + let head = jellystream::stream_head(&spec); + + let mut sources = BTreeSet::new(); + for t in &media.tracks { + if let TrackSource::Local(x) = &t.source { + if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { + sources.insert(m.path); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + info: node, + }); + + match jellystream::stream(media, spec, urange).await { + Ok(stream) => Ok(Either::Left(StreamResponse { + stream, + range, + advertise_range: head.range_supported, + content_type: head.content_type, + })), + Err(e) => { + warn!("stream error: {e}"); + Err(MyError(e)) + } + } +} + +pub struct RedirectResponse(String); + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for RedirectResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Found); + b.header(Header::new("access-control-allow-origin", "*")); + b.header(Header::new("location", self.0)); + Ok(b.finalize()) + } +} + +pub struct StreamResponse { + stream: DuplexStream, + advertise_range: bool, + content_type: &'static str, + range: Option<RequestRange>, +} + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for StreamResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Ok); + b.header(Header::new("access-control-allow-origin", "*")); + if self.advertise_range { + //* it is very important here to not reply with content range if we didnt advertise. + //* mpv requests range but will crash if we dont pretend to not support it. + if let Some(range) = self.range { + b.status(Status::PartialContent); + b.header(Header::new("content-range", range.to_cr_hv())); + } + b.header(Header::new("accept-ranges", "bytes")); + } + b.header(Header::new("content-type", self.content_type)) + .streamed_body(self.stream) + .ok() + } +} + +#[derive(Debug)] +pub struct RequestRange(Vec<Range<Option<usize>>>); + +impl RequestRange { + pub fn to_cr_hv(&self) -> String { + assert_eq!(self.0.len(), 1); + format!( + "bytes {}-{}/*", + self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), + self.0[0].end.map(|e| e.to_string()).unwrap_or_default() + ) + } + pub fn from_hv(s: &str) -> Result<Self> { + Ok(Self( + s.strip_prefix("bytes=") + .ok_or(anyhow!("prefix expected"))? + .split(',') + .map(|s| { + let (l, r) = s + .split_once('-') + .ok_or(anyhow!("range delimeter missing"))?; + let km = |s: &str| { + if s.is_empty() { + Ok::<_, anyhow::Error>(None) + } else { + Ok(Some(s.parse()?)) + } + }; + Ok(km(l)?..km(r)?) + }) + .collect::<Result<Vec<_>>>()?, + )) + } +} + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for RequestRange { + type Error = anyhow::Error; + + async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> { + match req.headers().get("range").next() { + Some(v) => match Self::from_hv(v) { + Ok(v) => rocket::outcome::Outcome::Success(v), + Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), + }, + None => rocket::outcome::Outcome::Forward(Status::Ok), + } + } +} diff --git a/server/src/logic/userdata.rs b/server/src/logic/userdata.rs new file mode 100644 index 0000000..64a136f --- /dev/null +++ b/server/src/logic/userdata.rs @@ -0,0 +1,96 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use crate::{ui::error::MyResult, ui::node::rocket_uri_macro_r_library_node}; +use jellybase::database::Database; +use jellycommon::{ + user::{NodeUserData, WatchedState}, + NodeID, +}; +use rocket::{ + form::Form, get, post, response::Redirect, serde::json::Json, FromForm, FromFormField, State, + UriDisplayQuery, +}; + +use super::session::Session; + +#[derive(Debug, FromFormField, UriDisplayQuery)] +pub enum UrlWatchedState { + None, + Watched, + Pending, +} + +#[get("/n/<id>/userdata")] +pub fn r_node_userdata( + session: Session, + db: &State<Database>, + id: NodeID, +) -> MyResult<Json<NodeUserData>> { + let u = db + .get_node_udata(id, &session.user.name)? + .unwrap_or_default(); + Ok(Json(u)) +} + +#[post("/n/<id>/watched?<state>")] +pub async fn r_node_userdata_watched( + session: Session, + db: &State<Database>, + id: NodeID, + state: UrlWatchedState, +) -> MyResult<Redirect> { + // TODO perm + db.update_node_udata(id, &session.user.name, |udata| { + udata.watched = match state { + UrlWatchedState::None => WatchedState::None, + UrlWatchedState::Watched => WatchedState::Watched, + UrlWatchedState::Pending => WatchedState::Pending, + }; + Ok(()) + })?; + Ok(Redirect::found(rocket::uri!(r_library_node(id)))) +} + +#[derive(FromForm)] +pub struct UpdateRating { + #[field(validate = range(-10..=10))] + rating: i32, +} + +#[post("/n/<id>/update_rating", data = "<form>")] +pub async fn r_node_userdata_rating( + session: Session, + db: &State<Database>, + id: NodeID, + form: Form<UpdateRating>, +) -> MyResult<Redirect> { + // TODO perm + db.update_node_udata(id, &session.user.name, |udata| { + udata.rating = form.rating; + Ok(()) + })?; + Ok(Redirect::found(rocket::uri!(r_library_node(id)))) +} + +#[post("/n/<id>/progress?<t>")] +pub async fn r_node_userdata_progress( + session: Session, + db: &State<Database>, + id: NodeID, + t: f64, +) -> MyResult<()> { + // TODO perm + db.update_node_udata(id, &session.user.name, |udata| { + udata.watched = match udata.watched { + WatchedState::None | WatchedState::Pending | WatchedState::Progress(_) => { + WatchedState::Progress(t) + } + WatchedState::Watched => WatchedState::Watched, + }; + Ok(()) + })?; + Ok(()) +} |