aboutsummaryrefslogtreecommitdiff
path: root/server/src/routes/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/routes/stream.rs')
-rw-r--r--server/src/routes/stream.rs216
1 files changed, 216 insertions, 0 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
new file mode 100644
index 0000000..a72e0d9
--- /dev/null
+++ b/server/src/routes/stream.rs
@@ -0,0 +1,216 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2026 metamuffin <metamuffin.org>
+*/
+use crate::{request_info::RequestInfo, routes::error::MyError};
+use anyhow::{Result, anyhow};
+use jellycommon::{
+ NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path,
+ stream::StreamSpec,
+};
+use jellydb::{Filter, Query};
+use jellystream::SMediaInfo;
+use log::{info, warn};
+use rocket::{
+ Either, Request, Response, get, head,
+ http::{Header, Status},
+ request::{self, FromRequest},
+ response::{self, Redirect, Responder},
+};
+use std::{
+ collections::{BTreeMap, BTreeSet},
+ ops::Range,
+ sync::Arc,
+};
+use tokio::{
+ io::{DuplexStream, duplex},
+ task::spawn_blocking,
+};
+use tokio_util::io::SyncIoBridge;
+
+#[head("/n/<_id>/stream?<spec..>")]
+pub async fn r_stream_head(
+ _sess: RequestInfo<'_>,
+ _id: &str,
+ spec: BTreeMap<String, String>,
+) -> Result<Either<StreamResponse, Redirect>, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+ let head = jellystream::stream_head(&spec);
+ Ok(Either::Left(StreamResponse {
+ stream: duplex(0).0,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ range: None,
+ }))
+}
+
+#[get("/n/<slug>/stream?<spec..>")]
+pub async fn r_stream(
+ ri: RequestInfo<'_>,
+ slug: &str,
+ range: Option<RequestRange>,
+ spec: BTreeMap<String, String>,
+) -> Result<StreamResponse, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+
+ let mut node = None;
+ ri.state.database.transaction(&mut |txn| {
+ if let Some(row) = txn.query_single(Query {
+ filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()),
+ ..Default::default()
+ })? {
+ node = txn.get(row)?;
+ }
+ Ok(())
+ })?;
+
+ let Some(node) = node else {
+ Err(anyhow!("node not found"))?
+ };
+
+ info!(
+ "stream request (range={})",
+ range
+ .as_ref()
+ .map(|r| r.to_cr_hv())
+ .unwrap_or("none".to_string())
+ );
+
+ let urange = match &range {
+ Some(r) => {
+ let r = r.0.first().unwrap_or(&(None..None));
+ r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX)
+ }
+ None => 0..u64::MAX,
+ };
+
+ let head = jellystream::stream_head(&spec);
+
+ let mut sources = BTreeSet::new();
+ for track in node.iter(NO_TRACK) {
+ if let Some(s) = track.get(TR_SOURCE) {
+ if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) {
+ sources.insert(path.into());
+ }
+ }
+ }
+ let media = Arc::new(SMediaInfo {
+ files: sources,
+ title: node.get(NO_TITLE).map(String::from),
+ cache: ri.state.cache.clone(),
+ config: ri.state.config.stream.clone(),
+ });
+
+ // TODO too many threads
+ let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange))
+ .await
+ .unwrap()
+ {
+ Ok(o) => o,
+ Err(e) => {
+ warn!("stream error: {e:?}");
+ Err(e)?
+ }
+ };
+ let (stream_write, stream_read) = duplex(4096);
+ spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
+
+ Ok(StreamResponse {
+ stream: stream_read,
+ range,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ })
+}
+
+pub struct RedirectResponse(String);
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for RedirectResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Found);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ b.header(Header::new("location", self.0));
+ Ok(b.finalize())
+ }
+}
+
+pub struct StreamResponse {
+ stream: DuplexStream,
+ advertise_range: bool,
+ content_type: &'static str,
+ range: Option<RequestRange>,
+}
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for StreamResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Ok);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ if self.advertise_range {
+ //* it is very important here to not reply with content range if we didnt advertise.
+ //* mpv requests range but will crash if we dont pretend to not support it.
+ if let Some(range) = self.range {
+ b.status(Status::PartialContent);
+ b.header(Header::new("content-range", range.to_cr_hv()));
+ }
+ b.header(Header::new("accept-ranges", "bytes"));
+ }
+ b.header(Header::new("content-type", self.content_type))
+ .streamed_body(self.stream)
+ .ok()
+ }
+}
+
+#[derive(Debug)]
+pub struct RequestRange(Vec<Range<Option<u64>>>);
+
+impl RequestRange {
+ pub fn to_cr_hv(&self) -> String {
+ assert_eq!(self.0.len(), 1);
+ format!(
+ "bytes {}-{}/*",
+ self.0[0].start.map(|e| e.to_string()).unwrap_or_default(),
+ self.0[0].end.map(|e| e.to_string()).unwrap_or_default()
+ )
+ }
+ pub fn from_hv(s: &str) -> Result<Self> {
+ Ok(Self(
+ s.strip_prefix("bytes=")
+ .ok_or(anyhow!("prefix expected"))?
+ .split(',')
+ .map(|s| {
+ let (l, r) = s
+ .split_once('-')
+ .ok_or(anyhow!("range delimeter missing"))?;
+ let km = |s: &str| {
+ if s.is_empty() {
+ Ok::<_, anyhow::Error>(None)
+ } else {
+ Ok(Some(s.parse()?))
+ }
+ };
+ Ok(km(l)?..km(r)?)
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ))
+ }
+}
+
+#[rocket::async_trait]
+impl<'r> FromRequest<'r> for RequestRange {
+ type Error = anyhow::Error;
+
+ async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
+ match req.headers().get("range").next() {
+ Some(v) => match Self::from_hv(v) {
+ Ok(v) => rocket::outcome::Outcome::Success(v),
+ Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)),
+ },
+ None => rocket::outcome::Outcome::Forward(Status::Ok),
+ }
+ }
+}