/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ use super::ui::{account::session::Session, error::MyError}; use crate::{database::Database, CONF}; use anyhow::{anyhow, Result}; use jellycommon::MediaSource; use jellyremuxer::RemuxerContext; use log::{debug, info, warn}; use rocket::{ get, http::{ContentType, Header, Status}, request::{self, FromRequest}, response::{self, Responder}, Request, Response, State, }; use std::ops::{Deref, Range}; use tokio::io::{duplex, DuplexStream}; use tokio_util::io::SyncIoBridge; pub fn stream_uri(id: &str, tracks: &[u64], webm: bool) -> String { format!( "/n/{}/stream?tracks={}&webm={}", id, tracks .iter() .map(|v| format!("{v}")) .collect::>() .join(","), if webm { "1" } else { "0" } ) } #[get("/n//stream?&")] pub fn r_stream( _sess: Session, id: String, webm: Option, tracks: String, remuxer: &State, db: &State, range: Option, ) -> Result { let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?; let source = node .private .source .ok_or(anyhow!("item does not contain media"))?; let source_tracks = match source { MediaSource::Local { tracks } => tracks, _ => Err(anyhow!("todo"))?, }; info!( "stream request (range={})", range .as_ref() .map(|r| r.to_cr_hv()) .unwrap_or(format!("none")) ); let (a, b) = duplex(4096); let remuxer = remuxer.deref().clone(); let tracks = tracks .split(',') .map(|e| e.parse().map_err(|_| anyhow!("invalid number"))) .collect::, _>>()?; let b = SyncIoBridge::new(b); let urange = match &range { Some(r) => { let r = r.0.get(0).unwrap_or(&(None..None)); r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) } None => 0..(isize::MAX as usize), }; tokio::task::spawn_blocking(move || { if let Err(e) = remuxer.generate_into( b, urange, CONF.library_path.clone(), node.public, source_tracks, tracks, webm.unwrap_or(false), ) { warn!("stream stopped: {e}") } }); debug!("starting stream"); Ok(StreamResponse { stream: a, range }) } pub struct StreamResponse { stream: DuplexStream, range: Option, } #[rocket::async_trait] impl<'r> Responder<'r, 'static> for StreamResponse { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { let mut b = Response::build(); if let Some(range) = self.range { b.status(Status::PartialContent); b.header(Header::new("content-range", range.to_cr_hv())); } b.header(Header::new("accept-ranges", "bytes")) .header(ContentType::WEBM) .streamed_body(self.stream) .ok() } } #[derive(Debug)] pub struct RequestRange(Vec>>); impl RequestRange { pub fn to_cr_hv(&self) -> String { assert_eq!(self.0.len(), 1); format!( "bytes {}-{}/*", self.0[0] .start .map(|e| format!("{e}")) .unwrap_or(String::new()), self.0[0] .end .map(|e| format!("{e}")) .unwrap_or(String::new()) ) } pub fn from_hv(s: &str) -> Result { Ok(Self( s.strip_prefix("bytes=") .ok_or(anyhow!("prefix expected"))? .split(',') .map(|s| { let (l, r) = s .split_once('-') .ok_or(anyhow!("range delimeter missing"))?; let km = |s: &str| { if s.is_empty() { Ok::<_, anyhow::Error>(None) } else { Ok(Some(s.parse()?)) } }; Ok(km(l)?..km(r)?) }) .collect::>>()?, )) } } #[rocket::async_trait] impl<'r> FromRequest<'r> for RequestRange { type Error = anyhow::Error; async fn from_request(req: &'r Request<'_>) -> request::Outcome { match req.headers().get("range").next() { Some(v) => match Self::from_hv(v) { Ok(v) => rocket::outcome::Outcome::Success(v), Err(e) => rocket::outcome::Outcome::Failure((Status::BadRequest, e)), }, None => rocket::outcome::Outcome::Forward(()), } } }