From 50bcac2a1b87cba2278e1010107ddb7ac504e2d1 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Wed, 29 Nov 2023 01:03:04 +0100 Subject: [PATCH] Add log-init-function and websocket retry --- Cargo.lock | Bin 65712 -> 65712 bytes emgauwa-controller/src/main.rs | 79 +++++++++++---------- emgauwa-controller/src/relay_loop.rs | 2 +- emgauwa-core/Cargo.toml | 1 - emgauwa-core/src/main.rs | 13 +--- emgauwa-core/src/utils.rs | 9 +-- emgauwa-lib/Cargo.toml | 5 +- emgauwa-lib/src/constants.rs | 2 + emgauwa-lib/src/handlers/v1/controllers.rs | 8 +-- emgauwa-lib/src/handlers/v1/ws/mod.rs | 1 - emgauwa-lib/src/models/mod.rs | 6 +- emgauwa-lib/src/utils.rs | 14 ++++ 12 files changed, 72 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8a4a3f523e35cf57b4b3a27911714d818110fdd..5505180a768a74e42d0758be8e7c50d396599fc7 100644 GIT binary patch delta 35 qcmdnc$g%;5UOBOpmS(0*Zgi8LtnbJ^S;=|->n;$x#-va;&3=b{< diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index a5937c9..6ab7430 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -1,19 +1,20 @@ -use std::str; +use std::time::Duration; use crate::relay_loop::run_relay_loop; use crate::settings::Settings; -use emgauwa_lib::db; +use emgauwa_lib::constants::WEBSOCKET_RETRY_TIMEOUT; use emgauwa_lib::db::{DbController, DbRelay}; use emgauwa_lib::handlers::v1::ws::controllers::ControllerWsAction; use emgauwa_lib::models::{Controller, FromDbModel}; use emgauwa_lib::types::ControllerUid; -use futures::channel::mpsc; -use futures::{future, pin_mut, SinkExt, StreamExt}; +use emgauwa_lib::{db, utils}; +use futures::{SinkExt, StreamExt}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use tokio::io::AsyncReadExt; +use tokio::time; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use utils::init_logging; mod driver; mod relay_loop; @@ -51,6 +52,7 @@ async fn create_this_relay( #[tokio::main] async fn main() { let settings = settings::init(); + init_logging(&settings.logging.level); let pool = db::init(&settings.database).await; @@ -87,46 +89,47 @@ async fn main() { settings.core.host, settings.core.port ); - let (stdin_tx, stdin_rx) = mpsc::unbounded(); - tokio::spawn(read_stdin(stdin_tx)); - - let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); - - let (mut write, read) = ws_stream.split(); - - let ws_action = ControllerWsAction::Register(this); - println!("Sending action: {:?}", ws_action); - let ws_action_json = serde_json::to_string(&ws_action).unwrap(); - println!("Sending json: {}", ws_action_json); - write.send(Message::text(ws_action_json)).await.unwrap(); - let ws_to_stdout = read.for_each(handle_message); - let stdin_to_ws = stdin_rx.map(Ok).forward(write); - tokio::spawn(run_relay_loop(settings)); - pin_mut!(stdin_to_ws, ws_to_stdout); - future::select(stdin_to_ws, ws_to_stdout).await; -} - -// Our helper method which will read data from stdin and send it along the -// sender provided. -async fn read_stdin(tx: mpsc::UnboundedSender) { - let mut stdin = tokio::io::stdin(); loop { - let mut buf = vec![0; 1024]; - let n = match stdin.read(&mut buf).await { - Err(_) | Ok(0) => break, - Ok(n) => n, - }; - buf.truncate(n); - tx.unbounded_send(Message::text(str::from_utf8(&buf).unwrap())) - .unwrap(); + time::sleep(WEBSOCKET_RETRY_TIMEOUT).await; + + let connect_result = connect_async(&url).await; + if let Err(err) = connect_result { + log::warn!( + "Failed to connect to websocket: {}. Retrying in {} seconds...", + err, + WEBSOCKET_RETRY_TIMEOUT.as_secs() + ); + continue; + } + let (ws_stream, _) = connect_result.unwrap(); + + let (mut write, read) = ws_stream.split(); + + let ws_action = ControllerWsAction::Register(this.clone()); + + let ws_action_json = serde_json::to_string(&ws_action).unwrap(); + write.send(Message::text(ws_action_json)).await.unwrap(); + + let read_handler = read.for_each(handle_message); + + read_handler.await; + + log::warn!( + "Lost connection to websocket. Retrying in {} seconds...", + WEBSOCKET_RETRY_TIMEOUT.as_secs() + ); } } pub async fn handle_message(message_result: Result) { match message_result { - Ok(message) => println!("{}", message.into_text().unwrap()), - Err(err) => println!("Error: {}", err), + Ok(message) => { + if let Message::Text(msg_text) = message { + log::debug!("{}", msg_text) + } + } + Err(err) => log::debug!("Error: {}", err), } } diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs index f688106..c3e1c29 100644 --- a/emgauwa-controller/src/relay_loop.rs +++ b/emgauwa-controller/src/relay_loop.rs @@ -9,6 +9,6 @@ pub async fn run_relay_loop(settings: Settings) { loop { let next_timestamp = Local::now().naive_local().time() + default_duration; time::sleep(default_duration).await; - println!("Relay loop: {}", next_timestamp) + log::debug!("Relay loop: {}", next_timestamp) } } diff --git a/emgauwa-core/Cargo.toml b/emgauwa-core/Cargo.toml index 72e3c00..cfdfe77 100644 --- a/emgauwa-core/Cargo.toml +++ b/emgauwa-core/Cargo.toml @@ -12,7 +12,6 @@ actix-web = "4.4" actix-web-actors = "4.2" actix-cors = "0.6" -simple_logger = "4.2" log = "0.4" chrono = { version = "0.4", features = ["serde"] } diff --git a/emgauwa-core/src/main.rs b/emgauwa-core/src/main.rs index e2e797e..145bdad 100644 --- a/emgauwa-core/src/main.rs +++ b/emgauwa-core/src/main.rs @@ -10,8 +10,7 @@ use actix_web::{middleware, web, App, HttpServer}; use emgauwa_lib::db::DbController; use emgauwa_lib::handlers; use emgauwa_lib::types::ConnectedControllersType; -use log::{trace, LevelFilter}; -use simple_logger::SimpleLogger; +use emgauwa_lib::utils::init_logging; mod settings; mod utils; @@ -19,15 +18,7 @@ mod utils; #[actix_web::main] async fn main() -> std::io::Result<()> { let settings = settings::init(); - - let log_level: LevelFilter = - LevelFilter::from_str(&settings.logging.level).expect("Error parsing log level."); - trace!("Log level set to {:?}", log_level); - - SimpleLogger::new() - .with_level(log_level) - .init() - .expect("Error initializing logger."); + init_logging(&settings.logging.level); let listener = TcpListener::bind(format!("{}:{}", settings.host, settings.port)) .expect("Error creating listener"); diff --git a/emgauwa-core/src/utils.rs b/emgauwa-core/src/utils.rs index 4328baf..9fafd27 100644 --- a/emgauwa-core/src/utils.rs +++ b/emgauwa-core/src/utils.rs @@ -1,4 +1,5 @@ use crate::settings::Settings; +use log::log; use std::ffi::CString; use std::io::{Error, ErrorKind}; @@ -26,11 +27,11 @@ fn drop_privileges_group(group: &str) -> Result<(), Error> { if let Ok(cstr) = CString::new(group.as_bytes()) { let p = unsafe { libc::getgrnam(cstr.as_ptr()) }; if p.is_null() { - eprintln!("Unable to getgrnam of group: {}", group); + log::error!("Unable to getgrnam of group: {}", group); return Err(Error::last_os_error()); } if unsafe { libc::setgid((*p).gr_gid) } != 0 { - eprintln!("Unable to setgid of group: {}", group); + log::error!("Unable to setgid of group: {}", group); return Err(Error::last_os_error()); } } else { @@ -47,11 +48,11 @@ fn drop_privileges_user(user: &str) -> Result<(), Error> { if let Ok(cstr) = CString::new(user.as_bytes()) { let p = unsafe { libc::getpwnam(cstr.as_ptr()) }; if p.is_null() { - eprintln!("Unable to getpwnam of user: {}", user); + log::error!("Unable to getpwnam of user: {}", user); return Err(Error::last_os_error()); } if unsafe { libc::setuid((*p).pw_uid) } != 0 { - eprintln!("Unable to setuid of user: {}", user); + log::error!("Unable to setuid of user: {}", user); return Err(Error::last_os_error()); } } else { diff --git a/emgauwa-lib/Cargo.toml b/emgauwa-lib/Cargo.toml index c3bac8f..bca69c9 100644 --- a/emgauwa-lib/Cargo.toml +++ b/emgauwa-lib/Cargo.toml @@ -14,13 +14,14 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" +simple_logger = "4.2" +log = "0.4" + config = "0.13" chrono = { version = "0.4", features = ["serde"] } sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] } libsqlite3-sys = { version = "*", features = ["bundled"] } - -log = "0.4" uuid = "1.6" futures = "0.3" diff --git a/emgauwa-lib/src/constants.rs b/emgauwa-lib/src/constants.rs index e6d9657..acf6777 100644 --- a/emgauwa-lib/src/constants.rs +++ b/emgauwa-lib/src/constants.rs @@ -3,3 +3,5 @@ use std::time::Duration; pub const DEFAULT_PORT: u16 = 4419; pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15); + +pub const WEBSOCKET_RETRY_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/emgauwa-lib/src/handlers/v1/controllers.rs b/emgauwa-lib/src/handlers/v1/controllers.rs index 0333ba6..013251a 100644 --- a/emgauwa-lib/src/handlers/v1/controllers.rs +++ b/emgauwa-lib/src/handlers/v1/controllers.rs @@ -9,18 +9,12 @@ use crate::models::{convert_db_list, Controller}; use crate::types::ConnectedControllersType; #[get("/api/v1/controllers")] -pub async fn index( - pool: web::Data>, - connected_controllers: web::Data, -) -> Result { +pub async fn index(pool: web::Data>) -> Result { let mut pool_conn = pool.acquire().await?; let db_controllers = DbController::get_all(&mut pool_conn).await?; let controllers: Vec = convert_db_list(&mut pool_conn, db_controllers)?; - let data = connected_controllers.lock().unwrap(); - println!("{:?}", *data); - Ok(HttpResponse::Ok().json(controllers)) } diff --git a/emgauwa-lib/src/handlers/v1/ws/mod.rs b/emgauwa-lib/src/handlers/v1/ws/mod.rs index 4022c41..f1d194e 100644 --- a/emgauwa-lib/src/handlers/v1/ws/mod.rs +++ b/emgauwa-lib/src/handlers/v1/ws/mod.rs @@ -26,6 +26,5 @@ pub async fn ws_controllers( stream, ) .map_err(|_| ApiError::InternalError(String::from("error starting websocket"))); - println!("{:?}", resp); resp } diff --git a/emgauwa-lib/src/models/mod.rs b/emgauwa-lib/src/models/mod.rs index 5e91ca2..8e55dc6 100644 --- a/emgauwa-lib/src/models/mod.rs +++ b/emgauwa-lib/src/models/mod.rs @@ -17,14 +17,14 @@ pub trait FromDbModel { Self: Sized; } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Schedule { #[serde(flatten)] pub s: DbSchedule, pub tags: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Relay { #[serde(flatten)] pub r: DbRelay, @@ -33,7 +33,7 @@ pub struct Relay { pub tags: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Controller { #[serde(flatten)] pub c: DbController, diff --git a/emgauwa-lib/src/utils.rs b/emgauwa-lib/src/utils.rs index b74f68b..b6e7889 100644 --- a/emgauwa-lib/src/utils.rs +++ b/emgauwa-lib/src/utils.rs @@ -1,3 +1,7 @@ +use log::LevelFilter; +use simple_logger::SimpleLogger; +use std::str::FromStr; + pub fn load_settings(config_name: &str, env_prefix: &str) -> T where for<'de> T: serde::Deserialize<'de>, @@ -16,3 +20,13 @@ where .try_deserialize::() .expect("Error reading settings") } + +pub fn init_logging(level: &str) { + let log_level: LevelFilter = LevelFilter::from_str(level).expect("Error parsing log level."); + log::trace!("Log level set to {:?}", log_level); + + SimpleLogger::new() + .with_level(log_level) + .init() + .expect("Error initializing logger."); +}