1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
use anyhow::{anyhow, bail, Context, Result};
use jellybase::CONF;
use jellycommon::{
stream::{StreamFormat, StreamSpec},
LocalTrack, MediaSource, Node,
};
use std::{io::SeekFrom, ops::Range};
use tokio::{
fs::File,
io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
};
use tokio_util::io::SyncIoBridge;
/// Response metadata describing a stream before any data is produced.
///
/// Built by `stream_head` from the requested stream format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamHead {
    /// MIME type of the stream body (e.g. `"video/x-matroska"`).
    pub content_type: &'static str,
    /// Whether the stream format supports ranges; `stream_head` sets this
    /// only for the original and Matroska formats.
    pub range_supported: bool,
}
/// Computes the [`StreamHead`] for a stream request.
///
/// The Matroska and segment formats report `video/webm` when `spec.webm` is
/// set to `true` and `video/x-matroska` otherwise. Only the original and
/// Matroska formats are marked as supporting ranges.
#[rustfmt::skip]
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
    let is_webm = spec.webm.unwrap_or(false);
    let container_type = if is_webm { "video/webm" } else { "video/x-matroska" };

    // One row per format: (content type, range support).
    let (content_type, range_supported) = match spec.format {
        StreamFormat::Original => ("video/x-matroska",                 true),
        StreamFormat::Matroska => (container_type,                     true),
        StreamFormat::Hls      => ("application/vnd.apple.mpegurl",    false),
        StreamFormat::Jhls     => ("application/jellything-jhls+json", false),
        StreamFormat::Segment  => (container_type,                     false),
    };

    StreamHead { content_type, range_supported }
}
/// Starts producing the requested stream for `node` and returns the readable
/// end of an in-memory pipe carrying the data.
///
/// The data itself is written by a background task started by
/// `original_stream` or `remux_stream`; failures that happen inside that task
/// after this function has returned are not reported to the caller.
///
/// # Errors
///
/// Returns an error if the node has no media source, if its tracks are not
/// local, if the requested format is not implemented yet (HLS, JHLS, segment),
/// or if setting up the selected stream fails.
pub async fn stream(node: Node, spec: StreamSpec, range: Range<usize>) -> Result<DuplexStream> {
    // `a` is handed to the caller for reading, `b` is written by the producer.
    let (a, b) = duplex(4096);

    let track_sources = match node
        .private
        .source
        .as_ref()
        .ok_or_else(|| anyhow!("node has no media"))?
    {
        MediaSource::Local { tracks } => tracks.to_owned(),
        _ => bail!("node tracks are not local"),
    };

    match spec.format {
        StreamFormat::Original => original_stream(track_sources, spec, range, b).await?,
        StreamFormat::Matroska => remux_stream(node, track_sources, spec, range, b).await?,
        // The format is chosen by the request, so an unimplemented one must be
        // an error rather than a panic.
        StreamFormat::Hls => bail!("hls streaming is not implemented"),
        StreamFormat::Jhls => bail!("jhls streaming is not implemented"),
        StreamFormat::Segment => bail!("segment streaming is not implemented"),
    }

    Ok(a)
}
/// Launches a remux of the selected tracks into `b` on tokio's blocking pool.
///
/// The async pipe end is wrapped in a [`SyncIoBridge`] so the synchronous
/// remuxer can write to it. The blocking task is detached: this function
/// returns `Ok(())` as soon as the task has been submitted, and whatever
/// `jellyremuxer::remux_stream_into` returns is discarded.
async fn remux_stream(
    node: Node,
    track_sources: Vec<LocalTrack>,
    spec: StreamSpec,
    range: Range<usize>,
    b: DuplexStream,
) -> Result<()> {
    let sink = SyncIoBridge::new(b);

    let job = move || {
        let webm = spec.webm.unwrap_or(false);
        jellyremuxer::remux_stream_into(
            sink,
            range,
            CONF.library_path.to_owned(),
            node.public,
            track_sources,
            spec.tracks,
            webm,
        )
    };

    // Dropping the join handle detaches the task; nobody waits for the result.
    drop(tokio::task::spawn_blocking(job));
    Ok(())
}
async fn original_stream(
track_sources: Vec<LocalTrack>,
spec: StreamSpec,
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
if spec.tracks.len() != 1 {
bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
}
let source = track_sources[spec.tracks[0]].clone();
let mut file = File::open(source.path).await.context("opening source")?;
file.seek(SeekFrom::Start(range.start as u64))
.await
.context("seek source")?;
tokio::task::spawn(copy_stream(file, b, range.end - range.start));
Ok(())
}
/// Copies up to `amount` bytes from the current position of `inp` into `out`.
///
/// Copying stops once `amount` bytes have been written or the file reaches
/// end-of-file, whichever comes first. `out` is dropped when the function
/// returns, which lets the reading side observe the end of the stream.
///
/// # Errors
///
/// Returns any I/O error raised while reading from `inp` or writing to `out`.
async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> {
    let mut buf = [0u8; 4096];
    while amount > 0 {
        let want = amount.min(buf.len());
        let size = inp.read(&mut buf[..want]).await?;
        if size == 0 {
            // `want` is non-zero here, so a zero-length read means end-of-file.
            break;
        }
        out.write_all(&buf[..size]).await?;
        amount -= size;
    }
    Ok(())
}
|