aboutsummaryrefslogtreecommitdiff
path: root/server/src/logic
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/logic')
-rw-r--r--server/src/logic/mod.rs8
-rw-r--r--server/src/logic/playersync.rs109
-rw-r--r--server/src/logic/stream.rs216
-rw-r--r--server/src/logic/userdata.rs59
4 files changed, 0 insertions, 392 deletions
diff --git a/server/src/logic/mod.rs b/server/src/logic/mod.rs
deleted file mode 100644
index 24d58f8..0000000
--- a/server/src/logic/mod.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-pub mod playersync;
-pub mod stream;
-pub mod userdata;
diff --git a/server/src/logic/playersync.rs b/server/src/logic/playersync.rs
deleted file mode 100644
index 71e2809..0000000
--- a/server/src/logic/playersync.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-use anyhow::bail;
-use chashmap::CHashMap;
-use futures::{SinkExt, StreamExt};
-use log::warn;
-use rocket::{State, get};
-use rocket_ws::{Channel, Message, WebSocket, stream::DuplexStream};
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast::{self, Sender};
-
-use crate::responders::cors::Cors;
-
-#[derive(Default)]
-pub struct PlayersyncChannels {
- channels: CHashMap<String, broadcast::Sender<Message>>,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-#[serde(rename_all = "snake_case")]
-pub enum Packet {
- Time(f64),
- Playing(bool),
- Join(String),
- Leave(String),
-}
-
-#[get("/playersync/<channel>")]
-pub fn r_playersync(
- ws: WebSocket,
- state: &State<PlayersyncChannels>,
- channel: &str,
-) -> Cors<Channel<'static>> {
- let sender = state
- .channels
- .get(&channel.to_owned())
- .map(|x| x.to_owned())
- .unwrap_or_else(|| {
- let ch = broadcast::channel(16).0;
- state.channels.insert(channel.to_owned(), ch.clone());
- ch
- });
- Cors(ws.channel(move |ws| {
- Box::pin(async move {
- let mut state = ClientState {
- username: "unknown user".into(),
- };
- if let Err(e) = handle_socket(&sender, ws, &mut state).await {
- warn!("streamsync websocket error: {e:?}")
- }
- let _ = sender.send(Message::Text(
- serde_json::to_string(&Packet::Leave(state.username)).unwrap(),
- ));
- Ok(())
- })
- }))
-}
-
-struct ClientState {
- username: String,
-}
-
-async fn handle_socket(
- broadcast: &Sender<Message>,
- mut ws: DuplexStream,
- state: &mut ClientState,
-) -> anyhow::Result<()> {
- let mut sub = broadcast.subscribe();
- loop {
- tokio::select! {
- message = ws.next() => {
- match handle_packet(broadcast, message,state) {
- Err(e) => Err(e)?,
- Ok(true) => return Ok(()),
- Ok(false) => ()
- }
- },
- message = sub.recv() => {
- ws.send(message?).await?;
- }
- };
- }
-}
-
-fn handle_packet(
- broadcast: &Sender<Message>,
- message: Option<rocket_ws::result::Result<Message>>,
- state: &mut ClientState,
-) -> anyhow::Result<bool> {
- let Some(message) = message else {
- return Ok(true);
- };
- let message = message?.into_text()?;
- let packet: Packet = serde_json::from_str(&message)?;
-
- let broadcast = |p: Packet| -> anyhow::Result<()> {
- broadcast.send(Message::Text(serde_json::to_string(&p)?))?;
- Ok(())
- };
-
- match packet {
- Packet::Join(username) => {
- broadcast(Packet::Join(username.clone()))?;
- state.username = username;
- }
- Packet::Leave(_) => bail!("illegal packet"),
- p => broadcast(p)?,
- };
-
- Ok(false)
-}
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
deleted file mode 100644
index 38d2b7b..0000000
--- a/server/src/logic/stream.rs
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-use crate::{request_info::RequestInfo, ui::error::MyError};
-use anyhow::{Result, anyhow};
-use jellycommon::{
- NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path,
- stream::StreamSpec,
-};
-use jellydb::{Filter, Query};
-use jellystream::SMediaInfo;
-use log::{info, warn};
-use rocket::{
- Either, Request, Response, get, head,
- http::{Header, Status},
- request::{self, FromRequest},
- response::{self, Redirect, Responder},
-};
-use std::{
- collections::{BTreeMap, BTreeSet},
- ops::Range,
- sync::Arc,
-};
-use tokio::{
- io::{DuplexStream, duplex},
- task::spawn_blocking,
-};
-use tokio_util::io::SyncIoBridge;
-
-#[head("/n/<_id>/stream?<spec..>")]
-pub async fn r_stream_head(
- _sess: RequestInfo<'_>,
- _id: &str,
- spec: BTreeMap<String, String>,
-) -> Result<Either<StreamResponse, Redirect>, MyError> {
- let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
- let head = jellystream::stream_head(&spec);
- Ok(Either::Left(StreamResponse {
- stream: duplex(0).0,
- advertise_range: head.range_supported,
- content_type: head.content_type,
- range: None,
- }))
-}
-
-#[get("/n/<slug>/stream?<spec..>")]
-pub async fn r_stream(
- ri: RequestInfo<'_>,
- slug: &str,
- range: Option<RequestRange>,
- spec: BTreeMap<String, String>,
-) -> Result<StreamResponse, MyError> {
- let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
-
- let mut node = None;
- ri.state.database.transaction(&mut |txn| {
- if let Some(row) = txn.query_single(Query {
- filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()),
- ..Default::default()
- })? {
- node = txn.get(row)?;
- }
- Ok(())
- })?;
-
- let Some(node) = node else {
- Err(anyhow!("node not found"))?
- };
-
- info!(
- "stream request (range={})",
- range
- .as_ref()
- .map(|r| r.to_cr_hv())
- .unwrap_or("none".to_string())
- );
-
- let urange = match &range {
- Some(r) => {
- let r = r.0.first().unwrap_or(&(None..None));
- r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX)
- }
- None => 0..u64::MAX,
- };
-
- let head = jellystream::stream_head(&spec);
-
- let mut sources = BTreeSet::new();
- for track in node.iter(NO_TRACK) {
- if let Some(s) = track.get(TR_SOURCE) {
- if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) {
- sources.insert(path.into());
- }
- }
- }
- let media = Arc::new(SMediaInfo {
- files: sources,
- title: node.get(NO_TITLE).map(String::from),
- cache: ri.state.cache.clone(),
- config: ri.state.config.stream.clone(),
- });
-
- // TODO too many threads
- let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange))
- .await
- .unwrap()
- {
- Ok(o) => o,
- Err(e) => {
- warn!("stream error: {e:?}");
- Err(e)?
- }
- };
- let (stream_write, stream_read) = duplex(4096);
- spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
-
- Ok(StreamResponse {
- stream: stream_read,
- range,
- advertise_range: head.range_supported,
- content_type: head.content_type,
- })
-}
-
-pub struct RedirectResponse(String);
-
-#[rocket::async_trait]
-impl<'r> Responder<'r, 'static> for RedirectResponse {
- fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
- let mut b = Response::build();
- b.status(Status::Found);
- b.header(Header::new("access-control-allow-origin", "*"));
- b.header(Header::new("location", self.0));
- Ok(b.finalize())
- }
-}
-
-pub struct StreamResponse {
- stream: DuplexStream,
- advertise_range: bool,
- content_type: &'static str,
- range: Option<RequestRange>,
-}
-
-#[rocket::async_trait]
-impl<'r> Responder<'r, 'static> for StreamResponse {
- fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
- let mut b = Response::build();
- b.status(Status::Ok);
- b.header(Header::new("access-control-allow-origin", "*"));
- if self.advertise_range {
- //* it is very important here to not reply with content range if we didnt advertise.
- //* mpv requests range but will crash if we dont pretend to not support it.
- if let Some(range) = self.range {
- b.status(Status::PartialContent);
- b.header(Header::new("content-range", range.to_cr_hv()));
- }
- b.header(Header::new("accept-ranges", "bytes"));
- }
- b.header(Header::new("content-type", self.content_type))
- .streamed_body(self.stream)
- .ok()
- }
-}
-
-#[derive(Debug)]
-pub struct RequestRange(Vec<Range<Option<u64>>>);
-
-impl RequestRange {
- pub fn to_cr_hv(&self) -> String {
- assert_eq!(self.0.len(), 1);
- format!(
- "bytes {}-{}/*",
- self.0[0].start.map(|e| e.to_string()).unwrap_or_default(),
- self.0[0].end.map(|e| e.to_string()).unwrap_or_default()
- )
- }
- pub fn from_hv(s: &str) -> Result<Self> {
- Ok(Self(
- s.strip_prefix("bytes=")
- .ok_or(anyhow!("prefix expected"))?
- .split(',')
- .map(|s| {
- let (l, r) = s
- .split_once('-')
- .ok_or(anyhow!("range delimeter missing"))?;
- let km = |s: &str| {
- if s.is_empty() {
- Ok::<_, anyhow::Error>(None)
- } else {
- Ok(Some(s.parse()?))
- }
- };
- Ok(km(l)?..km(r)?)
- })
- .collect::<Result<Vec<_>>>()?,
- ))
- }
-}
-
-#[rocket::async_trait]
-impl<'r> FromRequest<'r> for RequestRange {
- type Error = anyhow::Error;
-
- async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
- match req.headers().get("range").next() {
- Some(v) => match Self::from_hv(v) {
- Ok(v) => rocket::outcome::Outcome::Success(v),
- Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)),
- },
- None => rocket::outcome::Outcome::Forward(Status::Ok),
- }
- }
-}
diff --git a/server/src/logic/userdata.rs b/server/src/logic/userdata.rs
deleted file mode 100644
index 9fdc2bf..0000000
--- a/server/src/logic/userdata.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2026 metamuffin <metamuffin.org>
-*/
-use rocket::{FromFormField, UriDisplayQuery};
-
-#[derive(Debug, FromFormField, UriDisplayQuery)]
-pub enum UrlWatchedState {
- None,
- Watched,
- Pending,
-}
-
-// #[get("/n/<id>/userdata")]
-// pub fn r_node_userdata(session: A<Session>, id: A<NodeID>) -> MyResult<Json<NodeUserData>> {
-// let u = get_node(&session.0, id.0, false, false, NodeFilterSort::default())?.userdata;
-// Ok(Json(u))
-// }
-
-// #[post("/n/<id>/watched?<state>")]
-// pub async fn r_node_userdata_watched(
-// session: A<Session>,
-// id: A<NodeID>,
-// state: UrlWatchedState,
-// ) -> MyResult<Redirect> {
-// update_node_userdata_watched(
-// &session.0,
-// id.0,
-// match state {
-// UrlWatchedState::None => WatchedState::None,
-// UrlWatchedState::Watched => WatchedState::Watched,
-// UrlWatchedState::Pending => WatchedState::Pending,
-// },
-// )?;
-// Ok(Redirect::found(u_node_id(id.0)))
-// }
-
-// #[derive(FromForm)]
-// pub struct UpdateRating {
-// #[field(validate = range(-10..=10))]
-// rating: i32,
-// }
-
-// #[post("/n/<id>/update_rating", data = "<form>")]
-// pub async fn r_node_userdata_rating(
-// session: A<Session>,
-// id: A<NodeID>,
-// form: Form<UpdateRating>,
-// ) -> MyResult<Redirect> {
-// update_node_userdata_rating(&session.0, id.0, form.rating)?;
-// Ok(Redirect::found(u_node_id(id.0)))
-// }
-
-// #[post("/n/<id>/progress?<t>")]
-// pub async fn r_node_userdata_progress(session: A<Session>, id: A<NodeID>, t: f64) -> MyResult<()> {
-// update_node_userdata_watched_progress(&session.0, id.0, t)?;
-// Ok(())
-// }