1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
pub mod protocol;
pub mod room;
use hyper::StatusCode;
use listenfd::ListenFd;
use log::error;
use room::Room;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::RwLock;
use warp::hyper::Server;
use warp::ws::WebSocket;
use warp::{Filter, Rejection, Reply};
/// Shared, lock-protected table of live rooms, keyed by room name.
///
/// Handles are cheap to clone (`Arc`); each signaling connection receives its own clone.
type Rooms = Arc<RwLock<HashMap<String, Arc<Room>>>>;
/// Process entry point.
///
/// Builds a multi-threaded Tokio runtime with all drivers enabled and blocks on
/// [`run`] until it completes.
///
/// # Panics
///
/// Panics if the runtime cannot be constructed.
fn main() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    let runtime = builder.build().unwrap();
    runtime.block_on(run());
}
/// Sets up logging and the HTTP/WebSocket routes, then serves them until the
/// server stops.
///
/// Routes:
/// - `/assets/...`       static files from `../client-web/public/assets`
/// - `/room`             the application page
/// - `/`                 the start page
/// - `/signaling/<name>` WebSocket upgrade handled by [`signaling_connect`]
/// - `/favicon.ico`      an empty response
///
/// Rejections are rendered by [`handle_rejection`]. Log filtering is read from
/// the `LOG` environment variable.
///
/// # Panics
///
/// Panics if the inherited listener cannot be obtained or used, or if the server
/// terminates with an error.
async fn run() {
    env_logger::init_from_env("LOG");

    // Every request that needs it gets its own handle to the shared room table.
    let room_table = Rooms::default();
    let with_rooms = warp::any().map(move || room_table.clone());

    // Static pages and assets; paths are relative to the working directory.
    let assets = warp::path("assets").and(warp::fs::dir("../client-web/public/assets"));
    let room_page = warp::path!("room").and(warp::fs::file("../client-web/public/app.html"));
    let start_page = warp::path!().and(warp::fs::file("../client-web/public/start.html"));
    let favicon = warp::path!("favicon.ico").map(|| "");

    // WebSocket endpoint: the path segment after `signaling/` is the room name.
    let signaling = warp::path!("signaling" / String)
        .and(with_rooms)
        .and(warp::ws())
        .map(signaling_connect);

    let routes = assets
        .or(room_page)
        .or(start_page)
        .or(signaling)
        .or(favicon)
        .recover(handle_rejection)
        .with(warp::log("stuff"));

    // If a listener fd is passed in from the outside world, use it;
    // otherwise bind to the default local address.
    let mut listenfd = ListenFd::from_env();
    let server = match listenfd.take_tcp_listener(0).unwrap() {
        Some(listener) => Server::from_tcp(listener).unwrap(),
        None => {
            let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8080));
            Server::bind(&addr)
        }
    };

    // Each incoming connection is served by a clone of the same warp service.
    let service = warp::service(routes);
    let make_service = hyper::service::make_service_fn(|_| {
        let service = service.clone();
        async move { Ok::<_, Infallible>(service) }
    });

    server.serve(make_service).await.unwrap();
}
/// Converts a warp [`Rejection`] into a minimal HTML error page.
///
/// Status mapping:
/// - not found                   -> `404 Not Found`
/// - body deserialization error  -> `400 Bad Request`
/// - method not allowed          -> `405 Method Not Allowed`
/// - anything else               -> `500 Internal Server Error` (also logged)
///
/// The page body contains the canonical reason phrase of the status, or `!?`
/// when the status has none. This handler never fails.
async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
    let status = if err.is_not_found() {
        StatusCode::NOT_FOUND
    } else if err
        .find::<warp::filters::body::BodyDeserializeError>()
        .is_some()
    {
        StatusCode::BAD_REQUEST
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        StatusCode::METHOD_NOT_ALLOWED
    } else {
        // Unknown rejection kind: record it so it can be diagnosed later.
        error!("unhandled rejection: {:?}", err);
        StatusCode::INTERNAL_SERVER_ERROR
    };

    let reason = status.canonical_reason().unwrap_or("!?");
    let page = format!(
        "<!DOCTYPE html><html><head></head><body><pre>{}</pre></body></html>",
        reason
    );
    Ok(warp::reply::with_status(warp::reply::html(page), status))
}
fn signaling_connect(rname: String, rooms: Rooms, ws: warp::ws::Ws) -> impl Reply {
async fn inner(sock: WebSocket, rname: String, rooms: Rooms) {
let guard = rooms.read().await;
let room = match guard.get(&rname) {
Some(r) => r.to_owned(),
None => {
drop(guard); // make sure read-lock is dropped to avoid deadlock
let mut guard = rooms.write().await;
guard.insert(rname.to_owned(), Default::default());
guard.get(&rname).unwrap().to_owned() // TODO never expect this to always work!!
}
};
room.client_connect(sock).await;
if room.should_remove().await {
rooms.write().await.remove(&rname);
}
}
ws.on_upgrade(move |sock| inner(sock, rname, rooms))
}
|