/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ use crate::{helper::A, ui::error::MyError}; use anyhow::{anyhow, Result}; use jellycommon::{api::NodeFilterSort, stream::StreamSpec, NodeID, TrackSource}; use jellyimport::asset_token::AssetInner; use jellylogic::{node::get_node, session::Session}; use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ get, head, http::{Header, Status}, request::{self, FromRequest}, response::{self, Redirect, Responder}, Either, Request, Response, }; use std::{ collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; use tokio::io::{duplex, DuplexStream}; #[head("/n/<_id>/stream?")] pub async fn r_stream_head( _sess: A, _id: &str, spec: BTreeMap, ) -> Result, MyError> { let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; let head = jellystream::stream_head(&spec); Ok(Either::Left(StreamResponse { stream: duplex(0).0, advertise_range: head.range_supported, content_type: head.content_type, range: None, })) } #[get("/n//stream?")] pub async fn r_stream( session: A, id: &str, range: Option, spec: BTreeMap, ) -> Result, MyError> { let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; // TODO perm let node = get_node( &session.0, NodeID::from_slug(id), false, false, NodeFilterSort::default(), )? .node; let media = Arc::new( node.media .clone() .ok_or(anyhow!("item does not contain media"))?, ); // TODO its unclear how requests with multiple tracks should be handled. // if spec.track.len() == 1 { // let ti = spec.track[0]; // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { // session // .user // .permissions // .assert(&UserPermission::FederatedContent)?; // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; // let host = track // .federated // .last() // .ok_or(anyhow!("federation inconsistent"))?; // let FederationAccount { // password, username, .. // } = SECRETS // .federation // .get(host) // .ok_or(anyhow!("no credentials on the server-side"))?; // info!("creating session on {host}"); // let instance = federation.get_instance(host)?.to_owned(); // let session = instance // .login(CreateSessionParams { // username: username.to_owned(), // password: password.to_owned(), // expire: Some(60), // drop_permissions: Some(HashSet::from_iter([ // UserPermission::ManageSelf, // UserPermission::Admin, // in case somebody federated the admin :))) // ])), // }) // .await?; // let uri = session.stream_url( // node.slug.clone().into(), // &StreamSpec { // track: vec![remote_index], // ..spec // }, // ); // info!("federation redirect"); // return Ok(Either::Right(RedirectResponse(uri))); // } // } info!( "stream request (range={})", range .as_ref() .map(|r| r.to_cr_hv()) .unwrap_or("none".to_string()) ); let urange = match &range { Some(r) => { let r = r.0.first().unwrap_or(&(None..None)); r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) } None => 0..(isize::MAX as usize), }; let head = jellystream::stream_head(&spec); let mut sources = BTreeSet::new(); for t in &media.tracks { if let TrackSource::Local(x) = &t.source { if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { sources.insert(m.path); } } } let media = Arc::new(SMediaInfo { files: sources, title: node.title.clone(), }); match jellystream::stream(media, spec, urange).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, range, advertise_range: head.range_supported, content_type: head.content_type, })), Err(e) => { warn!("stream error: {e}"); Err(MyError(e)) } } } pub struct RedirectResponse(String); #[rocket::async_trait] impl<'r> Responder<'r, 'static> for RedirectResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Found); b.header(Header::new("access-control-allow-origin", "*")); b.header(Header::new("location", self.0)); Ok(b.finalize()) } } pub struct StreamResponse { stream: DuplexStream, advertise_range: bool, content_type: &'static str, range: Option, } #[rocket::async_trait] impl<'r> Responder<'r, 'static> for StreamResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Ok); b.header(Header::new("access-control-allow-origin", "*")); if self.advertise_range { //* it is very important here to not reply with content range if we didnt advertise. //* mpv requests range but will crash if we dont pretend to not support it. if let Some(range) = self.range { b.status(Status::PartialContent); b.header(Header::new("content-range", range.to_cr_hv())); } b.header(Header::new("accept-ranges", "bytes")); } b.header(Header::new("content-type", self.content_type)) .streamed_body(self.stream) .ok() } } #[derive(Debug)] pub struct RequestRange(Vec>>); impl RequestRange { pub fn to_cr_hv(&self) -> String { assert_eq!(self.0.len(), 1); format!( "bytes {}-{}/*", self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), self.0[0].end.map(|e| e.to_string()).unwrap_or_default() ) } pub fn from_hv(s: &str) -> Result { Ok(Self( s.strip_prefix("bytes=") .ok_or(anyhow!("prefix expected"))? .split(',') .map(|s| { let (l, r) = s .split_once('-') .ok_or(anyhow!("range delimeter missing"))?; let km = |s: &str| { if s.is_empty() { Ok::<_, anyhow::Error>(None) } else { Ok(Some(s.parse()?)) } }; Ok(km(l)?..km(r)?) }) .collect::>>()?, )) } } #[rocket::async_trait] impl<'r> FromRequest<'r> for RequestRange { type Error = anyhow::Error; async fn from_request(req: &'r Request<'_>) -> request::Outcome { match req.headers().get("range").next() { Some(v) => match Self::from_hv(v) { Ok(v) => rocket::outcome::Outcome::Success(v), Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), }, None => rocket::outcome::Outcome::Forward(Status::Ok), } } }