aboutsummaryrefslogtreecommitdiff
path: root/server/src/routes/stream.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-29 20:56:36 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-29 20:56:36 +0200
commitc62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914 (patch)
tree7a32678b59c123ea6fbe6c01237aec5e3b143e87 /server/src/routes/stream.rs
parent29b12a48bcfa3aa0f814f7b39a64868b6313c13d (diff)
downloadjellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar
jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.bz2
jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.zst
move stream generation to new crate
Diffstat (limited to 'server/src/routes/stream.rs')
-rw-r--r--server/src/routes/stream.rs86
1 files changed, 30 insertions, 56 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index e920341..6b268e4 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -7,9 +7,8 @@ use super::ui::{account::session::Session, error::MyError};
use crate::{database::Database, federation::Federation};
use anyhow::{anyhow, Result};
use jellybase::CONF;
-use jellycommon::MediaSource;
-use jellyremuxer::RemuxerContext;
-use log::{debug, info, warn};
+use jellycommon::{stream::StreamSpec, MediaSource};
+use log::{info, warn};
use rocket::{
get,
http::{ContentType, Header, Status},
@@ -17,56 +16,43 @@ use rocket::{
response::{self, Redirect, Responder},
Either, Request, Response, State,
};
-use std::{
- ops::{Deref, Range},
- time::Duration,
-};
-use tokio::io::{duplex, DuplexStream};
-use tokio_util::io::SyncIoBridge;
+use std::{ops::Range, time::Duration};
+use tokio::io::DuplexStream;
-#[get("/n/<id>/stream?<tracks>&<webm>")]
+#[get("/n/<id>/stream?<spec>")]
pub async fn r_stream(
_sess: Session,
- id: String,
- webm: Option<bool>,
- tracks: String,
- remuxer: &State<RemuxerContext>,
federation: &State<Federation>,
db: &State<Database>,
+ id: String,
range: Option<RequestRange>,
+ spec: StreamSpec,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
let source = node
.private
.source
+ .as_ref()
.ok_or(anyhow!("item does not contain media"))?;
- let tracks = tracks
- .split(',')
- .map(|e| e.parse().map_err(|_| anyhow!("invalid number")))
- .collect::<Result<Vec<_>, _>>()?;
+ if let MediaSource::Remote { host, remote_id } = source {
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials on the server-side"))?;
- let source_tracks = match source {
- MediaSource::Local { tracks } => tracks,
- MediaSource::Remote { host, remote_id } => {
- let (username, password, _) = CONF
- .remote_credentials
- .get(&host)
- .ok_or(anyhow!("no credentials on the server-side"))?;
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60),
+ )
+ .await?;
- let instance = federation.get_instance(&host)?.to_owned();
- let session = instance
- .login(
- username.to_owned(),
- password.to_owned(),
- Duration::from_secs(60),
- )
- .await?;
-
- let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false));
- return Ok(Either::Right(Redirect::found(uri)));
- }
- };
+ let uri = session.stream(&remote_id, &spec);
+ return Ok(Either::Right(Redirect::found(uri)));
+ }
info!(
"stream request (range={})",
@@ -76,10 +62,6 @@ pub async fn r_stream(
.unwrap_or(format!("none"))
);
- let (a, b) = duplex(4096);
- let remuxer = remuxer.deref().clone();
- let b = SyncIoBridge::new(b);
-
let urange = match &range {
Some(r) => {
let r = r.0.get(0).unwrap_or(&(None..None));
@@ -88,21 +70,13 @@ pub async fn r_stream(
None => 0..(isize::MAX as usize),
};
- tokio::task::spawn_blocking(move || {
- if let Err(e) = remuxer.generate_into(
- b,
- urange,
- CONF.library_path.clone(),
- node.public,
- source_tracks,
- tracks,
- webm.unwrap_or(false),
- ) {
- warn!("stream stopped: {e}")
+ match jellystream::stream(node, spec, urange).await {
+ Ok(stream) => Ok(Either::Left(StreamResponse { stream, range })),
+ Err(e) => {
+ warn!("stream error: {e}");
+ Err(MyError(e))
}
- });
- debug!("starting stream");
- Ok(Either::Left(StreamResponse { stream: a, range }))
+ }
}
pub struct StreamResponse {