aboutsummaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-09-29 20:56:36 +0200
committermetamuffin <metamuffin@disroot.org>2023-09-29 20:56:36 +0200
commitc62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914 (patch)
tree7a32678b59c123ea6fbe6c01237aec5e3b143e87 /server
parent29b12a48bcfa3aa0f814f7b39a64868b6313c13d (diff)
downloadjellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar
jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.bz2
jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.zst
move stream generation to new crate
Diffstat (limited to 'server')
-rw-r--r--server/Cargo.toml4
-rw-r--r--server/src/main.rs4
-rw-r--r--server/src/routes/mod.rs3
-rw-r--r--server/src/routes/stream.rs86
-rw-r--r--server/src/routes/ui/player.rs40
5 files changed, 57 insertions, 80 deletions
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 57592cf..9a87f61 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
jellycommon = { path = "../common", features = ["rocket"] }
jellybase = { path = "../base" }
-jellyremuxer = { path = "../remuxer" }
+jellystream = { path = "../stream" }
jellyclient = { path = "../client" }
jellytranscoder = { path = "../transcoder" }
@@ -15,8 +15,8 @@ bincode = { version = "2.0.0-rc.3", features = ["serde", "derive"] }
serde_json = "1.0.107"
log = { workspace = true }
+anyhow = { workspace = true }
env_logger = "0.10.0"
-anyhow = "1.0.75"
rand = "0.8.5"
base64 = "0.21.4"
chrono = { version = "0.4.31", features = ["serde"] }
diff --git a/server/src/main.rs b/server/src/main.rs
index 43b1db4..f2c1440 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -10,7 +10,6 @@ use crate::routes::ui::admin::log::enable_logging;
use database::Database;
use federation::Federation;
use jellybase::CONF;
-use jellyremuxer::RemuxerContext;
use log::{error, warn};
use routes::build_rocket;
use tokio::fs::create_dir_all;
@@ -33,14 +32,13 @@ fn main() {
}
async fn async_main() {
create_dir_all(&CONF.cache_path).await.unwrap();
- let remuxer = RemuxerContext::new();
let database = Database::open(&CONF.database_path).unwrap();
let federation = Federation::initialize();
database.create_admin();
// if let Err(err) = import::import(&database, &federation).await {
// log::error!("import not sucessful: {err:?}")
// }
- let r = build_rocket(remuxer, database, federation).launch().await;
+ let r = build_rocket(database, federation).launch().await;
match r {
Ok(_) => warn!("server shutdown"),
Err(e) => error!("server exited: {e}"),
diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs
index 86a55ac..6fe5018 100644
--- a/server/src/routes/mod.rs
+++ b/server/src/routes/mod.rs
@@ -7,7 +7,6 @@ use crate::{database::Database, federation::Federation, routes::ui::error::MyRes
use api::{r_api_account_login, r_api_node_raw, r_api_root, r_api_version};
use base64::Engine;
use jellybase::CONF;
-use jellyremuxer::RemuxerContext;
use log::warn;
use rand::random;
use rocket::{
@@ -47,7 +46,6 @@ macro_rules! uri {
}
pub fn build_rocket(
- remuxer: RemuxerContext,
database: Database,
federation: Federation,
) -> Rocket<Build> {
@@ -70,7 +68,6 @@ pub fn build_rocket(
),
..Default::default()
})
- .manage(remuxer)
.manage(database)
.manage(federation)
.attach(AdHoc::on_response("set server header", |_req, res| {
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index e920341..6b268e4 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -7,9 +7,8 @@ use super::ui::{account::session::Session, error::MyError};
use crate::{database::Database, federation::Federation};
use anyhow::{anyhow, Result};
use jellybase::CONF;
-use jellycommon::MediaSource;
-use jellyremuxer::RemuxerContext;
-use log::{debug, info, warn};
+use jellycommon::{stream::StreamSpec, MediaSource};
+use log::{info, warn};
use rocket::{
get,
http::{ContentType, Header, Status},
@@ -17,56 +16,43 @@ use rocket::{
response::{self, Redirect, Responder},
Either, Request, Response, State,
};
-use std::{
- ops::{Deref, Range},
- time::Duration,
-};
-use tokio::io::{duplex, DuplexStream};
-use tokio_util::io::SyncIoBridge;
+use std::{ops::Range, time::Duration};
+use tokio::io::DuplexStream;
-#[get("/n/<id>/stream?<tracks>&<webm>")]
+#[get("/n/<id>/stream?<spec>")]
pub async fn r_stream(
_sess: Session,
- id: String,
- webm: Option<bool>,
- tracks: String,
- remuxer: &State<RemuxerContext>,
federation: &State<Federation>,
db: &State<Database>,
+ id: String,
range: Option<RequestRange>,
+ spec: StreamSpec,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
let source = node
.private
.source
+ .as_ref()
.ok_or(anyhow!("item does not contain media"))?;
- let tracks = tracks
- .split(',')
- .map(|e| e.parse().map_err(|_| anyhow!("invalid number")))
- .collect::<Result<Vec<_>, _>>()?;
+ if let MediaSource::Remote { host, remote_id } = source {
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials on the server-side"))?;
- let source_tracks = match source {
- MediaSource::Local { tracks } => tracks,
- MediaSource::Remote { host, remote_id } => {
- let (username, password, _) = CONF
- .remote_credentials
- .get(&host)
- .ok_or(anyhow!("no credentials on the server-side"))?;
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60),
+ )
+ .await?;
- let instance = federation.get_instance(&host)?.to_owned();
- let session = instance
- .login(
- username.to_owned(),
- password.to_owned(),
- Duration::from_secs(60),
- )
- .await?;
-
- let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false));
- return Ok(Either::Right(Redirect::found(uri)));
- }
- };
+ let uri = session.stream(&remote_id, &spec);
+ return Ok(Either::Right(Redirect::found(uri)));
+ }
info!(
"stream request (range={})",
@@ -76,10 +62,6 @@ pub async fn r_stream(
.unwrap_or(format!("none"))
);
- let (a, b) = duplex(4096);
- let remuxer = remuxer.deref().clone();
- let b = SyncIoBridge::new(b);
-
let urange = match &range {
Some(r) => {
let r = r.0.get(0).unwrap_or(&(None..None));
@@ -88,21 +70,13 @@ pub async fn r_stream(
None => 0..(isize::MAX as usize),
};
- tokio::task::spawn_blocking(move || {
- if let Err(e) = remuxer.generate_into(
- b,
- urange,
- CONF.library_path.clone(),
- node.public,
- source_tracks,
- tracks,
- webm.unwrap_or(false),
- ) {
- warn!("stream stopped: {e}")
+ match jellystream::stream(node, spec, urange).await {
+ Ok(stream) => Ok(Either::Left(StreamResponse { stream, range })),
+ Err(e) => {
+ warn!("stream error: {e}");
+ Err(MyError(e))
}
- });
- debug!("starting stream");
- Ok(Either::Left(StreamResponse { stream: a, range }))
+ }
}
pub struct StreamResponse {
diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs
index 005e513..fa9657f 100644
--- a/server/src/routes/ui/player.rs
+++ b/server/src/routes/ui/player.rs
@@ -17,15 +17,18 @@ use crate::{
uri,
};
use anyhow::anyhow;
-use jellycommon::{Node, SourceTrackKind};
+use jellycommon::{
+ stream::{StreamFormat, StreamSpec},
+ Node, SourceTrackKind, TrackID,
+};
use markup::DynRender;
use rocket::{get, FromForm, State, UriDisplayQuery};
#[derive(FromForm, Default, Clone, Debug, UriDisplayQuery)]
pub struct PlayerConfig {
- pub a: Option<u64>,
- pub v: Option<u64>,
- pub s: Option<u64>,
+ pub a: Option<TrackID>,
+ pub v: Option<TrackID>,
+ pub s: Option<TrackID>,
pub webm: bool,
}
@@ -37,24 +40,29 @@ pub fn r_player(
conf: PlayerConfig,
) -> MyResult<DynLayoutPage<'_>> {
let item = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
- let tracks = None
- .into_iter()
- .chain(conf.v.into_iter())
- .chain(conf.a.into_iter())
- .chain(conf.s.into_iter())
- .map(|e| format!("{e}"))
- .collect::<Vec<_>>()
- .join(",");
- let conf = player_conf(item.clone(), !tracks.is_empty())?;
+ let spec = StreamSpec {
+ tracks: None
+ .into_iter()
+ .chain(conf.v.into_iter())
+ .chain(conf.a.into_iter())
+ .chain(conf.s.into_iter())
+ .collect::<Vec<_>>(),
+ format: StreamFormat::Webm,
+ ..Default::default()
+ };
+
+ let playing = !spec.tracks.is_empty();
+
+ let conf = player_conf(item.clone(), playing)?;
Ok(LayoutPage {
title: item.public.title.to_owned(),
class: Some("player"),
content: markup::new! {
- @if tracks.is_empty() {
- img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()];
+ @if playing {
+ video[src=uri!(r_stream(&id, &spec)), controls, preload="auto"]{}
} else {
- video[src=uri!(r_stream(&id, &tracks, Some(true))), controls, preload="auto"]{}
+ img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()];
}
@conf
},