From 7af66484702fa99ed30a6b498fca0066207d4885 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 29 Sep 2023 22:51:16 +0200 Subject: theoretically support original stream --- stream/src/lib.rs | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) (limited to 'stream/src') diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 867b310..ccb424a 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -1,11 +1,14 @@ -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use jellybase::CONF; use jellycommon::{ stream::{StreamFormat, StreamSpec}, LocalTrack, MediaSource, Node, }; -use std::ops::Range; -use tokio::io::{duplex, DuplexStream}; +use std::{io::SeekFrom, ops::Range}; +use tokio::{ + fs::File, + io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, +}; use tokio_util::io::SyncIoBridge; pub async fn stream(node: Node, spec: StreamSpec, range: Range) -> Result { @@ -22,7 +25,7 @@ pub async fn stream(node: Node, spec: StreamSpec, range: Range) -> Result }; match spec.format { - StreamFormat::Original => todo!(), + StreamFormat::Original => original_stream(track_sources, spec, range, b).await?, StreamFormat::Matroska | StreamFormat::Webm => { remux_stream(node, track_sources, spec, range, b).await? } @@ -57,3 +60,33 @@ async fn remux_stream( Ok(()) } + +async fn original_stream( + track_sources: Vec, + spec: StreamSpec, + range: Range, + b: DuplexStream, +) -> Result<()> { + if spec.tracks.len() != 1 { + bail!("invalid amout of source \"tracks\". original only allows for exactly one.") + } + + let source = track_sources[spec.tracks[0]].clone(); + let mut file = File::open(source.path).await.context("opening source")?; + file.seek(SeekFrom::Start(range.start as u64)) + .await + .context("seek source")?; + + tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + + Ok(()) +} + +async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { + let mut buf = [0u8; 4096]; + loop { + let size = inp.read(&mut buf[..amount.min(4096)]).await?; + out.write_all(&buf[..size]).await?; + amount -= size; + } +} -- cgit v1.2.3-70-g09d2