/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ use super::ui::{account::session::Session, error::MyError}; use crate::database::DataAcid; use anyhow::{anyhow, Result}; use jellybase::{ database::{TableExt, T_NODE}, federation::Federation, permission::{NodePermissionExt, PermissionSetExt}, SECRETS, }; use jellycommon::{ config::FederationAccount, stream::StreamSpec, user::{CreateSessionParams, UserPermission}, TrackSource, }; use log::{info, warn}; use rocket::{ get, head, http::{Header, Status}, request::{self, FromRequest}, response::{self, Redirect, Responder}, Either, Request, Response, State, }; use std::{collections::HashSet, ops::Range}; use tokio::io::{duplex, DuplexStream}; #[head("/n/<_id>/stream?")] pub async fn r_stream_head( _sess: Session, _id: &str, spec: StreamSpec, ) -> Result, MyError> { let head = jellystream::stream_head(&spec); Ok(Either::Left(StreamResponse { stream: duplex(0).0, advertise_range: head.range_supported, content_type: head.content_type, range: None, })) } #[get("/n//stream?")] pub async fn r_stream( session: Session, federation: &State, db: &State, id: &str, range: Option, spec: StreamSpec, ) -> Result, MyError> { let node = T_NODE .get(&db, id)? .only_if_permitted(&session.user.permissions) .ok_or(anyhow!("node does not exist"))?; let source = node .private .source .as_ref() .ok_or(anyhow!("item does not contain media"))?; // TODO its unclear how requests with multiple tracks should be handled. if spec.tracks.len() == 1 { let ti = spec.tracks[0]; if let TrackSource::Remote(remote_index) = source[ti] { session .user .permissions .assert(&UserPermission::FederatedContent)?; let track = &node.public.media.ok_or(anyhow!("no media"))?.tracks[ti]; let host = track .federated .last() .ok_or(anyhow!("federation inconsistent"))?; let FederationAccount { password, username, .. } = SECRETS .federation .get(host) .ok_or(anyhow!("no credentials on the server-side"))?; info!("creating session on {host}"); let instance = federation.get_instance(&host)?.to_owned(); let session = instance .login(CreateSessionParams { username: username.to_owned(), password: password.to_owned(), expire: Some(60), drop_permissions: Some(HashSet::from_iter([ UserPermission::ManageSelf, UserPermission::Admin, // in case somebody federated the admin :))) ])), }) .await?; let uri = session.stream_url( node.public.id.as_ref().unwrap(), &StreamSpec { tracks: vec![remote_index], ..spec }, ); info!("federation redirect"); return Ok(Either::Right(RedirectResponse(uri))); } } info!( "stream request (range={})", range .as_ref() .map(|r| r.to_cr_hv()) .unwrap_or(format!("none")) ); let urange = match &range { Some(r) => { let r = r.0.get(0).unwrap_or(&(None..None)); r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) } None => 0..(isize::MAX as usize), }; let head = jellystream::stream_head(&spec); match jellystream::stream(node, spec, urange, &session.user.permissions).await { Ok(stream) => Ok(Either::Left(StreamResponse { stream, range, advertise_range: head.range_supported, content_type: head.content_type, })), Err(e) => { warn!("stream error: {e}"); Err(MyError(e)) } } } pub struct RedirectResponse(String); #[rocket::async_trait] impl<'r> Responder<'r, 'static> for RedirectResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Found); b.header(Header::new("access-control-allow-origin", "*")); b.header(Header::new("location", self.0)); Ok(b.finalize()) } } pub struct StreamResponse { stream: DuplexStream, advertise_range: bool, content_type: &'static str, range: Option, } #[rocket::async_trait] impl<'r> Responder<'r, 'static> for StreamResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); b.status(Status::Ok); b.header(Header::new("access-control-allow-origin", "*")); if let Some(range) = self.range { b.status(Status::PartialContent); b.header(Header::new("content-range", range.to_cr_hv())); } if self.advertise_range { b.header(Header::new("accept-ranges", "bytes")); } b.header(Header::new("content-type", self.content_type)) .streamed_body(self.stream) .ok() } } #[derive(Debug)] pub struct RequestRange(Vec>>); impl RequestRange { pub fn to_cr_hv(&self) -> String { assert_eq!(self.0.len(), 1); format!( "bytes {}-{}/*", self.0[0] .start .map(|e| format!("{e}")) .unwrap_or(String::new()), self.0[0] .end .map(|e| format!("{e}")) .unwrap_or(String::new()) ) } pub fn from_hv(s: &str) -> Result { Ok(Self( s.strip_prefix("bytes=") .ok_or(anyhow!("prefix expected"))? .split(',') .map(|s| { let (l, r) = s .split_once('-') .ok_or(anyhow!("range delimeter missing"))?; let km = |s: &str| { if s.is_empty() { Ok::<_, anyhow::Error>(None) } else { Ok(Some(s.parse()?)) } }; Ok(km(l)?..km(r)?) }) .collect::>>()?, )) } } #[rocket::async_trait] impl<'r> FromRequest<'r> for RequestRange { type Error = anyhow::Error; async fn from_request(req: &'r Request<'_>) -> request::Outcome { match req.headers().get("range").next() { Some(v) => match Self::from_hv(v) { Ok(v) => rocket::outcome::Outcome::Success(v), Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), }, None => rocket::outcome::Outcome::Forward(Status::Ok), } } }