diff options
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r-- | stream/src/lib.rs | 74 |
1 files changed, 46 insertions, 28 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 751ecfa..59b4960 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -9,8 +9,7 @@ pub mod hls; pub mod jhls; pub mod webvtt; -use anyhow::Result; -use ebml_struct::matroska::{Info, Tracks}; +use anyhow::{anyhow, Context, Result}; use jellybase::common::{ stream::{ StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec, @@ -18,12 +17,11 @@ use jellybase::common::{ }, LocalTrack, Node, }; -use jellymatroska::block::LacingType; use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata}; -use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc}; +use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc}; use tokio::{ fs::File, - io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}, + io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, task::spawn_blocking, }; use tokio_util::io::SyncIoBridge; @@ -75,7 +73,7 @@ pub async fn stream( StreamSpec::Whep { track, seek } => todo!(), StreamSpec::WhepControl { token } => todo!(), StreamSpec::Remux { tracks, container } => todo!(), - StreamSpec::Original { track } => todo!(), + StreamSpec::Original { track } => original_stream(info, track, range, b).await?, StreamSpec::HlsSuperMultiVariant { container } => todo!(), StreamSpec::HlsMultiVariant { segment, container } => todo!(), StreamSpec::HlsVariant { @@ -102,15 +100,26 @@ async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<Option<MatroskaMet Ok(spawn_blocking(move || matroska_metadata(&path)).await??) } -async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> { +struct InternalStreamInfo { + paths: Vec<PathBuf>, + metadata: Vec<MatroskaMetadata>, + track_to_file: Vec<usize>, +} + +async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> { let mut metadata = Vec::new(); + let mut paths = Vec::new(); for path in &info.files { - metadata.extend((*async_matroska_metadata(path.clone()).await?).clone()); + if let Some(meta) = (*async_matroska_metadata(path.clone()).await?).clone() { + metadata.push(meta); + paths.push(path.clone()); + } } let mut tracks = Vec::new(); + let mut track_to_file = Vec::new(); - for m in &metadata { + for (i, m) in metadata.iter().enumerate() { if let Some(t) = &m.tracks { for t in &t.entries { let mut formats = Vec::new(); @@ -134,7 +143,8 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> { _ => todo!(), }, formats, - }) + }); + track_to_file.push(i); } } } @@ -144,14 +154,21 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> { duration: 0, tracks, }; - Ok(StreamInfo { - name: info.info.title.clone(), - segments: vec![segment], - }) + Ok(( + InternalStreamInfo { + metadata, + paths, + track_to_file, + }, + StreamInfo { + name: info.info.title.clone(), + segments: vec![segment], + }, + )) } async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> { - let info = stream_info(info).await?; + let (_, info) = stream_info(info).await?; b.write_all(&serde_json::to_vec(&info)?).await?; Ok(()) } @@ -181,24 +198,25 @@ async fn remux_stream( } async fn original_stream( - local_tracks: Vec<LocalTrack>, - spec: StreamSpec, + info: Arc<SMediaInfo>, + track: usize, range: Range<usize>, b: DuplexStream, ) -> Result<()> { - // if spec.track.len() != 1 { - // bail!("invalid amout of source \"tracks\". original only allows for exactly one.") - // } + let (iinfo, _info) = stream_info(info).await?; - // let source = local_tracks[spec.track[0]].clone(); - // let mut file = File::open(CONF.media_path.join(source.path)) - // .await - // .context("opening source")?; - // file.seek(SeekFrom::Start(range.start as u64)) - // .await - // .context("seek source")?; + let file_index = *iinfo + .track_to_file + .get(track) + .ok_or(anyhow!("unknown track"))?; + let mut file = File::open(&iinfo.paths[file_index]) + .await + .context("opening source")?; + file.seek(SeekFrom::Start(range.start as u64)) + .await + .context("seek source")?; - // tokio::task::spawn(copy_stream(file, b, range.end - range.start)); + tokio::task::spawn(copy_stream(file, b, range.end - range.start)); Ok(()) } |