diff options
Diffstat (limited to 'server/src/routes/stream.rs')
-rw-r--r-- | server/src/routes/stream.rs | 246 |
1 files changed, 0 insertions, 246 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs deleted file mode 100644 index 0fbeb3a..0000000 --- a/server/src/routes/stream.rs +++ /dev/null @@ -1,246 +0,0 @@ -/* - This file is part of jellything (https://codeberg.org/metamuffin/jellything) - which is licensed under the GNU Affero General Public License (version 3); see /COPYING. - Copyright (C) 2025 metamuffin <metamuffin.org> -*/ -use super::ui::{account::session::Session, error::MyError}; -use crate::database::Database; -use anyhow::{anyhow, Result}; -use jellybase::{assetfed::AssetInner, federation::Federation}; -use jellycommon::{stream::StreamSpec, TrackSource}; -use jellystream::SMediaInfo; -use log::{info, warn}; -use rocket::{ - get, head, - http::{Header, Status}, - request::{self, FromRequest}, - response::{self, Redirect, Responder}, - Either, Request, Response, State, -}; -use std::{ - collections::{BTreeMap, BTreeSet}, - ops::Range, - sync::Arc, -}; -use tokio::io::{duplex, DuplexStream}; - -#[head("/n/<_id>/stream?<spec..>")] -pub async fn r_stream_head( - _sess: Session, - _id: &str, - spec: BTreeMap<String, String>, -) -> Result<Either<StreamResponse, Redirect>, MyError> { - let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; - let head = jellystream::stream_head(&spec); - Ok(Either::Left(StreamResponse { - stream: duplex(0).0, - advertise_range: head.range_supported, - content_type: head.content_type, - range: None, - })) -} - -#[get("/n/<id>/stream?<spec..>")] -pub async fn r_stream( - _session: Session, - _federation: &State<Federation>, - db: &State<Database>, - id: &str, - range: Option<RequestRange>, - spec: BTreeMap<String, String>, -) -> Result<Either<StreamResponse, RedirectResponse>, MyError> { - let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; - // TODO perm - let node = db - .get_node_slug(id)? - .ok_or(anyhow!("node does not exist"))?; - - let media = Arc::new( - node.media - .clone() - .ok_or(anyhow!("item does not contain media"))?, - ); - - // TODO its unclear how requests with multiple tracks should be handled. - // if spec.track.len() == 1 { - // let ti = spec.track[0]; - // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { - // session - // .user - // .permissions - // .assert(&UserPermission::FederatedContent)?; - - // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; - // let host = track - // .federated - // .last() - // .ok_or(anyhow!("federation inconsistent"))?; - - // let FederationAccount { - // password, username, .. - // } = SECRETS - // .federation - // .get(host) - // .ok_or(anyhow!("no credentials on the server-side"))?; - - // info!("creating session on {host}"); - // let instance = federation.get_instance(host)?.to_owned(); - // let session = instance - // .login(CreateSessionParams { - // username: username.to_owned(), - // password: password.to_owned(), - // expire: Some(60), - // drop_permissions: Some(HashSet::from_iter([ - // UserPermission::ManageSelf, - // UserPermission::Admin, // in case somebody federated the admin :))) - // ])), - // }) - // .await?; - - // let uri = session.stream_url( - // node.slug.clone().into(), - // &StreamSpec { - // track: vec![remote_index], - // ..spec - // }, - // ); - // info!("federation redirect"); - // return Ok(Either::Right(RedirectResponse(uri))); - // } - // } - - info!( - "stream request (range={})", - range - .as_ref() - .map(|r| r.to_cr_hv()) - .unwrap_or("none".to_string()) - ); - - let urange = match &range { - Some(r) => { - let r = r.0.first().unwrap_or(&(None..None)); - r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) - } - None => 0..(isize::MAX as usize), - }; - - let head = jellystream::stream_head(&spec); - - let mut sources = BTreeSet::new(); - for t in &media.tracks { - if let TrackSource::Local(x) = &t.source { - if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { - sources.insert(m.path); - } - } - } - let media = Arc::new(SMediaInfo { - files: sources, - info: node, - }); - - match jellystream::stream(media, spec, urange).await { - Ok(stream) => Ok(Either::Left(StreamResponse { - stream, - range, - advertise_range: head.range_supported, - content_type: head.content_type, - })), - Err(e) => { - warn!("stream error: {e}"); - Err(MyError(e)) - } - } -} - -pub struct RedirectResponse(String); - -#[rocket::async_trait] -impl<'r> Responder<'r, 'static> for RedirectResponse { - fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { - let mut b = Response::build(); - b.status(Status::Found); - b.header(Header::new("access-control-allow-origin", "*")); - b.header(Header::new("location", self.0)); - Ok(b.finalize()) - } -} - -pub struct StreamResponse { - stream: DuplexStream, - advertise_range: bool, - content_type: &'static str, - range: Option<RequestRange>, -} - -#[rocket::async_trait] -impl<'r> Responder<'r, 'static> for StreamResponse { - fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { - let mut b = Response::build(); - b.status(Status::Ok); - b.header(Header::new("access-control-allow-origin", "*")); - if self.advertise_range { - //* it is very important here to not reply with content range if we didnt advertise. - //* mpv requests range but will crash if we dont pretend to not support it. - if let Some(range) = self.range { - b.status(Status::PartialContent); - b.header(Header::new("content-range", range.to_cr_hv())); - } - b.header(Header::new("accept-ranges", "bytes")); - } - b.header(Header::new("content-type", self.content_type)) - .streamed_body(self.stream) - .ok() - } -} - -#[derive(Debug)] -pub struct RequestRange(Vec<Range<Option<usize>>>); - -impl RequestRange { - pub fn to_cr_hv(&self) -> String { - assert_eq!(self.0.len(), 1); - format!( - "bytes {}-{}/*", - self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), - self.0[0].end.map(|e| e.to_string()).unwrap_or_default() - ) - } - pub fn from_hv(s: &str) -> Result<Self> { - Ok(Self( - s.strip_prefix("bytes=") - .ok_or(anyhow!("prefix expected"))? - .split(',') - .map(|s| { - let (l, r) = s - .split_once('-') - .ok_or(anyhow!("range delimeter missing"))?; - let km = |s: &str| { - if s.is_empty() { - Ok::<_, anyhow::Error>(None) - } else { - Ok(Some(s.parse()?)) - } - }; - Ok(km(l)?..km(r)?) - }) - .collect::<Result<Vec<_>>>()?, - )) - } -} - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for RequestRange { - type Error = anyhow::Error; - - async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> { - match req.headers().get("range").next() { - Some(v) => match Self::from_hv(v) { - Ok(v) => rocket::outcome::Outcome::Success(v), - Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), - }, - None => rocket::outcome::Outcome::Forward(Status::Ok), - } - } -} |