Add basic exchange of relay states

This commit is contained in:
Tobias Reisinger 2024-04-25 01:26:53 +02:00
parent 0460e838bc
commit 8a83602d6a
Signed by: serguzim
GPG key ID: 13AD60C237A28DFE
10 changed files with 152 additions and 41 deletions

View file

@ -1,3 +1,5 @@
DATABASE_URL=sqlite://emgauwa-dev.sqlite
#EMGAUWA_CONTROLLER_LOGGING_LEVEL=INFO #EMGAUWA_CONTROLLER_LOGGING_LEVEL=INFO
#EMGAUWA_CORE_LOGGING_LEVEL=INFO #EMGAUWA_CORE_LOGGING_LEVEL=INFO

BIN
Cargo.lock generated

Binary file not shown.

View file

@ -25,3 +25,4 @@ serde_derive = "1.0"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] } sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
futures = "0.3" futures = "0.3"
futures-channel = "0.3"

View file

@ -11,18 +11,29 @@ use tokio::sync::Notify;
#[rtype(result = "Result<(), EmgauwaError>")] #[rtype(result = "Result<(), EmgauwaError>")]
pub struct Reload {} pub struct Reload {}
#[derive(Message)]
#[rtype(result = "()")]
pub struct UpdateRelaysOn {
pub relays_are_on: Vec<Option<bool>>,
}
#[derive(Message)] #[derive(Message)]
#[rtype(result = "Controller")] #[rtype(result = "Controller")]
pub struct GetThis {} pub struct GetThis {}
#[derive(Message)] #[derive(Message)]
#[rtype(result = "Arc<Notify>")] #[rtype(result = "Arc<Notify>")]
pub struct GetNotifier {} pub struct GetControllerNotifier {}
#[derive(Message)]
#[rtype(result = "Arc<Notify>")]
pub struct GetRelayNotifier {}
pub struct AppState { pub struct AppState {
pub pool: Pool<Sqlite>, pub pool: Pool<Sqlite>,
pub this: Controller, pub this: Controller,
pub notifier: Arc<Notify>, pub controller_notifier: Arc<Notify>,
pub relay_notifier: Arc<Notify>,
} }
impl AppState { impl AppState {
@ -30,12 +41,17 @@ impl AppState {
AppState { AppState {
pool, pool,
this, this,
notifier: Arc::new(Notify::new()), controller_notifier: Arc::new(Notify::new()),
relay_notifier: Arc::new(Notify::new()),
} }
} }
pub fn notify_change(&self) { pub fn notify_controller_change(&self) {
self.notifier.notify_one(); self.controller_notifier.notify_one();
}
pub fn notify_relay_change(&self) {
self.relay_notifier.notify_one();
} }
} }
@ -52,12 +68,28 @@ impl Handler<Reload> for AppState {
self.this.reload(&mut pool_conn)?; self.this.reload(&mut pool_conn)?;
self.notify_change(); self.notify_controller_change();
Ok(()) Ok(())
} }
} }
impl Handler<UpdateRelaysOn> for AppState {
type Result = ();
fn handle(&mut self, msg: UpdateRelaysOn, _ctx: &mut Self::Context) -> Self::Result {
self.this
.relays
.iter_mut()
.zip(msg.relays_are_on.iter())
.for_each(|(relay, is_on)| {
relay.is_on = *is_on;
});
self.notify_relay_change();
}
}
impl Handler<GetThis> for AppState { impl Handler<GetThis> for AppState {
type Result = Controller; type Result = Controller;
@ -66,10 +98,18 @@ impl Handler<GetThis> for AppState {
} }
} }
impl Handler<GetNotifier> for AppState { impl Handler<GetControllerNotifier> for AppState {
type Result = Arc<Notify>; type Result = Arc<Notify>;
fn handle(&mut self, _msg: GetNotifier, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: GetControllerNotifier, _ctx: &mut Self::Context) -> Self::Result {
Arc::clone(&self.notifier) Arc::clone(&self.controller_notifier)
}
}
impl Handler<GetRelayNotifier> for AppState {
type Result = Arc<Notify>;
fn handle(&mut self, _msg: GetRelayNotifier, _ctx: &mut Self::Context) -> Self::Result {
Arc::clone(&self.relay_notifier)
} }
} }

View file

