summaryrefslogtreecommitdiff
path: root/client-native-gui/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2022-12-11 14:33:17 +0100
committermetamuffin <metamuffin@disroot.org>2022-12-11 14:33:17 +0100
commit43bd492ffe61dfec82c172f3eeda869809aa0932 (patch)
tree301b8da07ef08da9dffb239c24e30775e97d230d /client-native-gui/src
parent62172003909956850879dee1252980b2547f8451 (diff)
downloadkeks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar
keks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar.bz2
keks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar.zst
stream playback with libmpv (native client)
Diffstat (limited to 'client-native-gui/src')
-rw-r--r--client-native-gui/src/main.rs194
1 files changed, 182 insertions, 12 deletions
diff --git a/client-native-gui/src/main.rs b/client-native-gui/src/main.rs
index 69d1657..f92fa7a 100644
--- a/client-native-gui/src/main.rs
+++ b/client-native-gui/src/main.rs
@@ -5,13 +5,28 @@ use client_native_lib::{
instance::Instance,
peer::Peer,
protocol::{ProvideInfo, RelayMessage},
+ webrtc::{
+ rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication,
+ rtp::{codecs::h264::H264Packet, packetizer::Depacketizer},
+ track::track_remote::TrackRemote,
+ },
Config, EventHandler,
};
+use crossbeam_channel::Sender;
use eframe::egui;
use egui::{Ui, Visuals};
+use log::{debug, error, warn};
use std::{
- collections::HashMap,
- sync::{Arc, RwLock},
+ collections::{HashMap, VecDeque},
+ fs::File,
+ io::Write,
+ ops::Deref,
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc, RwLock,
+ },
+ thread,
+ time::Duration,
};
use tokio::task::JoinHandle;
@@ -44,8 +59,8 @@ enum App {
}
struct Ingame {
- instance: Arc<Instance>,
- handler: Arc<Handler>,
+ pub instance: Arc<Instance>,
+ pub handler: Arc<Handler>,
}
struct Handler {
@@ -60,13 +75,15 @@ struct GuiPeer {
struct GuiResource {
info: ProvideInfo,
- state: GuiResourceState,
+ state: Arc<RwLock<GuiResourceState>>,
}
+#[derive(Debug, Clone)]
enum GuiResourceState {
Available,
Connecting,
Connected,
+ Disconnecting,
}
impl App {
@@ -136,12 +153,13 @@ impl GuiResource {
"{} {} {:?}",
self.info.id, self.info.kind, self.info.label
));
- match self.state {
+ let state = self.state.read().unwrap().to_owned();
+ match state {
GuiResourceState::Available => {
if ui.button("Enable").clicked() {
let id = self.info.id.clone();
let peer = peer.clone();
- self.state = GuiResourceState::Connecting;
+ *self.state.write().unwrap() = GuiResourceState::Connecting;
tokio::spawn(async move { peer.request_resource(id).await });
}
}
@@ -151,11 +169,17 @@ impl GuiResource {
ui.label("Connecting...")
});
}
+ GuiResourceState::Disconnecting => {
+ ui.horizontal(|ui| {
+ ui.spinner();
+ ui.label("Disconnecting...")
+ });
+ }
GuiResourceState::Connected => {
if ui.button("Disable").clicked() {
let id = self.info.id.clone();
let peer = peer.clone();
- self.state = GuiResourceState::Available;
+ *self.state.write().unwrap() = GuiResourceState::Disconnecting;
tokio::spawn(async move { peer.request_stop_resource(id).await });
}
}
@@ -213,7 +237,7 @@ impl EventHandler for Handler {
info.id.clone(),
GuiResource {
info,
- state: GuiResourceState::Available,
+ state: Arc::new(RwLock::new(GuiResourceState::Available)),
},
);
}
@@ -239,9 +263,20 @@ impl EventHandler for Handler {
) -> client_native_lib::DynFut<()> {
if let Some(gp) = self.peers.write().unwrap().get_mut(&peer.id) {
if let Some(gr) = gp.resources.get_mut(&resource.id) {
- gr.state = GuiResourceState::Connected
-
- // TODO
+ *gr.state.write().unwrap() = GuiResourceState::Connected;
+ match channel {
+ client_native_lib::peer::TransportChannel::Track(track) => {
+ let peer = gp.peer.clone();
+ let state = gr.state.clone();
+ tokio::task::spawn_blocking(move || {
+ play(peer, track);
+ *state.write().unwrap() = GuiResourceState::Available;
+ });
+ }
+ client_native_lib::peer::TransportChannel::DataChannel(_) => {
+ warn!("cant handle data channel yet")
+ }
+ }
}
}
Box::pin(async move {})
@@ -261,3 +296,138 @@ impl EventHandler for Handler {
Box::pin(async move {})
}
}
+
+pub fn play(peer: Arc<Peer>, track: Arc<TrackRemote>) {
+ let rid = block_on(track.stream_id());
+ let (exit_tx, exit_rx) = crossbeam_channel::unbounded();
+ let has_exited = Arc::new(AtomicBool::new(false));
+ let buffer = Arc::new(RwLock::new(VecDeque::new()));
+
+ {
+ let buffer = buffer.clone();
+ let track = track.clone();
+ tokio::spawn(async move {
+ if let Err(e) = track_to_raw(track, buffer).await {
+ error!("export error: {e}");
+ }
+ });
+ }
+ {
+ let media_ssrc = track.ssrc();
+ let peer = peer.clone();
+ let has_exited = has_exited.clone();
+ tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(Duration::from_secs(3)).await;
+ debug!("sending pli");
+ let r = peer
+ .peer_connection
+ .write_rtcp(&[Box::new(PictureLossIndication {
+ sender_ssrc: 0,
+ media_ssrc,
+ })])
+ .await;
+ if r.is_err() {
+ break;
+ }
+ if has_exited.load(Ordering::Relaxed) {
+ break;
+ }
+ }
+ debug!("pli send loop exited");
+ });
+ }
+ type State = (Arc<RwLock<VecDeque<u8>>>, Sender<()>, Arc<AtomicBool>);
+ fn open(state: &mut State, uri: &str) -> State {
+ debug!("mpv open: {uri}");
+ state.clone()
+ }
+ fn close(state: Box<State>) {
+ let _ = state.1.send(());
+ debug!("mpv close");
+ }
+ fn read(state: &mut State, buf: &mut [i8]) -> i64 {
+ let mut i = 0;
+ debug!("mpv request {} bytes", buf.len());
+ // TODO this is horrible
+ loop {
+ if state.2.load(Ordering::Relaxed) {
+ return 0;
+ }
+ let mut state = state.0.write().unwrap();
+ while let Some(e) = state.pop_front() {
+ if i >= buf.len() {
+ break;
+ }
+ buf[i] = e as i8;
+ i += 1;
+ }
+ if i != 0 {
+ break;
+ }
+ drop(state);
+ thread::sleep(Duration::from_millis(10));
+ }
+ debug!("mpv read {i} bytes");
+ i as i64
+ }
+
+ let mpv = libmpv::Mpv::new().unwrap();
+ File::create("/tmp/keks-meet-temp.conf")
+ .unwrap()
+ .write_all(include_bytes!("../mpv.conf"))
+ .unwrap();
+ mpv.load_config("/tmp/keks-meet-temp.conf").unwrap();
+
+ let proto = unsafe {
+ libmpv::protocol::Protocol::new(
+ "keks-meet-track".to_owned(),
+ (buffer, exit_tx.clone(), has_exited.clone()),
+ open,
+ close,
+ read,
+ None,
+ None,
+ )
+ };
+ let proto_ctx = mpv.create_protocol_context();
+ proto_ctx.register(proto).unwrap();
+ mpv.playlist_load_files(&[(
+ &format!("keks-meet-track://{}", rid),
+ libmpv::FileState::AppendPlay,
+ None,
+ )])
+ .unwrap();
+
+ block_on(track.onmute(move || {
+ debug!("mute");
+ let _ = exit_tx.send(());
+ Box::pin(async move {})
+ }));
+ exit_rx.recv().unwrap();
+ has_exited.store(true, Ordering::Relaxed);
+ block_on(peer.request_stop_resource(rid))
+}
+
+async fn track_to_raw(
+ track: Arc<TrackRemote>,
+ target: Arc<RwLock<VecDeque<u8>>>,
+) -> anyhow::Result<()> {
+ let mut cached_packet = H264Packet::default();
+ loop {
+ let (packet, _) = track.read_rtp().await?;
+ if !packet.payload.is_empty() {
+ let raw_payload = cached_packet.depacketize(&packet.payload)?;
+ if raw_payload.len() != 0 {
+ debug!("writing {} bytes", raw_payload.len());
+
+ let mut target = target.write().unwrap();
+ if target.len() < 10_000_000 {
+ target.extend(raw_payload.into_iter());
+ } else {
+ warn!("buffer is getting too big, dropping some data");
+ }
+ }
+ }
+ }
+}