diff options
Diffstat (limited to 'server/src/routes/stream.rs')
| -rw-r--r-- | server/src/routes/stream.rs | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs new file mode 100644 index 0000000..a72e0d9 --- /dev/null +++ b/server/src/routes/stream.rs @@ -0,0 +1,216 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2026 metamuffin <metamuffin.org> +*/ +use crate::{request_info::RequestInfo, routes::error::MyError}; +use anyhow::{Result, anyhow}; +use jellycommon::{ + NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path, + stream::StreamSpec, +}; +use jellydb::{Filter, Query}; +use jellystream::SMediaInfo; +use log::{info, warn}; +use rocket::{ + Either, Request, Response, get, head, + http::{Header, Status}, + request::{self, FromRequest}, + response::{self, Redirect, Responder}, +}; +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::Range, + sync::Arc, +}; +use tokio::{ + io::{DuplexStream, duplex}, + task::spawn_blocking, +}; +use tokio_util::io::SyncIoBridge; + +#[head("/n/<_id>/stream?<spec..>")] +pub async fn r_stream_head( + _sess: RequestInfo<'_>, + _id: &str, + spec: BTreeMap<String, String>, +) -> Result<Either<StreamResponse, Redirect>, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + let head = jellystream::stream_head(&spec); + Ok(Either::Left(StreamResponse { + stream: duplex(0).0, + advertise_range: head.range_supported, + content_type: head.content_type, + range: None, + })) +} + +#[get("/n/<slug>/stream?<spec..>")] +pub async fn r_stream( + ri: RequestInfo<'_>, + slug: &str, + range: Option<RequestRange>, + spec: BTreeMap<String, String>, +) -> Result<StreamResponse, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + + let mut node = None; + ri.state.database.transaction(&mut |txn| { + if let Some(row) = txn.query_single(Query { + filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()), + ..Default::default() + })? { + node = txn.get(row)?; + } + Ok(()) + })?; + + let Some(node) = node else { + Err(anyhow!("node not found"))? + }; + + info!( + "stream request (range={})", + range + .as_ref() + .map(|r| r.to_cr_hv()) + .unwrap_or("none".to_string()) + ); + + let urange = match &range { + Some(r) => { + let r = r.0.first().unwrap_or(&(None..None)); + r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) + } + None => 0..u64::MAX, + }; + + let head = jellystream::stream_head(&spec); + + let mut sources = BTreeSet::new(); + for track in node.iter(NO_TRACK) { + if let Some(s) = track.get(TR_SOURCE) { + if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) { + sources.insert(path.into()); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + title: node.get(NO_TITLE).map(String::from), + cache: ri.state.cache.clone(), + config: ri.state.config.stream.clone(), + }); + + // TODO too many threads + let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) + .await + .unwrap() + { + Ok(o) => o, + Err(e) => { + warn!("stream error: {e:?}"); + Err(e)? + } + }; + let (stream_write, stream_read) = duplex(4096); + spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); + + Ok(StreamResponse { + stream: stream_read, + range, + advertise_range: head.range_supported, + content_type: head.content_type, + }) +} + +pub struct RedirectResponse(String); + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for RedirectResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Found); + b.header(Header::new("access-control-allow-origin", "*")); + b.header(Header::new("location", self.0)); + Ok(b.finalize()) + } +} + +pub struct StreamResponse { + stream: DuplexStream, + advertise_range: bool, + content_type: &'static str, + range: Option<RequestRange>, +} + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for StreamResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Ok); + b.header(Header::new("access-control-allow-origin", "*")); + if self.advertise_range { + //* it is very important here to not reply with content range if we didnt advertise. + //* mpv requests range but will crash if we dont pretend to not support it. + if let Some(range) = self.range { + b.status(Status::PartialContent); + b.header(Header::new("content-range", range.to_cr_hv())); + } + b.header(Header::new("accept-ranges", "bytes")); + } + b.header(Header::new("content-type", self.content_type)) + .streamed_body(self.stream) + .ok() + } +} + +#[derive(Debug)] +pub struct RequestRange(Vec<Range<Option<u64>>>); + +impl RequestRange { + pub fn to_cr_hv(&self) -> String { + assert_eq!(self.0.len(), 1); + format!( + "bytes {}-{}/*", + self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), + self.0[0].end.map(|e| e.to_string()).unwrap_or_default() + ) + } + pub fn from_hv(s: &str) -> Result<Self> { + Ok(Self( + s.strip_prefix("bytes=") + .ok_or(anyhow!("prefix expected"))? + .split(',') + .map(|s| { + let (l, r) = s + .split_once('-') + .ok_or(anyhow!("range delimeter missing"))?; + let km = |s: &str| { + if s.is_empty() { + Ok::<_, anyhow::Error>(None) + } else { + Ok(Some(s.parse()?)) + } + }; + Ok(km(l)?..km(r)?) + }) + .collect::<Result<Vec<_>>>()?, + )) + } +} + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for RequestRange { + type Error = anyhow::Error; + + async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> { + match req.headers().get("range").next() { + Some(v) => match Self::from_hv(v) { + Ok(v) => rocket::outcome::Outcome::Success(v), + Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), + }, + None => rocket::outcome::Outcome::Forward(Status::Ok), + } + } +} |