From c0f90386900f0f767c7d0569e0f758d305af3d09 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Wed, 17 Aug 2022 17:49:31 +0200 Subject: Revert "Revert "modularize interfaces"" This reverts commit c10e88e42b26637e3c48ed781ced1382fb20fa26. --- karld/Cargo.toml | 7 +++ karld/src/demo.rs | 93 +++++++++++++++++++++++++++++++++++ karld/src/interface.rs | 79 ------------------------------ karld/src/interface/mod.rs | 10 ++++ karld/src/interface/unix.rs | 77 +++++++++++++++++++++++++++++ karld/src/interface/websocket.rs | 46 ++++++++++++++++++ karld/src/main.rs | 101 +++++---------------------------------- 7 files changed, 244 insertions(+), 169 deletions(-) create mode 100644 karld/src/demo.rs delete mode 100644 karld/src/interface.rs create mode 100644 karld/src/interface/mod.rs create mode 100644 karld/src/interface/unix.rs create mode 100644 karld/src/interface/websocket.rs (limited to 'karld') diff --git a/karld/Cargo.toml b/karld/Cargo.toml index b797dc9..9e2d926 100644 --- a/karld/Cargo.toml +++ b/karld/Cargo.toml @@ -14,3 +14,10 @@ crossbeam-channel = "0.5.4" serde_json = "1.0.81" chrono = "0.4.19" lazy_static = "1.4.0" + +websocket = { version = "0.26.5", optional = true } + +[features] +default = ["websocket", "unix"] +websocket = ["dep:websocket"] +unix = [] diff --git a/karld/src/demo.rs b/karld/src/demo.rs new file mode 100644 index 0000000..fb5b43a --- /dev/null +++ b/karld/src/demo.rs @@ -0,0 +1,93 @@ +use karlcommon::{Task, Schedule, Condition, Property}; + +use crate::TASKS; + +pub fn load_demo() { + TASKS.write().unwrap().insert( + 0, + Task { + id: 0, + name: "Mittagessen im Februar".to_string(), + description: None, + tags: vec!["Essen".to_string(), "Unwichtig".to_string()], + schedule: Schedule::Condition(Condition::And(vec![ + Condition::Equal { + modulus: None, + prop: Property::Monthofyear, + value: 1, + }, + Condition::Equal { + modulus: None, + prop: Property::Hour, + value: 12, + }, + ])), + }, + ); + TASKS.write().unwrap().insert( + 1, + Task { + id: 1, + name: "Abendessen oder Frühstück".to_string(), + description: Some("Nom nom nom".to_string()), + tags: vec!["Essen".to_string()], + schedule: Schedule::Condition(Condition::Or(vec![ + Condition::Equal { + modulus: None, + prop: Property::Hour, + value: 18, + }, + Condition::Equal { + modulus: None, + prop: Property::Hour, + value: 7, + }, + ])), + }, + ); + TASKS.write().unwrap().insert( + 2, + Task { + id: 2, + description: None, + name: "Wichtiger termin™".to_string(), + tags: vec![], + schedule: Schedule::Static(1654997366..1655007366), + }, + ); + + TASKS.write().unwrap().insert( + 3, + Task { + id: 3, + description: None, + name: "Staubsaugen".to_string(), + tags: vec!["Unwichtig".to_string()], + schedule: Schedule::Dynamic { + scheduled: None, + duration: 15 * 60, + priority: 2.0, + condition: Condition::Equal { + prop: Property::Monthofyear, + value: 6, + modulus: None, + }, + }, + }, + ); + TASKS.write().unwrap().insert( + 4, + Task { + id: 4, + description: Some("sollte ich wirklich mal machen".to_string()), + name: "Geschirrspüler ausräumen".to_string(), + tags: vec!["Unwichtig".to_string()], + schedule: Schedule::Dynamic { + scheduled: None, + duration: 15 * 60, + priority: 5.0, + condition: Condition::Never, + }, + }, + ); +} diff --git a/karld/src/interface.rs b/karld/src/interface.rs deleted file mode 100644 index e8e0b90..0000000 --- a/karld/src/interface.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::handle_packet; -use karlcommon::{socket_path, version, ClientboundPacket, ProtoError, ServerboundPacket}; -use log::{debug, error, info, warn}; -use std::io; -use std::io::{BufRead, BufReader, ErrorKind, Write}; -use std::os::unix::net::{UnixListener, UnixStream}; -use std::thread; - -pub fn network_loop() { - if socket_path().exists() { - info!("remove old socket"); - std::fs::remove_file(socket_path()).unwrap(); - } - let listener = UnixListener::bind(socket_path()).unwrap(); - info!("listening."); - let mut id_counter = 0; - - loop { - let (stream, addr) = listener.accept().unwrap(); - let id = id_counter; - id_counter += 1; - thread::spawn(move || { - info!("client connected: {:?}", addr); - if let Err(err) = handle_connection(id, stream) { - warn!("client dropped: {:?} ({})", addr, err); - } else { - info!("client dropped: {:?}", addr); - } - }); - } -} - -fn handle_connection(id: u32, mut stream: UnixStream) -> io::Result<()> { - let mut reader = BufReader::new(stream.try_clone()?); - let (responder, responses) = crossbeam_channel::unbounded(); - responder - .send(ClientboundPacket::Handshake { - version: version!(), - }) - .unwrap(); - thread::spawn(move || { - for m in responses { - debug!("{id} -> {m:?}"); - match stream - .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) - .map_err(|e| e.kind()) - { - Ok(_) => (), - Err(ErrorKind::BrokenPipe) => break, - Err(e) => error!("network error: {:?}", e), - } - } - }); - { - let mut buf = String::new(); - loop { - if reader.read_line(&mut buf)? == 0 { - break Ok(()); - }; - match serde_json::from_str::(buf.as_str()) { - Ok(packet) => { - debug!("{id} <- {packet:?}"); - handle_packet(id, packet, responder.clone()); - } - Err(err) => { - warn!("client error: {:?}", &err); - responder - .send(ClientboundPacket::Error(ProtoError::FormatError(format!( - "{}", - &err - )))) - .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? - } - } - - buf.clear(); - } - } -} diff --git a/karld/src/interface/mod.rs b/karld/src/interface/mod.rs new file mode 100644 index 0000000..38c9895 --- /dev/null +++ b/karld/src/interface/mod.rs @@ -0,0 +1,10 @@ +use std::thread; +mod unix; +mod websocket; + +pub fn start() { + #[cfg(feature = "unix")] + thread::spawn(|| unix::run()); + #[cfg(feature = "websocket")] + thread::spawn(|| websocket::run()); +} diff --git a/karld/src/interface/unix.rs b/karld/src/interface/unix.rs new file mode 100644 index 0000000..8d038d4 --- /dev/null +++ b/karld/src/interface/unix.rs @@ -0,0 +1,77 @@ +use crate::{handle_packet, CLIENT_ID_COUNTER}; +use karlcommon::{socket_path, version, ClientboundPacket, ProtoError, ServerboundPacket}; +use log::{debug, error, info, warn}; +use std::io; +use std::io::{BufRead, BufReader, ErrorKind, Write}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::thread; + +pub fn run() { + if socket_path().exists() { + info!("remove old socket"); + std::fs::remove_file(socket_path()).unwrap(); + } + let listener = UnixListener::bind(socket_path()).unwrap(); + info!("listening."); + + loop { + let (stream, addr) = listener.accept().unwrap(); + let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + thread::spawn(move || { + info!("client connected: {:?}", addr); + if let Err(err) = handle_connection(id, stream) { + warn!("client dropped: {:?} ({})", addr, err); + } else { + info!("client dropped: {:?}", addr); + } + }); + } +} + +fn handle_connection(id: u32, mut stream: UnixStream) -> io::Result<()> { + let mut reader = BufReader::new(stream.try_clone()?); + let (responder, responses) = crossbeam_channel::unbounded(); + responder + .send(ClientboundPacket::Handshake { + version: version!(), + }) + .unwrap(); + thread::spawn(move || { + for m in responses { + debug!("{id} -> {m:?}"); + match stream + .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) + .map_err(|e| e.kind()) + { + Ok(_) => (), + Err(ErrorKind::BrokenPipe) => break, + Err(e) => error!("network error: {:?}", e), + } + } + }); + { + let mut buf = String::new(); + loop { + if reader.read_line(&mut buf)? == 0 { + break Ok(()); + }; + match serde_json::from_str::(buf.as_str()) { + Ok(packet) => { + debug!("{id} <- {packet:?}"); + handle_packet(id, packet, responder.clone()); + } + Err(err) => { + warn!("client error: {:?}", &err); + responder + .send(ClientboundPacket::Error(ProtoError::FormatError(format!( + "{}", + &err + )))) + .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? + } + } + + buf.clear(); + } + } +} diff --git a/karld/src/interface/websocket.rs b/karld/src/interface/websocket.rs new file mode 100644 index 0000000..b69de20 --- /dev/null +++ b/karld/src/interface/websocket.rs @@ -0,0 +1,46 @@ +use std::thread; +use websocket::sync::Server; +use websocket::OwnedMessage; + +pub fn run() { + let server = Server::bind("127.0.0.1:2794").unwrap(); + + for request in server.filter_map(Result::ok) { + // Spawn a new thread for each connection. + thread::spawn(|| { + if !request.protocols().contains(&"rust-websocket".to_string()) { + request.reject().unwrap(); + return; + } + + let mut client = request.use_protocol("rust-websocket").accept().unwrap(); + + let ip = client.peer_addr().unwrap(); + + println!("Connection from {}", ip); + + let message = OwnedMessage::Text("Hello".to_string()); + client.send_message(&message).unwrap(); + + let (mut receiver, mut sender) = client.split().unwrap(); + + for message in receiver.incoming_messages() { + let message = message.unwrap(); + + match message { + OwnedMessage::Close(_) => { + let message = OwnedMessage::Close(None); + sender.send_message(&message).unwrap(); + println!("Client {} disconnected", ip); + return; + } + OwnedMessage::Ping(ping) => { + let message = OwnedMessage::Pong(ping); + sender.send_message(&message).unwrap(); + } + _ => sender.send_message(&message).unwrap(), + } + } + }); + } +} diff --git a/karld/src/main.rs b/karld/src/main.rs index 22a12f7..2b95cb7 100644 --- a/karld/src/main.rs +++ b/karld/src/main.rs @@ -2,6 +2,7 @@ #![feature(fs_try_exists)] pub mod condition; +pub mod demo; pub mod helper; pub mod interface; pub mod savestate; @@ -11,12 +12,14 @@ use chrono::NaiveDateTime; use condition::ConditionFind; use crossbeam_channel::Sender; use helper::Overlaps; -use interface::network_loop; use karlcommon::{ ClientboundPacket, Condition, Property, ProtoError, Schedule, ServerboundPacket, Task, }; use log::{debug, error, info}; -use std::{collections::HashMap, sync::RwLock}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicU32, RwLock}, +}; use crate::schedule::schedule_dynamic; @@ -26,104 +29,22 @@ fn main() { if let Err(e) = savestate::load() { error!("load failed: {}", e); } - TASKS.write().unwrap().insert( - 0, - Task { - id: 0, - name: "Mittagessen im Februar".to_string(), - description: None, - tags: vec!["Essen".to_string(), "Unwichtig".to_string()], - schedule: Schedule::Condition(Condition::And(vec![ - Condition::Equal { - modulus: None, - prop: Property::Monthofyear, - value: 1, - }, - Condition::Equal { - modulus: None, - prop: Property::Hour, - value: 12, - }, - ])), - }, - ); - TASKS.write().unwrap().insert( - 1, - Task { - id: 1, - name: "Abendessen oder Frühstück".to_string(), - description: Some("Nom nom nom".to_string()), - tags: vec!["Essen".to_string()], - schedule: Schedule::Condition(Condition::Or(vec![ - Condition::Equal { - modulus: None, - prop: Property::Hour, - value: 18, - }, - Condition::Equal { - modulus: None, - prop: Property::Hour, - value: 7, - }, - ])), - }, - ); - TASKS.write().unwrap().insert( - 2, - Task { - id: 2, - description: None, - name: "Wichtiger termin™".to_string(), - tags: vec![], - schedule: Schedule::Static(1654997366..1655007366), - }, - ); - - TASKS.write().unwrap().insert( - 3, - Task { - id: 3, - description: None, - name: "Staubsaugen".to_string(), - tags: vec!["Unwichtig".to_string()], - schedule: Schedule::Dynamic { - scheduled: None, - duration: 15 * 60, - priority: 2.0, - condition: Condition::Equal { - prop: Property::Monthofyear, - value: 6, - modulus: None, - }, - }, - }, - ); - TASKS.write().unwrap().insert( - 4, - Task { - id: 4, - description: Some("sollte ich wirklich mal machen".to_string()), - name: "Geschirrspüler ausräumen".to_string(), - tags: vec!["Unwichtig".to_string()], - schedule: Schedule::Dynamic { - scheduled: None, - duration: 15 * 60, - priority: 5.0, - condition: Condition::Never, - }, - }, - ); std::thread::spawn(move || { std::thread::sleep(std::time::Duration::from_secs_f64(0.1)); schedule_dynamic(); }); - network_loop(); + interface::start(); + + loop { + std::thread::sleep(std::time::Duration::from_secs_f64(100.0)); + } } lazy_static::lazy_static! { static ref TASKS: RwLock> = RwLock::new(HashMap::new()); + static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0); } pub fn handle_packet(client: u32, packet: ServerboundPacket, responder: Sender) { -- cgit v1.2.3-70-g09d2