aboutsummaryrefslogtreecommitdiff
path: root/server/src/logic/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/logic/stream.rs')
-rw-r--r--server/src/logic/stream.rs39
1 files changed, 22 insertions, 17 deletions
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
index e3650ec..dfe2f86 100644
--- a/server/src/logic/stream.rs
+++ b/server/src/logic/stream.rs
@@ -9,7 +9,7 @@ use jellycommon::{api::NodeFilterSort, stream::StreamSpec, NodeID, TrackSource};
use jellyimport::asset_token::AssetInner;
use jellylogic::{node::get_node, session::Session};
use jellystream::SMediaInfo;
-use log::{info, warn};
+use log::info;
use rocket::{
get, head,
http::{Header, Status},
@@ -22,7 +22,11 @@ use std::{
ops::Range,
sync::Arc,
};
-use tokio::io::{duplex, DuplexStream};
+use tokio::{
+ io::{duplex, DuplexStream},
+ task::spawn_blocking,
+};
+use tokio_util::io::SyncIoBridge;
#[head("/n/<_id>/stream?<spec..>")]
pub async fn r_stream_head(
@@ -123,9 +127,9 @@ pub async fn r_stream(
let urange = match &range {
Some(r) => {
let r = r.0.first().unwrap_or(&(None..None));
- r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize)
+ r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX)
}
- None => 0..(isize::MAX as usize),
+ None => 0..u64::MAX,
};
let head = jellystream::stream_head(&spec);
@@ -143,18 +147,19 @@ pub async fn r_stream(
title: node.title.clone(),
});
- match jellystream::stream(media, spec, urange).await {
- Ok(stream) => Ok(Either::Left(StreamResponse {
- stream,
- range,
- advertise_range: head.range_supported,
- content_type: head.content_type,
- })),
- Err(e) => {
- warn!("stream error: {e}");
- Err(MyError(e))
- }
- }
+ // TODO cleaner solution needed
+ let mut reader = spawn_blocking(move || jellystream::stream(media, spec, urange))
+ .await
+ .unwrap()?;
+ let (stream_write, stream_read) = duplex(4096);
+ spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
+
+ Ok(Either::Left(StreamResponse {
+ stream: stream_read,
+ range,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ }))
}
pub struct RedirectResponse(String);
@@ -199,7 +204,7 @@ impl<'r> Responder<'r, 'static> for StreamResponse {
}
#[derive(Debug)]
-pub struct RequestRange(Vec<Range<Option<usize>>>);
+pub struct RequestRange(Vec<Range<Option<u64>>>);
impl RequestRange {
pub fn to_cr_hv(&self) -> String {