summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock28
-rw-r--r--client-native-gui/Cargo.toml5
-rw-r--r--client-native-gui/mpv.conf13
-rw-r--r--client-native-gui/src/main.rs194
4 files changed, 227 insertions, 13 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7d45937..30f11a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -873,6 +873,16 @@ dependencies = [
]
[[package]]
+name = "crossbeam-channel"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+]
+
+[[package]]
name = "crossbeam-utils"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2121,11 +2131,14 @@ dependencies = [
name = "keks-meet"
version = "0.1.0"
dependencies = [
+ "anyhow",
"async-std",
"client-native-lib",
+ "crossbeam-channel",
"eframe",
"egui",
"env_logger",
+ "libmpv",
"log",
"tokio",
]
@@ -2208,6 +2221,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565"
[[package]]
+name = "libmpv"
+version = "2.0.1"
+source = "git+https://github.com/sirno/libmpv-rs.git?branch=upgrade-libmpv#03252794bd976de5600c29c108d719b707e382fb"
+dependencies = [
+ "libmpv-sys",
+]
+
+[[package]]
+name = "libmpv-sys"
+version = "3.1.0"
+source = "git+https://github.com/sirno/libmpv-rs.git?branch=upgrade-libmpv#03252794bd976de5600c29c108d719b707e382fb"
+
+[[package]]
name = "linux-raw-sys"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2250,7 +2276,7 @@ version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb907fe88d54d8d9ce32a3cceab4218ed2f6b7d35617cafe9adf84e43919cb"
dependencies = [
- "libc 0.1.12",
+ "libc 0.2.138",
]
[[package]]
diff --git a/client-native-gui/Cargo.toml b/client-native-gui/Cargo.toml
index 3912bc9..e917276 100644
--- a/client-native-gui/Cargo.toml
+++ b/client-native-gui/Cargo.toml
@@ -10,6 +10,11 @@ async-std = "1.12.0"
tokio = { version = "1.21.2", features = ["full"] }
env_logger = "0.8"
log = "0.4"
+anyhow = "1.0.66"
+crossbeam-channel = "0.5.6"
egui = "0.19.0"
eframe = "0.19.0"
+
+libmpv = { git = "https://github.com/sirno/libmpv-rs.git", branch = "upgrade-libmpv" }
+
diff --git a/client-native-gui/mpv.conf b/client-native-gui/mpv.conf
new file mode 100644
index 0000000..3669492
--- /dev/null
+++ b/client-native-gui/mpv.conf
@@ -0,0 +1,13 @@
+# configuration for the libmpv used in keks-meet
+# this is equivalent to --profile=low-latency
+audio-buffer=0
+vd-lavc-threads=1
+cache-pause=no
+demuxer-lavf-o-add=fflags=+nobuffer
+demuxer-lavf-probe-info=nostreams
+demuxer-lavf-analyzeduration=0.1
+video-sync=audio
+interpolation=no
+video-latency-hacks=yes
+stream-buffer-size=4k
+correct-pts=yes
diff --git a/client-native-gui/src/main.rs b/client-native-gui/src/main.rs
index 69d1657..f92fa7a 100644
--- a/client-native-gui/src/main.rs
+++ b/client-native-gui/src/main.rs
@@ -5,13 +5,28 @@ use client_native_lib::{
instance::Instance,
peer::Peer,
protocol::{ProvideInfo, RelayMessage},
+ webrtc::{
+ rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication,
+ rtp::{codecs::h264::H264Packet, packetizer::Depacketizer},
+ track::track_remote::TrackRemote,
+ },
Config, EventHandler,
};
+use crossbeam_channel::Sender;
use eframe::egui;
use egui::{Ui, Visuals};
+use log::{debug, error, warn};
use std::{
- collections::HashMap,
- sync::{Arc, RwLock},
+ collections::{HashMap, VecDeque},
+ fs::File,
+ io::Write,
+ ops::Deref,
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc, RwLock,
+ },
+ thread,
+ time::Duration,
};
use tokio::task::JoinHandle;
@@ -44,8 +59,8 @@ enum App {
}
struct Ingame {
- instance: Arc<Instance>,
- handler: Arc<Handler>,
+ pub instance: Arc<Instance>,
+ pub handler: Arc<Handler>,
}
struct Handler {
@@ -60,13 +75,15 @@ struct GuiPeer {
struct GuiResource {
info: ProvideInfo,
- state: GuiResourceState,
+ state: Arc<RwLock<GuiResourceState>>,
}
+#[derive(Debug, Clone)]
enum GuiResourceState {
Available,
Connecting,
Connected,
+ Disconnecting,
}
impl App {
@@ -136,12 +153,13 @@ impl GuiResource {
"{} {} {:?}",
self.info.id, self.info.kind, self.info.label
));
- match self.state {
+ let state = self.state.read().unwrap().to_owned();
+ match state {
GuiResourceState::Available => {
if ui.button("Enable").clicked() {
let id = self.info.id.clone();
let peer = peer.clone();
- self.state = GuiResourceState::Connecting;
+ *self.state.write().unwrap() = GuiResourceState::Connecting;
tokio::spawn(async move { peer.request_resource(id).await });
}
}
@@ -151,11 +169,17 @@ impl GuiResource {
ui.label("Connecting...")
});
}
+ GuiResourceState::Disconnecting => {
+ ui.horizontal(|ui| {
+ ui.spinner();
+ ui.label("Disconnecting...")
+ });
+ }
GuiResourceState::Connected => {
if ui.button("Disable").clicked() {
let id = self.info.id.clone();
let peer = peer.clone();
- self.state = GuiResourceState::Available;
+ *self.state.write().unwrap() = GuiResourceState::Disconnecting;
tokio::spawn(async move { peer.request_stop_resource(id).await });
}
}
@@ -213,7 +237,7 @@ impl EventHandler for Handler {
info.id.clone(),
GuiResource {
info,
- state: GuiResourceState::Available,
+ state: Arc::new(RwLock::new(GuiResourceState::Available)),
},
);
}
@@ -239,9 +263,20 @@ impl EventHandler for Handler {
) -> client_native_lib::DynFut<()> {
if let Some(gp) = self.peers.write().unwrap().get_mut(&peer.id) {
if let Some(gr) = gp.resources.get_mut(&resource.id) {
- gr.state = GuiResourceState::Connected
-
- // TODO
+ *gr.state.write().unwrap() = GuiResourceState::Connected;
+ match channel {
+ client_native_lib::peer::TransportChannel::Track(track) => {
+ let peer = gp.peer.clone();
+ let state = gr.state.clone();
+ tokio::task::spawn_blocking(move || {
+ play(peer, track);
+ *state.write().unwrap() = GuiResourceState::Available;
+ });
+ }
+ client_native_lib::peer::TransportChannel::DataChannel(_) => {
+ warn!("cant handle data channel yet")
+ }
+ }
}
}
Box::pin(async move {})
@@ -261,3 +296,138 @@ impl EventHandler for Handler {
Box::pin(async move {})
}
}
+
+pub fn play(peer: Arc<Peer>, track: Arc<TrackRemote>) {
+ let rid = block_on(track.stream_id());
+ let (exit_tx, exit_rx) = crossbeam_channel::unbounded();
+ let has_exited = Arc::new(AtomicBool::new(false));
+ let buffer = Arc::new(RwLock::new(VecDeque::new()));
+
+ {
+ let buffer = buffer.clone();
+ let track = track.clone();
+ tokio::spawn(async move {
+ if let Err(e) = track_to_raw(track, buffer).await {
+ error!("export error: {e}");
+ }
+ });
+ }
+ {
+ let media_ssrc = track.ssrc();
+ let peer = peer.clone();
+ let has_exited = has_exited.clone();
+ tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(Duration::from_secs(3)).await;
+ debug!("sending pli");
+ let r = peer
+ .peer_connection
+ .write_rtcp(&[Box::new(PictureLossIndication {
+ sender_ssrc: 0,
+ media_ssrc,
+ })])
+ .await;
+ if r.is_err() {
+ break;
+ }
+ if has_exited.load(Ordering::Relaxed) {
+ break;
+ }
+ }
+ debug!("pli send loop exited");
+ });
+ }
+ type State = (Arc<RwLock<VecDeque<u8>>>, Sender<()>, Arc<AtomicBool>);
+ fn open(state: &mut State, uri: &str) -> State {
+ debug!("mpv open: {uri}");
+ state.clone()
+ }
+ fn close(state: Box<State>) {
+ let _ = state.1.send(());
+ debug!("mpv close");
+ }
+ fn read(state: &mut State, buf: &mut [i8]) -> i64 {
+ let mut i = 0;
+ debug!("mpv request {} bytes", buf.len());
+ // TODO this is horrible
+ loop {
+ if state.2.load(Ordering::Relaxed) {
+ return 0;
+ }
+ let mut state = state.0.write().unwrap();
+ while let Some(e) = state.pop_front() {
+ if i >= buf.len() {
+ break;
+ }
+ buf[i] = e as i8;
+ i += 1;
+ }
+ if i != 0 {
+ break;
+ }
+ drop(state);
+ thread::sleep(Duration::from_millis(10));
+ }
+ debug!("mpv read {i} bytes");
+ i as i64
+ }
+
+ let mpv = libmpv::Mpv::new().unwrap();
+ File::create("/tmp/keks-meet-temp.conf")
+ .unwrap()
+ .write_all(include_bytes!("../mpv.conf"))
+ .unwrap();
+ mpv.load_config("/tmp/keks-meet-temp.conf").unwrap();
+
+ let proto = unsafe {
+ libmpv::protocol::Protocol::new(
+ "keks-meet-track".to_owned(),
+ (buffer, exit_tx.clone(), has_exited.clone()),
+ open,
+ close,
+ read,
+ None,
+ None,
+ )
+ };
+ let proto_ctx = mpv.create_protocol_context();
+ proto_ctx.register(proto).unwrap();
+ mpv.playlist_load_files(&[(
+ &format!("keks-meet-track://{}", rid),
+ libmpv::FileState::AppendPlay,
+ None,
+ )])
+ .unwrap();
+
+ block_on(track.onmute(move || {
+ debug!("mute");
+ let _ = exit_tx.send(());
+ Box::pin(async move {})
+ }));
+ exit_rx.recv().unwrap();
+ has_exited.store(true, Ordering::Relaxed);
+ block_on(peer.request_stop_resource(rid))
+}
+
+async fn track_to_raw(
+ track: Arc<TrackRemote>,
+ target: Arc<RwLock<VecDeque<u8>>>,
+) -> anyhow::Result<()> {
+ let mut cached_packet = H264Packet::default();
+ loop {
+ let (packet, _) = track.read_rtp().await?;
+ if !packet.payload.is_empty() {
+ let raw_payload = cached_packet.depacketize(&packet.payload)?;
+ if raw_payload.len() != 0 {
+ debug!("writing {} bytes", raw_payload.len());
+
+ let mut target = target.write().unwrap();
+ if target.len() < 10_000_000 {
+ target.extend(raw_payload.into_iter());
+ } else {
+ warn!("buffer is getting too big, dropping some data");
+ }
+ }
+ }
+ }
+}