From 32c75ad73a43dab03c7544752c9d7b8af199fb39 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Thu, 23 Nov 2023 16:00:24 +0100 Subject: [PATCH] Add WIP websocket to controller and refactor settings --- Cargo.lock | Bin 73888 -> 74409 bytes Makefile | 11 ++++ emgauwa-controller.toml | 9 +++ emgauwa-controller/Cargo.toml | 5 +- emgauwa-controller/src/main.rs | 52 +++++++++++++++++- emgauwa-controller/src/settings.rs | 17 ++---- emgauwa-core/Cargo.toml | 2 - emgauwa-core/src/settings.rs | 15 +---- emgauwa-lib/Cargo.toml | 2 + emgauwa-lib/src/handlers/v1/ws/controllers.rs | 1 + emgauwa-lib/src/utils.rs | 20 +++++++ 11 files changed, 101 insertions(+), 33 deletions(-) create mode 100644 Makefile create mode 100644 emgauwa-controller.toml diff --git a/Cargo.lock b/Cargo.lock index 45f02555a816584b22a6e3868f5069ff181e2e55..7e9e20f206c07c27c931b0cf466d394baea294cf 100644 GIT binary patch delta 355 zcmY+7&nv@W9LJ*_WQx>QE}EspqUU)&&#&*$xH{N!7thaH+NRAMm0aZdiQ<5ha-&^c zrnuqYFCe!a9Q*_3==SRUde`f^o4S2U-JLGt<5{J|K!FaFCW<*FIRqYD!ji?ALH=;Iol~Nos8q#dceY98b$fM&)b+(j`6je?sO|-YZOg{2nM^?9GkrHFm0v`I>RfMhiEK^#4%4 zG%LbtLj%*1`Cts?K@gp&gbVI07{jKGBh_dNj6J1Pf1O*urM|?F)=bQ zG&eC!F-lIgFfunuN-{J`OEfh}v#>NZFi$fzN}Sv{MSQchO{WlBa(-S~X8Pp)ep!=^ q{bwP#sgrdBIyN5&5b)Z(XLf$TWt*v-#-TCnf-XE;SSY diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9ef63f8 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ + +build: + cargo build + +sqlx: build + cargo sqlx database create + cargo sqlx migrate run + cargo sqlx prepare + +build-rpi: + cross build --target arm-unknown-linux-gnueabihf diff --git a/emgauwa-controller.toml b/emgauwa-controller.toml new file mode 100644 index 0000000..ee4d72d --- /dev/null +++ b/emgauwa-controller.toml @@ -0,0 +1,9 @@ +database = "sqlite://emgauwa-controller.sqlite" + +[core] +port = 4419 +host = "127.0.0.1" + +[logging] +level = "DEBUG" +file = "stdout" diff --git a/emgauwa-controller/Cargo.toml b/emgauwa-controller/Cargo.toml index 5a3cc45..b008756 100644 --- a/emgauwa-controller/Cargo.toml +++ b/emgauwa-controller/Cargo.toml @@ -7,8 +7,7 @@ authors = ["Tobias Reisinger "] [dependencies] emgauwa-lib = { path = "../emgauwa-lib" } -config = "0.13" - +tokio = { version = "1.34", features = ["io-std", "macros", "rt-multi-thread"] } tokio-tungstenite = "0.20" simple_logger = "4.2" @@ -21,4 +20,4 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -futures = "0.3.29" +futures = "0.3" diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index 031a0d1..020b544 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -1,5 +1,53 @@ +use std::str; + +use futures::{future, pin_mut, StreamExt}; +use futures::channel::mpsc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; + mod settings; -fn main() { - let _settings = settings::init(); +#[tokio::main] +async fn main() { + let settings = settings::init(); + + let url = format!( + "ws://{}:{}/api/v1/ws/controllers", + settings.core.host, + settings.core.port + ); + + let (stdin_tx, stdin_rx) = mpsc::unbounded(); + tokio::spawn(read_stdin(stdin_tx)); + + let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); + println!("WebSocket handshake has been successfully completed"); + + let (write, read) = ws_stream.split(); + + let stdin_to_ws = stdin_rx.map(Ok).forward(write); + let ws_to_stdout = { + read.for_each(|message| async { + let data = message.unwrap().into_text().unwrap(); + println!("{}", data); + }) + }; + + pin_mut!(stdin_to_ws, ws_to_stdout); + future::select(stdin_to_ws, ws_to_stdout).await; } + +// Our helper method which will read data from stdin and send it along the +// sender provided. +async fn read_stdin(tx: mpsc::UnboundedSender) { + let mut stdin = tokio::io::stdin(); + loop { + let mut buf = vec![0; 1024]; + let n = match stdin.read(&mut buf).await { + Err(_) | Ok(0) => break, + Ok(n) => n, + }; + buf.truncate(n); + tx.unbounded_send(Message::text(str::from_utf8(&buf).unwrap())).unwrap(); + } +} \ No newline at end of file diff --git a/emgauwa-controller/src/settings.rs b/emgauwa-controller/src/settings.rs index c05d8b6..5faa53f 100644 --- a/emgauwa-controller/src/settings.rs +++ b/emgauwa-controller/src/settings.rs @@ -1,5 +1,4 @@ -use config::Config; -use emgauwa_lib::constants; +use emgauwa_lib::{constants, utils}; use serde_derive::Deserialize; #[derive(Clone, Debug, Deserialize)] @@ -22,6 +21,7 @@ pub struct Logging { #[serde(default)] #[allow(unused)] pub struct Settings { + pub core: Core, pub database: String, pub logging: Logging, } @@ -29,6 +29,7 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Settings { + core: Core::default(), database: String::from("sqlite://emgauwa-controller.sqlite"), logging: Logging::default(), } @@ -54,15 +55,5 @@ impl Default for Logging { } pub fn init() -> Settings { - Config::builder() - .add_source(config::File::with_name("emgauwa-controller")) - .add_source( - config::Environment::with_prefix("EMGAUWA_CONTROLLER") - .prefix_separator("_") - .separator("__"), - ) - .build() - .unwrap() - .try_deserialize::() - .unwrap_or_else(|_| panic!("Error reading settings.")) + utils::load_settings("controller", "CONTROLLER") } diff --git a/emgauwa-core/Cargo.toml b/emgauwa-core/Cargo.toml index b033de4..074a0cb 100644 --- a/emgauwa-core/Cargo.toml +++ b/emgauwa-core/Cargo.toml @@ -11,8 +11,6 @@ actix = "0.13" actix-web = "4.4" actix-web-actors = "4.2" -config = "0.13" - simple_logger = "4.2" log = "0.4" diff --git a/emgauwa-core/src/settings.rs b/emgauwa-core/src/settings.rs index ea0ccb0..158909c 100644 --- a/emgauwa-core/src/settings.rs +++ b/emgauwa-core/src/settings.rs @@ -1,5 +1,4 @@ -use config::Config; -use emgauwa_lib::constants; +use emgauwa_lib::{constants, utils}; use serde_derive::Deserialize; #[derive(Clone, Debug, Deserialize)] @@ -41,15 +40,5 @@ impl Default for Logging { } pub fn init() -> Settings { - Config::builder() - .add_source(config::File::with_name("emgauwa-core")) - .add_source( - config::Environment::with_prefix("EMGAUWA_CORE") - .prefix_separator("__") - .separator("__"), - ) - .build() - .unwrap() - .try_deserialize::() - .expect("Error reading settings.") + utils::load_settings("core", "CORE") } diff --git a/emgauwa-lib/Cargo.toml b/emgauwa-lib/Cargo.toml index 96aa5c5..f79008e 100644 --- a/emgauwa-lib/Cargo.toml +++ b/emgauwa-lib/Cargo.toml @@ -14,6 +14,8 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" +config = "0.13" + chrono = { version = "0.4", features = ["serde"] } sqlx = { version = "0.7", features = ["sqlite", "runtime-async-std", "macros", "chrono"] } diff --git a/emgauwa-lib/src/handlers/v1/ws/controllers.rs b/emgauwa-lib/src/handlers/v1/ws/controllers.rs index b368b7c..fea347e 100644 --- a/emgauwa-lib/src/handlers/v1/ws/controllers.rs +++ b/emgauwa-lib/src/handlers/v1/ws/controllers.rs @@ -25,6 +25,7 @@ impl StreamHandler> for ControllerWs { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let schedules = futures::executor::block_on(get_schedules(&mut self.pool)).unwrap(); let schedules_json = serde_json::to_string(&schedules).unwrap(); + println!("{:?}", msg); match msg { Ok(Message::Ping(msg)) => ctx.pong(&msg), Ok(Message::Text(_text)) => ctx.text(schedules_json), diff --git a/emgauwa-lib/src/utils.rs b/emgauwa-lib/src/utils.rs index 7330c73..0a8e48d 100644 --- a/emgauwa-lib/src/utils.rs +++ b/emgauwa-lib/src/utils.rs @@ -1,3 +1,23 @@ pub fn vec_has_error(target: &[Result]) -> bool { target.iter().any(|t| t.is_err()) } + +pub fn load_settings(config_name: &str, env_prefix: &str) -> T +where + for<'de> T: serde::Deserialize<'de> +{ + let default_file = config::File::with_name(&format!("emgauwa-{}", config_name)) + .required(false); + + config::Config::builder() + .add_source(default_file) + .add_source( + config::Environment::with_prefix(&format!("EMGAUWA_{}", env_prefix)) + .prefix_separator("__") + .separator("__"), + ) + .build() + .expect("Error building settings") + .try_deserialize::() + .expect("Error reading settings") +} \ No newline at end of file