diff options
Diffstat (limited to 'server/src/logic')
| -rw-r--r-- | server/src/logic/mod.rs | 8 | ||||
| -rw-r--r-- | server/src/logic/playersync.rs | 109 | ||||
| -rw-r--r-- | server/src/logic/stream.rs | 216 | ||||
| -rw-r--r-- | server/src/logic/userdata.rs | 59 |
4 files changed, 0 insertions, 392 deletions
diff --git a/server/src/logic/mod.rs b/server/src/logic/mod.rs deleted file mode 100644 index 24d58f8..0000000 --- a/server/src/logic/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ -pub mod playersync; -pub mod stream; -pub mod userdata; diff --git a/server/src/logic/playersync.rs b/server/src/logic/playersync.rs deleted file mode 100644 index 71e2809..0000000 --- a/server/src/logic/playersync.rs +++ /dev/null @@ -1,109 +0,0 @@ -use anyhow::bail; -use chashmap::CHashMap; -use futures::{SinkExt, StreamExt}; -use log::warn; -use rocket::{State, get}; -use rocket_ws::{Channel, Message, WebSocket, stream::DuplexStream}; -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast::{self, Sender}; - -use crate::responders::cors::Cors; - -#[derive(Default)] -pub struct PlayersyncChannels { - channels: CHashMap<String, broadcast::Sender<Message>>, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Packet { - Time(f64), - Playing(bool), - Join(String), - Leave(String), -} - -#[get("/playersync/<channel>")] -pub fn r_playersync( - ws: WebSocket, - state: &State<PlayersyncChannels>, - channel: &str, -) -> Cors<Channel<'static>> { - let sender = state - .channels - .get(&channel.to_owned()) - .map(|x| x.to_owned()) - .unwrap_or_else(|| { - let ch = broadcast::channel(16).0; - state.channels.insert(channel.to_owned(), ch.clone()); - ch - }); - Cors(ws.channel(move |ws| { - Box::pin(async move { - let mut state = ClientState { - username: "unknown user".into(), - }; - if let Err(e) = handle_socket(&sender, ws, &mut state).await { - warn!("streamsync websocket error: {e:?}") - } - let _ = sender.send(Message::Text( - serde_json::to_string(&Packet::Leave(state.username)).unwrap(), - )); - Ok(()) - }) - })) -} - -struct ClientState { - username: String, -} - -async fn handle_socket( - broadcast: &Sender<Message>, - mut ws: DuplexStream, - state: &mut ClientState, -) -> anyhow::Result<()> { - let mut sub = broadcast.subscribe(); - loop { - tokio::select! { - message = ws.next() => { - match handle_packet(broadcast, message,state) { - Err(e) => Err(e)?, - Ok(true) => return Ok(()), - Ok(false) => () - } - }, - message = sub.recv() => { - ws.send(message?).await?; - } - }; - } -} - -fn handle_packet( - broadcast: &Sender<Message>, - message: Option<rocket_ws::result::Result<Message>>, - state: &mut ClientState, -) -> anyhow::Result<bool> { - let Some(message) = message else { - return Ok(true); - }; - let message = message?.into_text()?; - let packet: Packet = serde_json::from_str(&message)?; - - let broadcast = |p: Packet| -> anyhow::Result<()> { - broadcast.send(Message::Text(serde_json::to_string(&p)?))?; - Ok(()) - }; - - match packet { - Packet::Join(username) => { - broadcast(Packet::Join(username.clone()))?; - state.username = username; - } - Packet::Leave(_) => bail!("illegal packet"), - p => broadcast(p)?, - }; - - Ok(false) -} diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs deleted file mode 100644 index 38d2b7b..0000000 --- a/server/src/logic/stream.rs +++ /dev/null @@ -1,216 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ -use crate::{request_info::RequestInfo, ui::error::MyError}; -use anyhow::{Result, anyhow}; -use jellycommon::{ - NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path, - stream::StreamSpec, -}; -use jellydb::{Filter, Query}; -use jellystream::SMediaInfo; -use log::{info, warn}; -use rocket::{ - Either, Request, Response, get, head, - http::{Header, Status}, - request::{self, FromRequest}, - response::{self, Redirect, Responder}, -}; -use std::{ - collections::{BTreeMap, BTreeSet}, - ops::Range, - sync::Arc, -}; -use tokio::{ - io::{DuplexStream, duplex}, - task::spawn_blocking, -}; -use tokio_util::io::SyncIoBridge; - -#[head("/n/<_id>/stream?<spec..>")] -pub async fn r_stream_head( - _sess: RequestInfo<'_>, - _id: &str, - spec: BTreeMap<String, String>, -) -> Result<Either<StreamResponse, Redirect>, MyError> { - let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; - let head = jellystream::stream_head(&spec); - Ok(Either::Left(StreamResponse { - stream: duplex(0).0, - advertise_range: head.range_supported, - content_type: head.content_type, - range: None, - })) -} - -#[get("/n/<slug>/stream?<spec..>")] -pub async fn r_stream( - ri: RequestInfo<'_>, - slug: &str, - range: Option<RequestRange>, - spec: BTreeMap<String, String>, -) -> Result<StreamResponse, MyError> { - let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; - - let mut node = None; - ri.state.database.transaction(&mut |txn| { - if let Some(row) = txn.query_single(Query { - filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()), - ..Default::default() - })? { - node = txn.get(row)?; - } - Ok(()) - })?; - - let Some(node) = node else { - Err(anyhow!("node not found"))? - }; - - info!( - "stream request (range={})", - range - .as_ref() - .map(|r| r.to_cr_hv()) - .unwrap_or("none".to_string()) - ); - - let urange = match &range { - Some(r) => { - let r = r.0.first().unwrap_or(&(None..None)); - r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) - } - None => 0..u64::MAX, - }; - - let head = jellystream::stream_head(&spec); - - let mut sources = BTreeSet::new(); - for track in node.iter(NO_TRACK) { - if let Some(s) = track.get(TR_SOURCE) { - if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) { - sources.insert(path.into()); - } - } - } - let media = Arc::new(SMediaInfo { - files: sources, - title: node.get(NO_TITLE).map(String::from), - cache: ri.state.cache.clone(), - config: ri.state.config.stream.clone(), - }); - - // TODO too many threads - let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) - .await - .unwrap() - { - Ok(o) => o, - Err(e) => { - warn!("stream error: {e:?}"); - Err(e)? - } - }; - let (stream_write, stream_read) = duplex(4096); - spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); - - Ok(StreamResponse { - stream: stream_read, - range, - advertise_range: head.range_supported, - content_type: head.content_type, - }) -} - -pub struct RedirectResponse(String); - -#[rocket::async_trait] -impl<'r> Responder<'r, 'static> for RedirectResponse { - fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { - let mut b = Response::build(); - b.status(Status::Found); - b.header(Header::new("access-control-allow-origin", "*")); - b.header(Header::new("location", self.0)); - Ok(b.finalize()) - } -} - -pub struct StreamResponse { - stream: DuplexStream, - advertise_range: bool, - content_type: &'static str, - range: Option<RequestRange>, -} - -#[rocket::async_trait] -impl<'r> Responder<'r, 'static> for StreamResponse { - fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { - let mut b = Response::build(); - b.status(Status::Ok); - b.header(Header::new("access-control-allow-origin", "*")); - if self.advertise_range { - //* it is very important here to not reply with content range if we didnt advertise. - //* mpv requests range but will crash if we dont pretend to not support it. - if let Some(range) = self.range { - b.status(Status::PartialContent); - b.header(Header::new("content-range", range.to_cr_hv())); - } - b.header(Header::new("accept-ranges", "bytes")); - } - b.header(Header::new("content-type", self.content_type)) - .streamed_body(self.stream) - .ok() - } -} - -#[derive(Debug)] -pub struct RequestRange(Vec<Range<Option<u64>>>); - -impl RequestRange { - pub fn to_cr_hv(&self) -> String { - assert_eq!(self.0.len(), 1); - format!( - "bytes {}-{}/*", - self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), - self.0[0].end.map(|e| e.to_string()).unwrap_or_default() - ) - } - pub fn from_hv(s: &str) -> Result<Self> { - Ok(Self( - s.strip_prefix("bytes=") - .ok_or(anyhow!("prefix expected"))? - .split(',') - .map(|s| { - let (l, r) = s - .split_once('-') - .ok_or(anyhow!("range delimeter missing"))?; - let km = |s: &str| { - if s.is_empty() { - Ok::<_, anyhow::Error>(None) - } else { - Ok(Some(s.parse()?)) - } - }; - Ok(km(l)?..km(r)?) - }) - .collect::<Result<Vec<_>>>()?, - )) - } -} - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for RequestRange { - type Error = anyhow::Error; - - async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> { - match req.headers().get("range").next() { - Some(v) => match Self::from_hv(v) { - Ok(v) => rocket::outcome::Outcome::Success(v), - Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), - }, - None => rocket::outcome::Outcome::Forward(Status::Ok), - } - } -} diff --git a/server/src/logic/userdata.rs b/server/src/logic/userdata.rs deleted file mode 100644 index 9fdc2bf..0000000 --- a/server/src/logic/userdata.rs +++ /dev/null @@ -1,59 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2026 metamuffin <metamuffin.org> -*/ -use rocket::{FromFormField, UriDisplayQuery}; - -#[derive(Debug, FromFormField, UriDisplayQuery)] -pub enum UrlWatchedState { - None, - Watched, - Pending, -} - -// #[get("/n/<id>/userdata")] -// pub fn r_node_userdata(session: A<Session>, id: A<NodeID>) -> MyResult<Json<NodeUserData>> { -// let u = get_node(&session.0, id.0, false, false, NodeFilterSort::default())?.userdata; -// Ok(Json(u)) -// } - -// #[post("/n/<id>/watched?<state>")] -// pub async fn r_node_userdata_watched( -// session: A<Session>, -// id: A<NodeID>, -// state: UrlWatchedState, -// ) -> MyResult<Redirect> { -// update_node_userdata_watched( -// &session.0, -// id.0, -// match state { -// UrlWatchedState::None => WatchedState::None, -// UrlWatchedState::Watched => WatchedState::Watched, -// UrlWatchedState::Pending => WatchedState::Pending, -// }, -// )?; -// Ok(Redirect::found(u_node_id(id.0))) -// } - -// #[derive(FromForm)] -// pub struct UpdateRating { -// #[field(validate = range(-10..=10))] -// rating: i32, -// } - -// #[post("/n/<id>/update_rating", data = "<form>")] -// pub async fn r_node_userdata_rating( -// session: A<Session>, -// id: A<NodeID>, -// form: Form<UpdateRating>, -// ) -> MyResult<Redirect> { -// update_node_userdata_rating(&session.0, id.0, form.rating)?; -// Ok(Redirect::found(u_node_id(id.0))) -// } - -// #[post("/n/<id>/progress?<t>")] -// pub async fn r_node_userdata_progress(session: A<Session>, id: A<NodeID>, t: f64) -> MyResult<()> { -// update_node_userdata_watched_progress(&session.0, id.0, t)?; -// Ok(()) -// } |