/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ use crate::{request_info::RequestInfo, ui::error::MyError}; use anyhow::{Result, anyhow}; use jellycommon::{ NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path, stream::StreamSpec, }; use jellydb::{Filter, Query}; use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ Either, Request, Response, get, head, http::{Header, Status}, request::{self, FromRequest}, response::{self, Redirect, Responder}, }; use std::{ collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; use tokio::{ io::{DuplexStream, duplex}, task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; #[head("/n/<_id>/stream?")] pub async fn r_stream_head( _sess: RequestInfo<'_>, _id: &str, spec: BTreeMap, ) -> Result, MyError> { let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; let head = jellystream::stream_head(&spec); Ok(Either::Left(StreamResponse { stream: duplex(0).0, advertise_range: head.range_supported, content_type: head.content_type, range: None, })) } #[get("/n//stream?")] pub async fn r_stream( ri: RequestInfo<'_>, slug: &str, range: Option, spec: BTreeMap, ) -> Result { let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; let mut node = None; ri.state.database.transaction(&mut |txn| { if let Some(row) = txn.query_single(Query { filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()), ..Default::default() })? { node = txn.get(row)?; } Ok(()) })?; let Some(node) = node.as_ref().map(|n| n.as_object()) else { Err(anyhow!("node not found"))? }; info!( "stream request (range={})", range .as_ref() .map(|r| r.to_cr_hv()) .unwrap_or("none".to_string()) ); let urange = match &range { Some(r) => { let r = r.0.first().unwrap_or(&(None..None)); r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) } None => 0..u64::MAX, }; let head = jellystream::stream_head(&spec); let mut sources = BTreeSet::new(); for track in node.iter(NO_TRACK) { if let Some(s) = track.get(TR_SOURCE) { if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) { sources.insert(path.into()); } } } let media = Arc::new(SMediaInfo { files: sources, title: node.get(NO_TITLE).map(String::from), cache: ri.state.cache.clone(), config: ri.state.config.stream.clone(), }); // TODO too many threads let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) .await .unwrap() { Ok(o) => o, Err(e) => { warn!("stream error: {e:?}"); Err(e)? } }; let (stream_write, stream_read) = duplex(4096); spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); Ok(StreamResponse { stream: stream_read, range, advertise_range: head.range_supported, content_type: head.content_type, }) } pub struct RedirectResponse(String); #[rocket::async_trait] impl<'r> Responder<'r, 'static> for RedirectResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Found); b.header(Header::new("access-control-allow-origin", "*")); b.header(Header::new("location", self.0)); Ok(b.finalize()) } } pub struct StreamResponse { stream: DuplexStream, advertise_range: bool, content_type: &'static str, range: Option, } #[rocket::async_trait] impl<'r> Responder<'r, 'static> for StreamResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Ok); b.header(Header::new("access-control-allow-origin", "*")); if self.advertise_range { //* it is very important here to not reply with content range if we didnt advertise. //* mpv requests range but will crash if we dont pretend to not support it. if let Some(range) = self.range { b.status(Status::PartialContent); b.header(Header::new("content-range", range.to_cr_hv())); } b.header(Header::new("accept-ranges", "bytes")); } b.header(Header::new("content-type", self.content_type)) .streamed_body(self.stream) .ok() } } #[derive(Debug)] pub struct RequestRange(Vec>>); impl RequestRange { pub fn to_cr_hv(&self) -> String { assert_eq!(self.0.len(), 1); format!( "bytes {}-{}/*", self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), self.0[0].end.map(|e| e.to_string()).unwrap_or_default() ) } pub fn from_hv(s: &str) -> Result { Ok(Self( s.strip_prefix("bytes=") .ok_or(anyhow!("prefix expected"))? .split(',') .map(|s| { let (l, r) = s .split_once('-') .ok_or(anyhow!("range delimeter missing"))?; let km = |s: &str| { if s.is_empty() { Ok::<_, anyhow::Error>(None) } else { Ok(Some(s.parse()?)) } }; Ok(km(l)?..km(r)?) }) .collect::>>()?, )) } } #[rocket::async_trait] impl<'r> FromRequest<'r> for RequestRange { type Error = anyhow::Error; async fn from_request(req: &'r Request<'_>) -> request::Outcome { match req.headers().get("range").next() { Some(v) => match Self::from_hv(v) { Ok(v) => rocket::outcome::Outcome::Success(v), Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), }, None => rocket::outcome::Outcome::Forward(Status::Ok), } } }