From 83c1f033d5a98838caeb701aa648c1862b017190 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Thu, 7 Dec 2023 01:32:20 +0100 Subject: [PATCH] Add AppState to Controller and split up models --- Cargo.lock | Bin 65721 -> 65731 bytes Makefile | 6 +- emgauwa-controller/Cargo.toml | 2 + emgauwa-controller/src/app_state.rs | 48 ++++++++++ emgauwa-controller/src/main.rs | 22 ++--- emgauwa-controller/src/utils.rs | 9 ++ emgauwa-controller/src/ws/mod.rs | 31 ++++--- emgauwa-lib/src/models/controller.rs | 51 +++++++++++ emgauwa-lib/src/models/mod.rs | 130 ++------------------------- emgauwa-lib/src/models/relay.rs | 70 +++++++++++++++ emgauwa-lib/src/models/schedule.rs | 41 +++++++++ 11 files changed, 260 insertions(+), 150 deletions(-) create mode 100644 emgauwa-controller/src/app_state.rs create mode 100644 emgauwa-controller/src/utils.rs create mode 100644 emgauwa-lib/src/models/controller.rs create mode 100644 emgauwa-lib/src/models/relay.rs create mode 100644 emgauwa-lib/src/models/schedule.rs diff --git a/Cargo.lock b/Cargo.lock index 7e276a27b35fed72b4823994adf660c6516e98f9..09b3924a1d10bcbb910ea59ff115a1c15c7c083c 100644 GIT binary patch delta 22 ecmdnl$a1)mWkb5-WFIH4$sJA_oBJKL_5%QELJ0c+ delta 18 acmX@y$g;DMWkb5-"] [dependencies] emgauwa-lib = { path = "../emgauwa-lib" } +actix = "0.13" + tokio = { version = "1.34", features = ["io-std", "macros", "rt-multi-thread"] } tokio-tungstenite = "0.20" diff --git a/emgauwa-controller/src/app_state.rs b/emgauwa-controller/src/app_state.rs new file mode 100644 index 0000000..ff4f90f --- /dev/null +++ b/emgauwa-controller/src/app_state.rs @@ -0,0 +1,48 @@ +use actix::{Actor, Context, Handler, Message}; +use emgauwa_lib::errors::EmgauwaError; +use emgauwa_lib::models::Controller; +use futures::executor::block_on; +use sqlx::{Pool, Sqlite}; + +#[derive(Message)] +#[rtype(result = "Result<(), EmgauwaError>")] +pub struct Reload {} + +#[derive(Message)] +#[rtype(result = "Controller")] +pub struct GetThis {} + +pub struct AppState { + pub pool: Pool, + pub this: Controller, +} + +impl AppState { + pub fn new(pool: Pool, this: Controller) -> AppState { + AppState { pool, this } + } +} + +impl Actor for AppState { + type Context = Context; +} + +impl Handler for AppState { + type Result = Result<(), EmgauwaError>; + + fn handle(&mut self, _msg: Reload, _ctx: &mut Self::Context) -> Self::Result { + let mut pool_conn = block_on(self.pool.acquire())?; + + self.this.reload(&mut pool_conn)?; + + Ok(()) + } +} + +impl Handler for AppState { + type Result = Controller; + + fn handle(&mut self, _msg: GetThis, _ctx: &mut Self::Context) -> Self::Result { + self.this.clone() + } +} diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index 76453f1..9d820e6 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -1,21 +1,24 @@ +use actix::Actor; use emgauwa_lib::constants::WEBSOCKET_RETRY_TIMEOUT; +use emgauwa_lib::db; use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule}; use emgauwa_lib::errors::EmgauwaError; use emgauwa_lib::models::{Controller, FromDbModel}; use emgauwa_lib::types::ControllerUid; -use emgauwa_lib::{db, utils}; +use emgauwa_lib::utils::init_logging; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use tokio::time; -use utils::init_logging; use crate::relay_loop::run_relay_loop; use crate::settings::Settings; use crate::ws::run_websocket; +mod app_state; mod driver; mod relay_loop; mod settings; +mod utils; mod ws; async fn create_this_controller( @@ -55,7 +58,7 @@ async fn create_this_relay( Ok(relay) } -#[tokio::main] +#[actix::main] async fn main() -> Result<(), std::io::Error> { let settings = settings::init()?; init_logging(&settings.logging.level)?; @@ -98,6 +101,10 @@ async fn main() -> Result<(), std::io::Error> { .await .map_err(EmgauwaError::from)?; + let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?; + + let app_state = app_state::AppState::new(pool.clone(), this).start(); + let url = format!( "ws://{}:{}/api/v1/ws/controllers", settings.core.host, settings.core.port @@ -106,14 +113,7 @@ async fn main() -> Result<(), std::io::Error> { tokio::spawn(run_relay_loop(settings)); loop { - let db_controller = db_controller - .reload(&mut conn) - .await - .map_err(EmgauwaError::from)?; - let this = - Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?; - - let run_result = run_websocket(pool.clone(), this.clone(), &url).await; + let run_result = run_websocket(pool.clone(), &app_state, &url).await; if let Err(err) = run_result { log::error!("Error running websocket: {}", err); } diff --git a/emgauwa-controller/src/utils.rs b/emgauwa-controller/src/utils.rs new file mode 100644 index 0000000..df17faa --- /dev/null +++ b/emgauwa-controller/src/utils.rs @@ -0,0 +1,9 @@ +use actix::Addr; +use emgauwa_lib::errors::EmgauwaError; +use emgauwa_lib::models::Controller; + +use crate::app_state::{AppState, GetThis}; + +pub async fn get_this(app_state: &Addr) -> Result { + app_state.send(GetThis {}).await.map_err(EmgauwaError::from) +} diff --git a/emgauwa-controller/src/ws/mod.rs b/emgauwa-controller/src/ws/mod.rs index f0c5d55..0c67fab 100644 --- a/emgauwa-controller/src/ws/mod.rs +++ b/emgauwa-controller/src/ws/mod.rs @@ -1,3 +1,4 @@ +use actix::Addr; use emgauwa_lib::db::DbController; use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; use emgauwa_lib::models::Controller; @@ -8,9 +9,13 @@ use sqlx::{Pool, Sqlite}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, tungstenite}; +use crate::app_state; +use crate::app_state::AppState; +use crate::utils::get_this; + pub async fn run_websocket( pool: Pool, - this: Controller, + app_state: &Addr, url: &str, ) -> Result<(), EmgauwaError> { match connect_async(url).await { @@ -19,7 +24,7 @@ pub async fn run_websocket( let (mut write, read) = ws_stream.split(); - let ws_action = ControllerWsAction::Register(this.clone()); + let ws_action = ControllerWsAction::Register(get_this(app_state).await?); let ws_action_json = serde_json::to_string(&ws_action)?; if let Err(err) = write.send(Message::text(ws_action_json)).await { @@ -27,7 +32,7 @@ pub async fn run_websocket( return Ok(()); } - let read_handler = read.for_each(|msg| handle_message(pool.clone(), this.clone(), msg)); + let read_handler = read.for_each(|msg| handle_message(pool.clone(), app_state, msg)); read_handler.await; @@ -42,7 +47,7 @@ pub async fn run_websocket( async fn handle_message( pool: Pool, - this: Controller, + app_state: &Addr, message_result: Result, ) { let msg = match message_result { @@ -52,8 +57,8 @@ async fn handle_message( return; } }; - match msg { - Message::Text(text) => match serde_json::from_str(&text) { + if let Message::Text(text) = msg { + match serde_json::from_str(&text) { Ok(action) => { log::debug!("Received action: {:?}", action); let mut pool_conn = match pool.acquire().await { @@ -63,7 +68,7 @@ async fn handle_message( return; } }; - let action_res = handle_action(&mut pool_conn, this, action).await; + let action_res = handle_action(&mut pool_conn, app_state, action).await; if let Err(e) = action_res { log::error!("Error handling action: {:?}", e); } @@ -71,19 +76,18 @@ async fn handle_message( Err(e) => { log::error!("Error deserializing action: {:?}", e); } - }, - _ => (), + } } } pub async fn handle_action( conn: &mut PoolConnection, - this: Controller, + app_state: &Addr, action: ControllerWsAction, ) -> Result<(), EmgauwaError> { match action { ControllerWsAction::Controller(controller) => { - handle_controller(conn, this, controller).await + handle_controller(conn, app_state, controller).await } _ => Ok(()), } @@ -91,9 +95,10 @@ pub async fn handle_action( pub async fn handle_controller( conn: &mut PoolConnection, - this: Controller, + app_state: &Addr, controller: Controller, ) -> Result<(), EmgauwaError> { + let this = get_this(app_state).await?; if controller.c.uid != this.c.uid { return Err(EmgauwaError::Other(String::from( "Controller UID mismatch during update", @@ -105,5 +110,7 @@ pub async fn handle_controller( .update(conn, controller.c.name.as_str(), this.c.relay_count) .await?; + app_state.send(app_state::Reload {}).await??; + Ok(()) } diff --git a/emgauwa-lib/src/models/controller.rs b/emgauwa-lib/src/models/controller.rs new file mode 100644 index 0000000..36148ee --- /dev/null +++ b/emgauwa-lib/src/models/controller.rs @@ -0,0 +1,51 @@ +use actix::MessageResponse; +use futures::executor::block_on; +use serde_derive::{Deserialize, Serialize}; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; + +use crate::db::DbController; +use crate::errors::{DatabaseError, EmgauwaError}; +use crate::models::{convert_db_list_cache, FromDbModel, Relay}; + +#[derive(Serialize, Deserialize, Debug, Clone, MessageResponse)] +pub struct Controller { + #[serde(flatten)] + pub c: DbController, + pub relays: Vec, +} + +impl FromDbModel for Controller { + type DbModel = DbController; + type DbModelCache = Vec; + + fn from_db_model( + conn: &mut PoolConnection, + db_model: Self::DbModel, + ) -> Result { + let relays_db = block_on(db_model.get_relays(conn))?; + let cache = convert_db_list_cache(conn, relays_db, db_model.clone())?; + Self::from_db_model_cache(conn, db_model, cache) + } + + fn from_db_model_cache( + _conn: &mut PoolConnection, + db_model: Self::DbModel, + cache: Self::DbModelCache, + ) -> Result { + Ok(Controller { + c: db_model, + relays: cache, + }) + } +} + +impl Controller { + pub fn reload(&mut self, conn: &mut PoolConnection) -> Result<(), EmgauwaError> { + self.c = block_on(self.c.reload(conn))?; + for relay in &mut self.relays { + relay.reload(conn)?; + } + Ok(()) + } +} diff --git a/emgauwa-lib/src/models/mod.rs b/emgauwa-lib/src/models/mod.rs index 031e44f..7333c28 100644 --- a/emgauwa-lib/src/models/mod.rs +++ b/emgauwa-lib/src/models/mod.rs @@ -1,12 +1,14 @@ -use futures::executor; -use serde_derive::{Deserialize, Serialize}; +mod controller; +mod relay; +mod schedule; + +pub use controller::Controller; +pub use relay::Relay; +pub use schedule::Schedule; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use crate::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule}; use crate::errors::DatabaseError; -use crate::types::{ControllerUid, Weekday}; -use crate::utils; pub trait FromDbModel { type DbModel: Clone; @@ -28,124 +30,6 @@ pub trait FromDbModel { Self: Sized; } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Schedule { - #[serde(flatten)] - pub s: DbSchedule, - pub tags: Vec, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Relay { - #[serde(flatten)] - pub r: DbRelay, - pub controller: DbController, - pub controller_id: ControllerUid, - pub schedules: Vec, - pub active_schedule: DbSchedule, - pub tags: Vec, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Controller { - #[serde(flatten)] - pub c: DbController, - pub relays: Vec, -} - -impl FromDbModel for Schedule { - type DbModel = DbSchedule; - type DbModelCache = Vec; - - fn from_db_model( - conn: &mut PoolConnection, - db_model: Self::DbModel, - ) -> Result { - let cache = executor::block_on(db_model.get_tags(conn))?; - Self::from_db_model_cache(conn, db_model, cache) - } - - fn from_db_model_cache( - _conn: &mut PoolConnection, - db_model: Self::DbModel, - cache: Self::DbModelCache, - ) -> Result { - let schedule = db_model.clone(); - - Ok(Schedule { - s: schedule, - tags: cache, - }) - } -} - -impl FromDbModel for Relay { - type DbModel = DbRelay; - type DbModelCache = DbController; - - fn from_db_model( - conn: &mut PoolConnection, - db_model: Self::DbModel, - ) -> Result { - let cache = executor::block_on(db_model.get_controller(conn))?; - Self::from_db_model_cache(conn, db_model, cache) - } - - fn from_db_model_cache( - conn: &mut PoolConnection, - db_model: Self::DbModel, - cache: Self::DbModelCache, - ) -> Result { - let tags = executor::block_on(db_model.get_tags(conn))?; - let controller_id = cache.uid.clone(); - - let schedules = - executor::block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?; - - let weekday = utils::get_weekday(); - let active_schedule = executor::block_on(DbJunctionRelaySchedule::get_schedule( - conn, - &db_model, - weekday as Weekday, - ))? - .ok_or(DatabaseError::NotFound)?; - - Ok(Relay { - r: db_model, - controller: cache, - controller_id, - schedules, - active_schedule, - tags, - }) - } -} - -impl FromDbModel for Controller { - type DbModel = DbController; - type DbModelCache = Vec; - - fn from_db_model( - conn: &mut PoolConnection, - db_model: Self::DbModel, - ) -> Result { - let relays_db = executor::block_on(db_model.get_relays(conn))?; - let cache = convert_db_list_cache(conn, relays_db, db_model.clone())?; - Self::from_db_model_cache(conn, db_model, cache) - } - - fn from_db_model_cache( - _conn: &mut PoolConnection, - db_model: Self::DbModel, - cache: Self::DbModelCache, - ) -> Result { - Ok(Controller { - c: db_model, - relays: cache, - }) - } -} - fn convert_db_list_generic( conn: &mut PoolConnection, db_models: Vec, diff --git a/emgauwa-lib/src/models/relay.rs b/emgauwa-lib/src/models/relay.rs new file mode 100644 index 0000000..979d887 --- /dev/null +++ b/emgauwa-lib/src/models/relay.rs @@ -0,0 +1,70 @@ +use futures::executor::block_on; +use serde_derive::{Deserialize, Serialize}; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; + +use crate::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule}; +use crate::errors::DatabaseError; +use crate::models::FromDbModel; +use crate::types::{ControllerUid, Weekday}; +use crate::utils; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Relay { + #[serde(flatten)] + pub r: DbRelay, + pub controller: DbController, + pub controller_id: ControllerUid, + pub schedules: Vec, + pub active_schedule: DbSchedule, + pub tags: Vec, +} + + +impl FromDbModel for Relay { + type DbModel = DbRelay; + type DbModelCache = DbController; + + fn from_db_model( + conn: &mut PoolConnection, + db_model: Self::DbModel, + ) -> Result { + let cache = block_on(db_model.get_controller(conn))?; + Self::from_db_model_cache(conn, db_model, cache) + } + + fn from_db_model_cache( + conn: &mut PoolConnection, + db_model: Self::DbModel, + cache: Self::DbModelCache, + ) -> Result { + let tags = block_on(db_model.get_tags(conn))?; + let controller_id = cache.uid.clone(); + + let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?; + + let weekday = utils::get_weekday(); + let active_schedule = block_on(DbJunctionRelaySchedule::get_schedule( + conn, + &db_model, + weekday as Weekday, + ))? + .ok_or(DatabaseError::NotFound)?; + + Ok(Relay { + r: db_model, + controller: cache, + controller_id, + schedules, + active_schedule, + tags, + }) + } +} + +impl Relay { + pub fn reload(&mut self, conn: &mut PoolConnection) -> Result<(), DatabaseError> { + self.r = block_on(self.r.reload(conn))?; + Ok(()) + } +} diff --git a/emgauwa-lib/src/models/schedule.rs b/emgauwa-lib/src/models/schedule.rs new file mode 100644 index 0000000..9bf07b4 --- /dev/null +++ b/emgauwa-lib/src/models/schedule.rs @@ -0,0 +1,41 @@ +use futures::executor::block_on; +use serde_derive::{Deserialize, Serialize}; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; + +use crate::db::DbSchedule; +use crate::errors::DatabaseError; +use crate::models::FromDbModel; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Schedule { + #[serde(flatten)] + pub s: DbSchedule, + pub tags: Vec, +} + +impl FromDbModel for Schedule { + type DbModel = DbSchedule; + type DbModelCache = Vec; + + fn from_db_model( + conn: &mut PoolConnection, + db_model: Self::DbModel, + ) -> Result { + let cache = block_on(db_model.get_tags(conn))?; + Self::from_db_model_cache(conn, db_model, cache) + } + + fn from_db_model_cache( + _conn: &mut PoolConnection, + db_model: Self::DbModel, + cache: Self::DbModelCache, + ) -> Result { + let schedule = db_model.clone(); + + Ok(Schedule { + s: schedule, + tags: cache, + }) + } +}