aboutsummaryrefslogtreecommitdiff
path: root/stream/src/fragment.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-01-29 13:22:21 +0100
committermetamuffin <metamuffin@disroot.org>2024-01-29 13:22:21 +0100
commit2676e755286d117b100d379fce84ec3da6d8ae98 (patch)
tree2a075db5468f0c8c2f653be5222c183f4362fcfa /stream/src/fragment.rs
parenta4526fd2eb670c8fac2c28eb1597f0c091f25a2a (diff)
downloadjellything-2676e755286d117b100d379fce84ec3da6d8ae98.tar
jellything-2676e755286d117b100d379fce84ec3da6d8ae98.tar.bz2
jellything-2676e755286d117b100d379fce84ec3da6d8ae98.tar.zst
consistent name for {snippet,segment?,fragment}
Diffstat (limited to 'stream/src/fragment.rs')
-rw-r--r--stream/src/fragment.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs
new file mode 100644
index 0000000..f08114c
--- /dev/null
+++ b/stream/src/fragment.rs
@@ -0,0 +1,84 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2024 metamuffin <metamuffin.org>
+*/
+use anyhow::{anyhow, bail, Result};
+use jellybase::{permission::PermissionSetExt, CONF};
+use jellycommon::{
+ stream::StreamSpec,
+ user::{PermissionSet, UserPermission},
+ LocalTrack, Node,
+};
+use jellytranscoder::fragment::transcode;
+use log::warn;
+use tokio::{fs::File, io::DuplexStream};
+use tokio_util::io::SyncIoBridge;
+
+pub async fn fragment_stream(
+ node: Node,
+ local_tracks: Vec<LocalTrack>,
+ spec: StreamSpec,
+ mut b: DuplexStream,
+ perms: &PermissionSet,
+) -> Result<()> {
+ if spec.tracks.len() != 1 {
+ bail!("unsupported number of tracks for segment, must be exactly one");
+ }
+ let track = spec.tracks[0];
+ let n = spec.index.ok_or(anyhow!("segment index missing"))?;
+
+ let local_track = local_tracks
+ .get(0)
+ .ok_or(anyhow!("track missing"))?
+ .to_owned();
+
+ if let Some(profile) = spec.profile {
+ perms.assert(&UserPermission::Transcode)?;
+ let location = transcode(
+ &format!("{track} {n} {:?}", node.private.source), // TODO maybe not use the entire source
+ CONF.transcoding_profiles
+ .get(profile)
+ .ok_or(anyhow!("profile out of range"))?,
+ move |b| {
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_fragment_into(
+ SyncIoBridge::new(b),
+ &CONF.media_path,
+ &node.public,
+ &local_track,
+ track,
+ false,
+ n,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+ },
+ )
+ .await?;
+ let mut output = File::open(location.abs()).await?;
+ tokio::task::spawn(async move {
+ if let Err(err) = tokio::io::copy(&mut output, &mut b).await {
+ warn!("cannot write stream: {err}")
+ }
+ });
+ } else {
+ let b = SyncIoBridge::new(b);
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = jellyremuxer::write_fragment_into(
+ b,
+ &CONF.media_path,
+ &node.public,
+ &local_track,
+ track,
+ spec.webm.unwrap_or(false),
+ n,
+ ) {
+ warn!("segment stream error: {err}");
+ }
+ });
+ }
+
+ Ok(())
+}