diff options
Diffstat (limited to 'server/src/routes/stream.rs')
-rw-r--r-- | server/src/routes/stream.rs | 86 |
1 files changed, 30 insertions, 56 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index e920341..6b268e4 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -7,9 +7,8 @@ use super::ui::{account::session::Session, error::MyError}; use crate::{database::Database, federation::Federation}; use anyhow::{anyhow, Result}; use jellybase::CONF; -use jellycommon::MediaSource; -use jellyremuxer::RemuxerContext; -use log::{debug, info, warn}; +use jellycommon::{stream::StreamSpec, MediaSource}; +use log::{info, warn}; use rocket::{ get, http::{ContentType, Header, Status}, @@ -17,56 +16,43 @@ use rocket::{ response::{self, Redirect, Responder}, Either, Request, Response, State, }; -use std::{ - ops::{Deref, Range}, - time::Duration, -}; -use tokio::io::{duplex, DuplexStream}; -use tokio_util::io::SyncIoBridge; +use std::{ops::Range, time::Duration}; +use tokio::io::DuplexStream; -#[get("/n/<id>/stream?<tracks>&<webm>")] +#[get("/n/<id>/stream?<spec>")] pub async fn r_stream( _sess: Session, - id: String, - webm: Option<bool>, - tracks: String, - remuxer: &State<RemuxerContext>, federation: &State<Federation>, db: &State<Database>, + id: String, range: Option<RequestRange>, + spec: StreamSpec, ) -> Result<Either<StreamResponse, Redirect>, MyError> { let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?; let source = node .private .source + .as_ref() .ok_or(anyhow!("item does not contain media"))?; - let tracks = tracks - .split(',') - .map(|e| e.parse().map_err(|_| anyhow!("invalid number"))) - .collect::<Result<Vec<_>, _>>()?; + if let MediaSource::Remote { host, remote_id } = source { + let (username, password, _) = CONF + .remote_credentials + .get(host) + .ok_or(anyhow!("no credentials on the server-side"))?; - let source_tracks = match source { - MediaSource::Local { tracks } => tracks, - MediaSource::Remote { host, remote_id } => { - let (username, password, _) = CONF - .remote_credentials - .get(&host) - .ok_or(anyhow!("no credentials on the server-side"))?; + let instance = federation.get_instance(&host)?.to_owned(); + let session = instance + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60), + ) + .await?; - let instance = federation.get_instance(&host)?.to_owned(); - let session = instance - .login( - username.to_owned(), - password.to_owned(), - Duration::from_secs(60), - ) - .await?; - - let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false)); - return Ok(Either::Right(Redirect::found(uri))); - } - }; + let uri = session.stream(&remote_id, &spec); + return Ok(Either::Right(Redirect::found(uri))); + } info!( "stream request (range={})", @@ -76,10 +62,6 @@ pub async fn r_stream( .unwrap_or(format!("none")) ); - let (a, b) = duplex(4096); - let remuxer = remuxer.deref().clone(); - let b = SyncIoBridge::new(b); - let urange = match &range { Some(r) => { let r = r.0.get(0).unwrap_or(&(None..None)); @@ -88,21 +70,13 @@ pub async fn r_stream( None => 0..(isize::MAX as usize), }; - tokio::task::spawn_blocking(move || { - if let Err(e) = remuxer.generate_into( - b, - urange, - CONF.library_path.clone(), - node.public, - source_tracks, - tracks, - webm.unwrap_or(false), - ) { - warn!("stream stopped: {e}") + match jellystream::stream(node, spec, urange).await { + Ok(stream) => Ok(Either::Left(StreamResponse { stream, range })), + Err(e) => { + warn!("stream error: {e}"); + Err(MyError(e)) } - }); - debug!("starting stream"); - Ok(Either::Left(StreamResponse { stream: a, range })) + } } pub struct StreamResponse { |