From 6f8d63e7beaaa6613a0944be8720fc893d5dcd9c Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Tue, 5 Dec 2023 16:11:40 +0100 Subject: [PATCH] Fix bugs and add controller action for controller ws --- emgauwa-controller/src/main.rs | 58 ++-------- emgauwa-controller/src/relay_loop.rs | 2 +- emgauwa-controller/src/ws/mod.rs | 109 ++++++++++++++++++ emgauwa-core/src/app_state.rs | 5 +- emgauwa-core/src/handlers/v1/controllers.rs | 22 +++- .../handlers/v1/ws/controllers/handlers.rs | 6 +- .../src/handlers/v1/ws/controllers/mod.rs | 12 +- emgauwa-lib/src/db/controllers.rs | 9 ++ emgauwa-lib/src/db/mod.rs | 6 +- emgauwa-lib/src/db/relays.rs | 2 +- emgauwa-lib/src/types/mod.rs | 2 + 11 files changed, 177 insertions(+), 56 deletions(-) create mode 100644 emgauwa-controller/src/ws/mod.rs diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index 5a2b1e1..76453f1 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -2,23 +2,21 @@ use emgauwa_lib::constants::WEBSOCKET_RETRY_TIMEOUT; use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule}; use emgauwa_lib::errors::EmgauwaError; use emgauwa_lib::models::{Controller, FromDbModel}; -use emgauwa_lib::types::{ControllerUid, ControllerWsAction}; +use emgauwa_lib::types::ControllerUid; use emgauwa_lib::{db, utils}; -use futures::{SinkExt, StreamExt}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use tokio::time; -use tokio_tungstenite::connect_async; -use tokio_tungstenite::tungstenite::protocol::Message; -use tokio_tungstenite::tungstenite::Error; use utils::init_logging; use crate::relay_loop::run_relay_loop; use crate::settings::Settings; +use crate::ws::run_websocket; mod driver; mod relay_loop; mod settings; +mod ws; async fn create_this_controller( conn: &mut PoolConnection, @@ -57,34 +55,6 @@ async fn create_this_relay( Ok(relay) } -async fn run_websocket(this: Controller, url: &str) -> Result<(), EmgauwaError> { - match connect_async(url).await { - Ok(connection) => { - let (ws_stream, _) = connection; - - let (mut write, read) = ws_stream.split(); - - let ws_action = ControllerWsAction::Register(this.clone()); - - let ws_action_json = serde_json::to_string(&ws_action)?; - if let Err(err) = write.send(Message::text(ws_action_json)).await { - log::error!("Failed to register at websocket: {}", err); - return Ok(()); - } - - let read_handler = read.for_each(handle_message); - - read_handler.await; - - log::warn!("Lost connection to websocket"); - } - Err(err) => { - log::warn!("Failed to connect to websocket: {}", err,); - } - } - Ok(()) -} - #[tokio::main] async fn main() -> Result<(), std::io::Error> { let settings = settings::init()?; @@ -128,8 +98,6 @@ async fn main() -> Result<(), std::io::Error> { .await .map_err(EmgauwaError::from)?; - let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?; - let url = format!( "ws://{}:{}/api/v1/ws/controllers", settings.core.host, settings.core.port @@ -138,7 +106,14 @@ async fn main() -> Result<(), std::io::Error> { tokio::spawn(run_relay_loop(settings)); loop { - let run_result = run_websocket(this.clone(), &url).await; + let db_controller = db_controller + .reload(&mut conn) + .await + .map_err(EmgauwaError::from)?; + let this = + Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?; + + let run_result = run_websocket(pool.clone(), this.clone(), &url).await; if let Err(err) = run_result { log::error!("Error running websocket: {}", err); } @@ -150,14 +125,3 @@ async fn main() -> Result<(), std::io::Error> { time::sleep(WEBSOCKET_RETRY_TIMEOUT).await; } } - -async fn handle_message(message_result: Result) { - match message_result { - Ok(message) => { - if let Message::Text(msg_text) = message { - log::debug!("{}", msg_text) - } - } - Err(err) => log::debug!("Error: {}", err), - } -} diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs index b9ad749..42b5009 100644 --- a/emgauwa-controller/src/relay_loop.rs +++ b/emgauwa-controller/src/relay_loop.rs @@ -8,7 +8,7 @@ use crate::settings::Settings; #[allow(unused_variables)] pub async fn run_relay_loop(settings: Settings) { - let default_duration = Duration::from_millis(1000); + let default_duration = Duration::from_millis(10000); loop { let next_timestamp = Instant::now() + default_duration; time::sleep_until(next_timestamp).await; diff --git a/emgauwa-controller/src/ws/mod.rs b/emgauwa-controller/src/ws/mod.rs new file mode 100644 index 0000000..f0c5d55 --- /dev/null +++ b/emgauwa-controller/src/ws/mod.rs @@ -0,0 +1,109 @@ +use emgauwa_lib::db::DbController; +use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; +use emgauwa_lib::models::Controller; +use emgauwa_lib::types::ControllerWsAction; +use futures::{SinkExt, StreamExt}; +use sqlx::pool::PoolConnection; +use sqlx::{Pool, Sqlite}; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{connect_async, tungstenite}; + +pub async fn run_websocket( + pool: Pool, + this: Controller, + url: &str, +) -> Result<(), EmgauwaError> { + match connect_async(url).await { + Ok(connection) => { + let (ws_stream, _) = connection; + + let (mut write, read) = ws_stream.split(); + + let ws_action = ControllerWsAction::Register(this.clone()); + + let ws_action_json = serde_json::to_string(&ws_action)?; + if let Err(err) = write.send(Message::text(ws_action_json)).await { + log::error!("Failed to register at websocket: {}", err); + return Ok(()); + } + + let read_handler = read.for_each(|msg| handle_message(pool.clone(), this.clone(), msg)); + + read_handler.await; + + log::warn!("Lost connection to websocket"); + } + Err(err) => { + log::warn!("Failed to connect to websocket: {}", err,); + } + } + Ok(()) +} + +async fn handle_message( + pool: Pool, + this: Controller, + message_result: Result, +) { + let msg = match message_result { + Ok(msg) => msg, + Err(err) => { + log::error!("Error reading message: {}", err); + return; + } + }; + match msg { + Message::Text(text) => match serde_json::from_str(&text) { + Ok(action) => { + log::debug!("Received action: {:?}", action); + let mut pool_conn = match pool.acquire().await { + Ok(conn) => conn, + Err(err) => { + log::error!("Failed to acquire database connection: {:?}", err); + return; + } + }; + let action_res = handle_action(&mut pool_conn, this, action).await; + if let Err(e) = action_res { + log::error!("Error handling action: {:?}", e); + } + } + Err(e) => { + log::error!("Error deserializing action: {:?}", e); + } + }, + _ => (), + } +} + +pub async fn handle_action( + conn: &mut PoolConnection, + this: Controller, + action: ControllerWsAction, +) -> Result<(), EmgauwaError> { + match action { + ControllerWsAction::Controller(controller) => { + handle_controller(conn, this, controller).await + } + _ => Ok(()), + } +} + +pub async fn handle_controller( + conn: &mut PoolConnection, + this: Controller, + controller: Controller, +) -> Result<(), EmgauwaError> { + if controller.c.uid != this.c.uid { + return Err(EmgauwaError::Other(String::from( + "Controller UID mismatch during update", + ))); + } + DbController::get_by_uid(conn, &controller.c.uid) + .await? + .ok_or(DatabaseError::NotFound)? + .update(conn, controller.c.name.as_str(), this.c.relay_count) + .await?; + + Ok(()) +} diff --git a/emgauwa-core/src/app_state.rs b/emgauwa-core/src/app_state.rs index f309420..a30712a 100644 --- a/emgauwa-core/src/app_state.rs +++ b/emgauwa-core/src/app_state.rs @@ -51,7 +51,8 @@ impl Handler for AppServer { fn handle(&mut self, msg: DisconnectController, _ctx: &mut Self::Context) -> Self::Result { let mut pool_conn = block_on(self.pool.acquire())?; - if let Some((controller, _)) = self.connected_controllers.remove(&msg.controller_uid) { + if let Some((controller, address)) = self.connected_controllers.remove(&msg.controller_uid) + { if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) { log::error!( "Failed to mark controller {} as inactive: {:?}", @@ -59,6 +60,7 @@ impl Handler for AppServer { err ); } + block_on(address.send(ControllerWsAction::Disconnect))??; } Ok(()) } @@ -79,6 +81,7 @@ impl Handler for AppServer { type Result = Result<(), EmgauwaError>; fn handle(&mut self, msg: Action, _ctx: &mut Self::Context) -> Self::Result { + log::debug!("Forwarding action: {:?}", msg.action); if let Some((_, address)) = self.connected_controllers.get(&msg.controller_uid) { block_on(address.send(msg.action))? } else { diff --git a/emgauwa-core/src/handlers/v1/controllers.rs b/emgauwa-core/src/handlers/v1/controllers.rs index 8157c19..bfccf4a 100644 --- a/emgauwa-core/src/handlers/v1/controllers.rs +++ b/emgauwa-core/src/handlers/v1/controllers.rs @@ -1,10 +1,14 @@ +use actix::Addr; use actix_web::{delete, get, put, web, HttpResponse}; use emgauwa_lib::db::DbController; use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; use emgauwa_lib::models::{convert_db_list, Controller, FromDbModel}; -use emgauwa_lib::types::{ControllerUid, RequestUpdateController}; +use emgauwa_lib::types::{ControllerUid, ControllerWsAction, RequestUpdateController}; use sqlx::{Pool, Sqlite}; +use crate::app_state; +use crate::app_state::AppServer; + #[get("/api/v1/controllers")] pub async fn index(pool: web::Data>) -> Result { let mut pool_conn = pool.acquire().await?; @@ -37,6 +41,7 @@ pub async fn show( #[put("/api/v1/controllers/{controller_id}")] pub async fn update( pool: web::Data>, + app_server: web::Data>, path: web::Path<(String,)>, data: web::Json, ) -> Result { @@ -54,12 +59,21 @@ pub async fn update( .await?; let return_controller = Controller::from_db_model(&mut pool_conn, controller)?; + + app_server + .send(app_state::Action { + controller_uid: uid.clone(), + action: ControllerWsAction::Controller(return_controller.clone()), + }) + .await??; + Ok(HttpResponse::Ok().json(return_controller)) } #[delete("/api/v1/controllers/{controller_id}")] pub async fn delete( pool: web::Data>, + app_server: web::Data>, path: web::Path<(String,)>, ) -> Result { let mut pool_conn = pool.acquire().await?; @@ -67,6 +81,12 @@ pub async fn delete( let (controller_uid,) = path.into_inner(); let uid = ControllerUid::try_from(controller_uid.as_str())?; + app_server + .send(app_state::DisconnectController { + controller_uid: uid.clone(), + }) + .await??; + DbController::delete_by_uid(&mut pool_conn, uid).await?; Ok(HttpResponse::Ok().json("controller got deleted")) } diff --git a/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs b/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs index 6408727..a2e12de 100644 --- a/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs +++ b/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs @@ -16,7 +16,11 @@ impl ControllerWs { ctx: &mut ::Context, controller: Controller, ) -> Result<(), EmgauwaError> { - log::info!("Registering controller: {:?}", controller); + log::info!( + "Registering controller: {} ({})", + controller.c.name, + controller.c.uid + ); let c = &controller.c; let controller_db = block_on(DbController::get_by_uid_or_create( conn, diff --git a/emgauwa-core/src/handlers/v1/ws/controllers/mod.rs b/emgauwa-core/src/handlers/v1/ws/controllers/mod.rs index 4a4e020..e48030b 100644 --- a/emgauwa-core/src/handlers/v1/ws/controllers/mod.rs +++ b/emgauwa-core/src/handlers/v1/ws/controllers/mod.rs @@ -78,8 +78,16 @@ impl Handler for ControllerWs { type Result = Result<(), EmgauwaError>; fn handle(&mut self, action: ControllerWsAction, ctx: &mut Self::Context) -> Self::Result { - let action_json = serde_json::to_string(&action)?; - ctx.text(action_json); + match action { + ControllerWsAction::Disconnect => { + ctx.close(None); + ctx.stop(); + } + _ => { + let action_json = serde_json::to_string(&action)?; + ctx.text(action_json); + } + } Ok(()) } } diff --git a/emgauwa-lib/src/db/controllers.rs b/emgauwa-lib/src/db/controllers.rs index 1b316d7..e9a0732 100644 --- a/emgauwa-lib/src/db/controllers.rs +++ b/emgauwa-lib/src/db/controllers.rs @@ -172,4 +172,13 @@ impl DbController { .await?; Ok(()) } + + pub async fn reload( + &self, + conn: &mut PoolConnection, + ) -> Result { + Self::get(conn, self.id) + .await? + .ok_or(DatabaseError::NotFound) + } } diff --git a/emgauwa-lib/src/db/mod.rs b/emgauwa-lib/src/db/mod.rs index 0b0650d..eeefd57 100644 --- a/emgauwa-lib/src/db/mod.rs +++ b/emgauwa-lib/src/db/mod.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use sqlx::migrate::Migrator; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use sqlx::{Pool, Sqlite}; +use sqlx::{ConnectOptions, Pool, Sqlite}; mod controllers; mod junction_relay_schedule; @@ -30,7 +30,9 @@ pub async fn run_migrations(pool: &Pool) -> Result<(), EmgauwaError> { } pub async fn init(db: &str) -> Result, EmgauwaError> { - let options = SqliteConnectOptions::from_str(db)?.create_if_missing(true); + let options = SqliteConnectOptions::from_str(db)? + .create_if_missing(true) + .log_statements(log::LevelFilter::Trace); let pool: Pool = SqlitePoolOptions::new() .acquire_timeout(std::time::Duration::from_secs(1)) diff --git a/emgauwa-lib/src/db/relays.rs b/emgauwa-lib/src/db/relays.rs index 6a9bb51..1c9b18f 100644 --- a/emgauwa-lib/src/db/relays.rs +++ b/emgauwa-lib/src/db/relays.rs @@ -71,7 +71,7 @@ impl DbRelay { conn: &mut PoolConnection, tag: &DbTag, ) -> Result, DatabaseError> { - sqlx::query_as!(DbRelay, "SELECT schedule.* FROM relays AS schedule INNER JOIN junction_tag ON junction_tag.schedule_id = schedule.id WHERE junction_tag.tag_id = ?", tag.id) + sqlx::query_as!(DbRelay, "SELECT relay.* FROM relays AS relay INNER JOIN junction_tag ON junction_tag.relay_id = relay.id WHERE junction_tag.tag_id = ?", tag.id) .fetch_all(conn.deref_mut()) .await .map_err(DatabaseError::from) diff --git a/emgauwa-lib/src/types/mod.rs b/emgauwa-lib/src/types/mod.rs index 0f936eb..dcf88d1 100644 --- a/emgauwa-lib/src/types/mod.rs +++ b/emgauwa-lib/src/types/mod.rs @@ -18,6 +18,8 @@ pub type Weekday = i64; #[rtype(result = "Result<(), EmgauwaError>")] pub enum ControllerWsAction { Register(Controller), + Disconnect, Schedules(Vec), Relays(Vec), + Controller(Controller), }