@ -4,10 +4,11 @@ use actix::Addr;
use chrono::Local; use chrono::Local;
use emgauwa_lib::constants::RELAYS_RETRY_TIMEOUT; use emgauwa_lib::constants::RELAYS_RETRY_TIMEOUT;
use emgauwa_lib::errors::EmgauwaError; use emgauwa_lib::errors::EmgauwaError;
use emgauwa_lib::models::Controller;
use futures::pin_mut; use futures::pin_mut;
use tokio::time; use tokio::time;
use tokio::time::timeout; use tokio::time::timeout;
use utils::app_state_get_notifier; use utils::app_state_get_controller_notifier;
use crate::app_state::AppState; use crate::app_state::AppState;
use crate::utils; use crate::utils;
@ -25,10 +26,12 @@ pub async fn run_relays_loop(app_state: Addr<AppState>) {
async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> { async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
let default_duration = Duration::new(10, 0); let default_duration = Duration::new(10, 0);
let notifier = &*app_state_get_notifier(app_state).await?; let notifier = &*app_state_get_controller_notifier(app_state).await?;
let mut last_weekday = emgauwa_lib::utils::get_weekday(); let mut last_weekday = emgauwa_lib::utils::get_weekday();
let mut this = utils::app_state_get_this(app_state).await?; let mut this = utils::app_state_get_this(app_state).await?;
let mut relay_states: Vec<Option<bool>> = Vec::new();
init_relay_states(&mut relay_states, &this);
loop { loop {
let notifier_future = notifier.notified(); let notifier_future = notifier.notified();
@ -51,7 +54,12 @@ async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
let mut relay_debug = String::new(); let mut relay_debug = String::new();
let now = Local::now().time(); let now = Local::now().time();
for relay in this.relays.iter() { this.relays
.iter()
.zip(relay_states.iter_mut())
.for_each(|(relay, state)| {
*state = Some(relay.active_schedule.is_on(&now));
relay_debug.push_str(&format!( relay_debug.push_str(&format!(
"{}{}: {} ; ", "{}{}: {} ; ",
if relay.active_schedule.is_on(&now) { if relay.active_schedule.is_on(&now) {
@ -62,11 +70,19 @@ async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
relay.r.name, relay.r.name,
relay.active_schedule.name relay.active_schedule.name
)); ));
} });
log::debug!( log::debug!(
"Relay loop at {}: {}", "Relay loop at {}: {}",
Local::now().naive_local().time(), Local::now().naive_local().time(),
relay_debug relay_debug
); );
utils::app_state_update_relays_on(app_state, relay_states.clone()).await?;
}
}
fn init_relay_states(relay_states: &mut Vec<Option<bool>>, this: &Controller) {
relay_states.clear();
for _ in 0..this.c.relay_count {
relay_states.push(None);
} }
} }

View file

@ -15,11 +15,20 @@ pub async fn app_state_get_this(app_state: &Addr<AppState>) -> Result<Controller
.map_err(EmgauwaError::from) .map_err(EmgauwaError::from)
} }
pub async fn app_state_get_notifier( pub async fn app_state_get_relay_notifier(
app_state: &Addr<AppState>, app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> { ) -> Result<Arc<Notify>, EmgauwaError> {
app_state app_state
.send(app_state::GetNotifier {}) .send(app_state::GetRelayNotifier {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_get_controller_notifier(
app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> {
app_state
.send(app_state::GetControllerNotifier {})
.await .await
.map_err(EmgauwaError::from) .map_err(EmgauwaError::from)
} }
@ -30,3 +39,15 @@ pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaE
.await .await
.map_err(EmgauwaError::from)? .map_err(EmgauwaError::from)?
} }
pub async fn app_state_update_relays_on(
app_state: &Addr<AppState>,
relay_states: Vec<Option<bool>>,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::UpdateRelaysOn {
relays_are_on: relay_states,
})
.await
.map_err(EmgauwaError::from)
}

View file

@ -4,7 +4,7 @@ use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule
use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; use emgauwa_lib::errors::{DatabaseError, EmgauwaError};
use emgauwa_lib::models::{Controller, Relay}; use emgauwa_lib::models::{Controller, Relay};
use emgauwa_lib::types::{ControllerWsAction, ScheduleUid}; use emgauwa_lib::types::{ControllerWsAction, ScheduleUid};
use futures::{SinkExt, StreamExt}; use futures::{future, pin_mut, SinkExt, StreamExt};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite}; use sqlx::{Pool, Sqlite};
use tokio::time; use tokio::time;
@ -12,8 +12,8 @@ use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
use crate::app_state::AppState; use crate::app_state::AppState;
use crate::utils::app_state_get_this; use crate::utils;
use crate::{app_state, utils}; use crate::utils::{app_state_get_relay_notifier, app_state_get_this};
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) { pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
log::debug!("Spawned ws loop"); log::debug!("Spawned ws loop");
@ -52,9 +52,14 @@ async fn run_websocket(
return Ok(()); return Ok(());
} }
let (app_state_tx, app_state_rx) = futures_channel::mpsc::unbounded::<Message>();
tokio::spawn(read_app_state(app_state.clone(), app_state_tx));
let app_state_to_ws = app_state_rx.map(Ok).forward(write);
let read_handler = read.for_each(|msg| handle_message(pool.clone(), app_state, msg)); let read_handler = read.for_each(|msg| handle_message(pool.clone(), app_state, msg));
read_handler.await; pin_mut!(app_state_to_ws, read_handler);
future::select(app_state_to_ws, read_handler).await;
log::warn!("Lost connection to websocket"); log::warn!("Lost connection to websocket");
} }
@ -65,6 +70,26 @@ async fn run_websocket(
Ok(()) Ok(())
} }
async fn read_app_state(
app_state: Addr<AppState>,
tx: futures_channel::mpsc::UnboundedSender<Message>,
) -> Result<(), EmgauwaError> {
let notifier = &*app_state_get_relay_notifier(&app_state).await?;
loop {
notifier.notified().await;
log::debug!("Relay change detected");
let ws_action = ControllerWsAction::Register(app_state_get_this(&app_state).await?);
let ws_action_json = serde_json::to_string(&ws_action)?;
tx.unbounded_send(Message::text(ws_action_json))
.map_err(|_| {
EmgauwaError::Other(String::from(
"Failed to forward message from app state to websocket",
))
})?;
}
}
async fn handle_message( async fn handle_message(
pool: Pool<Sqlite>, pool: Pool<Sqlite>,
app_state: &Addr<AppState>, app_state: &Addr<AppState>,
@ -77,8 +102,8 @@ async fn handle_message(
return; return;
} }
}; };
if let Message::Text(text) = msg { match msg {
match serde_json::from_str(&text) { Message::Text(text) => match serde_json::from_str(&text) {
Ok(action) => { Ok(action) => {
log::debug!("Received action: {:?}", action); log::debug!("Received action: {:?}", action);
let mut pool_conn = match pool.acquire().await { let mut pool_conn = match pool.acquire().await {
@ -96,7 +121,11 @@ async fn handle_message(
Err(e) => { Err(e) => {
log::error!("Error deserializing action: {:?}", e); log::error!("Error deserializing action: {:?}", e);
} }
},
Message::Ping(_) => {
log::debug!("Received ping");
} }
_ => {}
} }
} }

View file

@ -31,7 +31,15 @@ impl ControllerWs {
block_on(controller_db.update_active(conn, true))?; block_on(controller_db.update_active(conn, true))?;
for relay in &controller.relays { for relay in &controller.relays {
log::debug!("Registering relay: {}", relay.r.name); log::debug!(
"Registering relay: {} ({})",
relay.r.name,
match relay.is_on {
Some(true) => "+",
Some(false) => "-",
None => "?",
}
);
let (new_relay, created) = block_on(DbRelay::get_by_controller_and_num_or_create( let (new_relay, created) = block_on(DbRelay::get_by_controller_and_num_or_create(
conn, conn,
&controller_db, &controller_db,

View file

@ -1,6 +1,5 @@
use std::ops::DerefMut; use std::ops::DerefMut;
use futures::executor::block_on;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;

View file

@ -1,4 +1,4 @@
use chrono::{Local, NaiveTime}; use chrono::NaiveTime;
use futures::executor::block_on; use futures::executor::block_on;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
@ -17,7 +17,7 @@ pub struct Relay {
pub controller_id: ControllerUid, pub controller_id: ControllerUid,
pub schedules: Vec<DbSchedule>, pub schedules: Vec<DbSchedule>,
pub active_schedule: DbSchedule, pub active_schedule: DbSchedule,
pub is_on: bool, pub is_on: Option<bool>,
pub tags: Vec<String>, pub tags: Vec<String>,
} }
@ -45,8 +45,7 @@ impl FromDbModel for Relay {
let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?; let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?;
let active_schedule = block_on(db_model.get_active_schedule(conn))?; let active_schedule = block_on(db_model.get_active_schedule(conn))?;
let now = Local::now().time(); let is_on = None;
let is_on = active_schedule.is_on(&now);
Ok(Relay { Ok(Relay {
r: db_model, r: db_model,
@ -74,10 +73,6 @@ impl Relay {
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
self.active_schedule = block_on(self.r.get_active_schedule(conn))?; self.active_schedule = block_on(self.r.get_active_schedule(conn))?;
let now = Local::now().time();
self.is_on = self.active_schedule.is_on(&now);
Ok(()) Ok(())
} }