From 8a83602d6a5e439ebe4d43a9fa32b58a7c6297d3 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Thu, 25 Apr 2024 01:26:53 +0200 Subject: [PATCH] Add basic exchange of relay states --- .env.example | 2 + Cargo.lock | Bin 68651 -> 68671 bytes emgauwa-controller/Cargo.toml | 1 + emgauwa-controller/src/app_state.rs | 58 +++++++++++++++--- emgauwa-controller/src/relay_loop.rs | 44 ++++++++----- emgauwa-controller/src/utils.rs | 25 +++++++- emgauwa-controller/src/ws/mod.rs | 41 +++++++++++-- .../handlers/v1/ws/controllers/handlers.rs | 10 ++- emgauwa-lib/src/db/relays.rs | 1 - emgauwa-lib/src/models/relay.rs | 11 +--- 10 files changed, 152 insertions(+), 41 deletions(-) diff --git a/.env.example b/.env.example index 91c7b95..d3e5feb 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,5 @@ +DATABASE_URL=sqlite://emgauwa-dev.sqlite + #EMGAUWA_CONTROLLER_LOGGING_LEVEL=INFO #EMGAUWA_CORE_LOGGING_LEVEL=INFO \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 013c284bbebba8d895c056adf247ff5ddd15fb14..eb44971ac783707b7177b719895df616fe94a467 100644 GIT binary patch delta 30 ocmV+(0O9|um;}F=1hAb!la|30lk`JPlMX`)lQ04Xv-&|X%F;Ou=l}o! delta 14 Wcmdl#gJty$mJM?qH*a+8I|TqV5(h2- diff --git a/emgauwa-controller/Cargo.toml b/emgauwa-controller/Cargo.toml index 4feb00a..5a95670 100644 --- a/emgauwa-controller/Cargo.toml +++ b/emgauwa-controller/Cargo.toml @@ -25,3 +25,4 @@ serde_derive = "1.0" sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] } futures = "0.3" +futures-channel = "0.3" diff --git a/emgauwa-controller/src/app_state.rs b/emgauwa-controller/src/app_state.rs index 75811ba..e67eabd 100644 --- a/emgauwa-controller/src/app_state.rs +++ b/emgauwa-controller/src/app_state.rs @@ -11,18 +11,29 @@ use tokio::sync::Notify; #[rtype(result = "Result<(), EmgauwaError>")] pub struct Reload {} +#[derive(Message)] +#[rtype(result = "()")] +pub struct UpdateRelaysOn { + pub relays_are_on: Vec>, +} + #[derive(Message)] #[rtype(result = "Controller")] pub struct GetThis {} #[derive(Message)] #[rtype(result = "Arc")] -pub struct GetNotifier {} +pub struct GetControllerNotifier {} + +#[derive(Message)] +#[rtype(result = "Arc")] +pub struct GetRelayNotifier {} pub struct AppState { pub pool: Pool, pub this: Controller, - pub notifier: Arc, + pub controller_notifier: Arc, + pub relay_notifier: Arc, } impl AppState { @@ -30,12 +41,17 @@ impl AppState { AppState { pool, this, - notifier: Arc::new(Notify::new()), + controller_notifier: Arc::new(Notify::new()), + relay_notifier: Arc::new(Notify::new()), } } - pub fn notify_change(&self) { - self.notifier.notify_one(); + pub fn notify_controller_change(&self) { + self.controller_notifier.notify_one(); + } + + pub fn notify_relay_change(&self) { + self.relay_notifier.notify_one(); } } @@ -52,12 +68,28 @@ impl Handler for AppState { self.this.reload(&mut pool_conn)?; - self.notify_change(); + self.notify_controller_change(); Ok(()) } } +impl Handler for AppState { + type Result = (); + + fn handle(&mut self, msg: UpdateRelaysOn, _ctx: &mut Self::Context) -> Self::Result { + self.this + .relays + .iter_mut() + .zip(msg.relays_are_on.iter()) + .for_each(|(relay, is_on)| { + relay.is_on = *is_on; + }); + + self.notify_relay_change(); + } +} + impl Handler for AppState { type Result = Controller; @@ -66,10 +98,18 @@ impl Handler for AppState { } } -impl Handler for AppState { +impl Handler for AppState { type Result = Arc; - fn handle(&mut self, _msg: GetNotifier, _ctx: &mut Self::Context) -> Self::Result { - Arc::clone(&self.notifier) + fn handle(&mut self, _msg: GetControllerNotifier, _ctx: &mut Self::Context) -> Self::Result { + Arc::clone(&self.controller_notifier) + } +} + +impl Handler for AppState { + type Result = Arc; + + fn handle(&mut self, _msg: GetRelayNotifier, _ctx: &mut Self::Context) -> Self::Result { + Arc::clone(&self.relay_notifier) } } diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs index 25000fb..38fbbb2 100644 --- a/emgauwa-controller/src/relay_loop.rs +++ b/emgauwa-controller/src/relay_loop.rs @@ -4,10 +4,11 @@ use actix::Addr; use chrono::Local; use emgauwa_lib::constants::RELAYS_RETRY_TIMEOUT; use emgauwa_lib::errors::EmgauwaError; +use emgauwa_lib::models::Controller; use futures::pin_mut; use tokio::time; use tokio::time::timeout; -use utils::app_state_get_notifier; +use utils::app_state_get_controller_notifier; use crate::app_state::AppState; use crate::utils; @@ -25,10 +26,12 @@ pub async fn run_relays_loop(app_state: Addr) { async fn run_relays(app_state: &Addr) -> Result<(), EmgauwaError> { let default_duration = Duration::new(10, 0); - let notifier = &*app_state_get_notifier(app_state).await?; + let notifier = &*app_state_get_controller_notifier(app_state).await?; let mut last_weekday = emgauwa_lib::utils::get_weekday(); let mut this = utils::app_state_get_this(app_state).await?; + let mut relay_states: Vec> = Vec::new(); + init_relay_states(&mut relay_states, &this); loop { let notifier_future = notifier.notified(); @@ -51,22 +54,35 @@ async fn run_relays(app_state: &Addr) -> Result<(), EmgauwaError> { let mut relay_debug = String::new(); let now = Local::now().time(); - for relay in this.relays.iter() { - relay_debug.push_str(&format!( - "{}{}: {} ; ", - if relay.active_schedule.is_on(&now) { - "+" - } else { - "-" - }, - relay.r.name, - relay.active_schedule.name - )); - } + this.relays + .iter() + .zip(relay_states.iter_mut()) + .for_each(|(relay, state)| { + *state = Some(relay.active_schedule.is_on(&now)); + + relay_debug.push_str(&format!( + "{}{}: {} ; ", + if relay.active_schedule.is_on(&now) { + "+" + } else { + "-" + }, + relay.r.name, + relay.active_schedule.name + )); + }); log::debug!( "Relay loop at {}: {}", Local::now().naive_local().time(), relay_debug ); + utils::app_state_update_relays_on(app_state, relay_states.clone()).await?; + } +} + +fn init_relay_states(relay_states: &mut Vec>, this: &Controller) { + relay_states.clear(); + for _ in 0..this.c.relay_count { + relay_states.push(None); } } diff --git a/emgauwa-controller/src/utils.rs b/emgauwa-controller/src/utils.rs index 7be9ae6..1404f72 100644 --- a/emgauwa-controller/src/utils.rs +++ b/emgauwa-controller/src/utils.rs @@ -15,11 +15,20 @@ pub async fn app_state_get_this(app_state: &Addr) -> Result, ) -> Result, EmgauwaError> { app_state - .send(app_state::GetNotifier {}) + .send(app_state::GetRelayNotifier {}) + .await + .map_err(EmgauwaError::from) +} + +pub async fn app_state_get_controller_notifier( + app_state: &Addr, +) -> Result, EmgauwaError> { + app_state + .send(app_state::GetControllerNotifier {}) .await .map_err(EmgauwaError::from) } @@ -30,3 +39,15 @@ pub async fn app_state_reload(app_state: &Addr) -> Result<(), EmgauwaE .await .map_err(EmgauwaError::from)? } + +pub async fn app_state_update_relays_on( + app_state: &Addr, + relay_states: Vec>, +) -> Result<(), EmgauwaError> { + app_state + .send(app_state::UpdateRelaysOn { + relays_are_on: relay_states, + }) + .await + .map_err(EmgauwaError::from) +} diff --git a/emgauwa-controller/src/ws/mod.rs b/emgauwa-controller/src/ws/mod.rs index 0d80391..1799f63 100644 --- a/emgauwa-controller/src/ws/mod.rs +++ b/emgauwa-controller/src/ws/mod.rs @@ -4,7 +4,7 @@ use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; use emgauwa_lib::models::{Controller, Relay}; use emgauwa_lib::types::{ControllerWsAction, ScheduleUid}; -use futures::{SinkExt, StreamExt}; +use futures::{future, pin_mut, SinkExt, StreamExt}; use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use tokio::time; @@ -12,8 +12,8 @@ use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, tungstenite}; use crate::app_state::AppState; -use crate::utils::app_state_get_this; -use crate::{app_state, utils}; +use crate::utils; +use crate::utils::{app_state_get_relay_notifier, app_state_get_this}; pub async fn run_ws_loop(pool: Pool, app_state: Addr, url: String) { log::debug!("Spawned ws loop"); @@ -52,9 +52,14 @@ async fn run_websocket( return Ok(()); } + let (app_state_tx, app_state_rx) = futures_channel::mpsc::unbounded::(); + tokio::spawn(read_app_state(app_state.clone(), app_state_tx)); + let app_state_to_ws = app_state_rx.map(Ok).forward(write); + let read_handler = read.for_each(|msg| handle_message(pool.clone(), app_state, msg)); - read_handler.await; + pin_mut!(app_state_to_ws, read_handler); + future::select(app_state_to_ws, read_handler).await; log::warn!("Lost connection to websocket"); } @@ -65,6 +70,26 @@ async fn run_websocket( Ok(()) } +async fn read_app_state( + app_state: Addr, + tx: futures_channel::mpsc::UnboundedSender, +) -> Result<(), EmgauwaError> { + let notifier = &*app_state_get_relay_notifier(&app_state).await?; + loop { + notifier.notified().await; + log::debug!("Relay change detected"); + let ws_action = ControllerWsAction::Register(app_state_get_this(&app_state).await?); + + let ws_action_json = serde_json::to_string(&ws_action)?; + tx.unbounded_send(Message::text(ws_action_json)) + .map_err(|_| { + EmgauwaError::Other(String::from( + "Failed to forward message from app state to websocket", + )) + })?; + } +} + async fn handle_message( pool: Pool, app_state: &Addr, @@ -77,8 +102,8 @@ async fn handle_message( return; } }; - if let Message::Text(text) = msg { - match serde_json::from_str(&text) { + match msg { + Message::Text(text) => match serde_json::from_str(&text) { Ok(action) => { log::debug!("Received action: {:?}", action); let mut pool_conn = match pool.acquire().await { @@ -96,7 +121,11 @@ async fn handle_message( Err(e) => { log::error!("Error deserializing action: {:?}", e); } + }, + Message::Ping(_) => { + log::debug!("Received ping"); } + _ => {} } } diff --git a/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs b/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs index 3d67920..5f7aa37 100644 --- a/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs +++ b/emgauwa-core/src/handlers/v1/ws/controllers/handlers.rs @@ -31,7 +31,15 @@ impl ControllerWs { block_on(controller_db.update_active(conn, true))?; for relay in &controller.relays { - log::debug!("Registering relay: {}", relay.r.name); + log::debug!( + "Registering relay: {} ({})", + relay.r.name, + match relay.is_on { + Some(true) => "+", + Some(false) => "-", + None => "?", + } + ); let (new_relay, created) = block_on(DbRelay::get_by_controller_and_num_or_create( conn, &controller_db, diff --git a/emgauwa-lib/src/db/relays.rs b/emgauwa-lib/src/db/relays.rs index 8b91d0e..91d3759 100644 --- a/emgauwa-lib/src/db/relays.rs +++ b/emgauwa-lib/src/db/relays.rs @@ -1,6 +1,5 @@ use std::ops::DerefMut; -use futures::executor::block_on; use serde_derive::{Deserialize, Serialize}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; diff --git a/emgauwa-lib/src/models/relay.rs b/emgauwa-lib/src/models/relay.rs index 69bb099..02a03ea 100644 --- a/emgauwa-lib/src/models/relay.rs +++ b/emgauwa-lib/src/models/relay.rs @@ -1,4 +1,4 @@ -use chrono::{Local, NaiveTime}; +use chrono::NaiveTime; use futures::executor::block_on; use serde_derive::{Deserialize, Serialize}; use sqlx::pool::PoolConnection; @@ -17,7 +17,7 @@ pub struct Relay { pub controller_id: ControllerUid, pub schedules: Vec, pub active_schedule: DbSchedule, - pub is_on: bool, + pub is_on: Option, pub tags: Vec, } @@ -45,8 +45,7 @@ impl FromDbModel for Relay { let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?; let active_schedule = block_on(db_model.get_active_schedule(conn))?; - let now = Local::now().time(); - let is_on = active_schedule.is_on(&now); + let is_on = None; Ok(Relay { r: db_model, @@ -74,10 +73,6 @@ impl Relay { conn: &mut PoolConnection, ) -> Result<(), DatabaseError> { self.active_schedule = block_on(self.r.get_active_schedule(conn))?; - - let now = Local::now().time(); - self.is_on = self.active_schedule.is_on(&now); - Ok(()) }