Compare commits

..

No commits in common. "5ee542b44b3c7f1d1d59a4dfe7cb0679d44e0f3f" and "6df724f2db8e2a33074132b8e4e1013c25836e9f" have entirely different histories.

12 changed files with 316 additions and 355 deletions

BIN
Cargo.lock generated

Binary file not shown.

View file

@ -1,4 +1,4 @@
amends "package://emgauwa.app/pkl/emgauwa@0.2.1#/controller.pkl"
amends "package://emgauwa.app/pkl/emgauwa@0.1.1#/controller.pkl"
logging {
level = "DEBUG"

View file

@ -1,12 +1,11 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{Actor, Addr, Context, Handler, Message};
use actix::{Actor, Context, Handler, Message};
use emgauwa_common::constants;
use emgauwa_common::db::DbSchedule;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::{EmgauwaNow, RelayStates};
use emgauwa_common::types::RelayStates;
use futures::executor::block_on;
use sqlx::{Pool, Sqlite};
use tokio::sync::Notify;
@ -31,13 +30,6 @@ pub struct RelayPulse {
pub duration: Option<u32>,
}
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct RelayOverrideSchedule {
pub relay_number: i64,
pub schedule: Option<DbSchedule>,
}
#[derive(Message)]
#[rtype(result = "Controller")]
pub struct GetThis {}
@ -83,20 +75,6 @@ impl AppState {
pub fn notify_relay_change(&self) {
self.relay_notifier.notify_one();
}
pub async fn trigger_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
app_state
.send(Reload {})
.await
.map_err(EmgauwaError::from)?
}
pub async fn get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
app_state
.send(GetThis {})
.await
.map_err(EmgauwaError::from)
}
}
impl Actor for AppState {
@ -127,7 +105,7 @@ impl Handler<UpdateRelayStates> for AppState {
.iter_mut()
.zip(msg.relay_states.iter())
.for_each(|(driver, state)| {
if let Err(e) = driver.set(state.is_on.unwrap_or(false)) {
if let Err(e) = driver.set(state.unwrap_or(false)) {
log::error!("Error setting relay: {}", e);
}
});
@ -170,35 +148,6 @@ impl Handler<RelayPulse> for AppState {
}
}
impl Handler<RelayOverrideSchedule> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, msg: RelayOverrideSchedule, _ctx: &mut Self::Context) -> Self::Result {
let relay_num = msg.relay_number;
let schedule = msg.schedule;
let weekday = EmgauwaNow::now(&self.settings.midnight).weekday;
let relay = self
.this
.relays
.iter_mut()
.find(|r| r.r.number == relay_num)
.ok_or(EmgauwaError::Other(String::from("Relay not found")))?;
log::debug!(
"Overriding schedule for relay {} to '{}' on day {}",
relay_num,
schedule.as_ref().map_or("NONE", |s| &s.name),
weekday
);
relay.override_schedule = schedule;
relay.override_schedule_weekday = weekday;
Ok(())
}
}
impl Handler<GetThis> for AppState {
type Result = Controller;

View file

@ -2,7 +2,6 @@ pub use gpio::GpioDriver;
pub use null::NullDriver;
pub use piface::PiFaceDriver;
use serde::{Deserialize, Deserializer};
use crate::errors::EmgauwaControllerError;
mod gpio;

View file

@ -11,9 +11,7 @@ pub struct PiFaceDriver {
impl PiFaceDriver {
pub fn new(pin: u8, pfd: &Option<PiFaceDigital>) -> Result<Self, EmgauwaControllerError> {
let pfd = pfd
.as_ref()
.ok_or(EmgauwaControllerError::Hardware(String::from(
let pfd = pfd.as_ref().ok_or(EmgauwaControllerError::Hardware(String::from(
"PiFaceDigital not initialized",
)))?;
let pfd_pin = pfd.get_output_pin(pin)?;

View file

@ -1,10 +1,10 @@
use std::fmt::{Display, Formatter};
use emgauwa_common::errors::EmgauwaError;
use rppal::gpio;
use rppal_mcp23s17::Mcp23s17Error;
use rppal_pfd::PiFaceDigitalError;
use emgauwa_common::errors::EmgauwaError;
#[derive(Debug)]
pub enum EmgauwaControllerError {
Hardware(String),

View file

@ -3,7 +3,7 @@ use emgauwa_common::db;
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::{Controller, FromDbModel};
use emgauwa_common::types::{EmgauwaNow, EmgauwaUid};
use emgauwa_common::types::EmgauwaUid;
use emgauwa_common::utils::{drop_privileges, init_logging};
use rppal_pfd::PiFaceDigital;
use sqlx::pool::PoolConnection;
@ -15,10 +15,11 @@ use crate::ws::run_ws_loop;
mod app_state;
mod drivers;
mod errors;
mod relay_loop;
mod settings;
mod utils;
mod ws;
mod errors;
async fn create_this_controller(
conn: &mut PoolConnection<Sqlite>,
@ -61,7 +62,7 @@ async fn main() -> Result<(), std::io::Error> {
drop_privileges(&settings.permissions)?;
init_logging(&settings.logging)?;
init_logging(&settings.logging.level)?;
let pool = db::init(&settings.database, 5)
.await
@ -79,7 +80,11 @@ async fn main() -> Result<(), std::io::Error> {
};
for relay in &settings.relays {
if DbRelay::get_by_controller_and_num(&mut conn, &db_controller, relay.number)
if DbRelay::get_by_controller_and_num(
&mut conn,
&db_controller,
relay.number,
)
.await
.map_err(EmgauwaError::from)?
.is_none()
@ -95,17 +100,10 @@ async fn main() -> Result<(), std::io::Error> {
.await
.map_err(EmgauwaError::from)?;
let mut this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
let now = EmgauwaNow::now(&settings.midnight);
let initial_states: Vec<bool> = this
.relays
.iter_mut()
.map(|r| {
r.reload_active_schedule(now.weekday);
r.is_on(&now.time)
})
.collect();
let now = chrono::Local::now().time();
let initial_states: Vec<bool> = this.relays.iter().map(|r| r.active_schedule.is_on(&now)).collect();
let mut pfd: Option<PiFaceDigital> = None;
let drivers = settings.relays_make_drivers(&mut pfd, initial_states)?;
@ -115,12 +113,12 @@ async fn main() -> Result<(), std::io::Error> {
settings.server.host, settings.server.port
);
let app_state = app_state::AppState::new(pool.clone(), this, settings.clone(), drivers).start();
let app_state = app_state::AppState::new(pool.clone(), this, settings, drivers).start();
log::info!("Starting main loops");
let _ = tokio::join!(
tokio::spawn(run_relays_loop(app_state.clone(), settings.clone())),
tokio::spawn(run_relays_loop(app_state.clone())),
tokio::spawn(run_ws_loop(pool.clone(), app_state.clone(), url)),
);

View file

@ -1,24 +1,24 @@
use std::time::Duration;
use std::time::{Duration, Instant};
use actix::Addr;
use chrono::{Timelike, Weekday};
use chrono::{Local, Timelike};
use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::EmgauwaNow;
use emgauwa_common::types::{RelayStates, Weekday};
use emgauwa_common::utils::printable_relay_states;
use futures::pin_mut;
use tokio::time;
use tokio::time::timeout;
use utils::app_state_get_controller_notifier;
use crate::app_state::AppState;
use crate::app_state;
use crate::settings::Settings;
use crate::utils;
pub async fn run_relays_loop(app_state: Addr<AppState>, settings: Settings) {
pub async fn run_relays_loop(app_state: Addr<AppState>) {
log::debug!("Spawned relays loop");
loop {
let run_result = run_relays(&app_state, &settings).await;
let run_result = run_relays(&app_state).await;
if let Err(err) = run_result {
log::error!("Error running relays: {}", err);
}
@ -26,54 +26,48 @@ pub async fn run_relays_loop(app_state: Addr<AppState>, settings: Settings) {
}
}
async fn run_relays(app_state: &Addr<AppState>, settings: &Settings) -> Result<(), EmgauwaError> {
let notifier = &*app_state
.send(app_state::GetControllerNotifier {})
.await?;
async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
let notifier = &*app_state_get_controller_notifier(app_state).await?;
let mut last_weekday = EmgauwaNow::now(&settings.midnight).weekday;
let mut this = AppState::get_this(app_state).await?;
let mut last_weekday = emgauwa_common::utils::get_weekday();
let mut this = utils::app_state_get_this(app_state).await?;
let mut relay_states: RelayStates = Vec::new();
init_relay_states(&mut relay_states, &this);
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
let mut duration_override = None;
let now = EmgauwaNow::now(&settings.midnight);
calc_relay_states(&mut this, app_state, &now).await?;
loop {
let now = EmgauwaNow::now(&settings.midnight);
log::debug!(
"Relay loop at {}: {}",
Local::now().naive_local().time(),
printable_relay_states(&this.get_relay_states())
);
let notifier_future = notifier.notified();
pin_mut!(notifier_future);
let mut changed = timeout(
get_next_duration(&mut this, &mut duration_override, &now),
get_next_duration(&this, &mut duration_override),
&mut notifier_future,
)
.await
.is_ok();
let now = EmgauwaNow::now(&settings.midnight);
check_weekday(app_state, &mut last_weekday, &mut changed, &now).await?;
check_weekday(app_state, &mut last_weekday, &mut changed).await?;
if changed {
log::debug!("Reloading controller in relay loop");
this = AppState::get_this(app_state).await?;
this = utils::app_state_get_this(app_state).await?;
}
log::debug!(
"Relay loop at {} on {}: {}",
now.time,
now.weekday.to_string(),
printable_relay_states(&this.get_relay_states())
);
let now_pulse = Instant::now();
duration_override = this
.relays
.iter_mut()
.filter_map(|relay| match relay.check_pulsing(&now.instant) {
.filter_map(|relay| match relay.check_pulsing(&now_pulse) {
None => None,
Some(pulse) => {
let dur = pulse - now.instant;
let dur = pulse - now_pulse;
log::debug!(
"Pulsing relay {} for {}s until {:?} ",
relay.r.number,
@ -85,60 +79,54 @@ async fn run_relays(app_state: &Addr<AppState>, settings: &Settings) -> Result<(
})
.min();
calc_relay_states(&mut this, app_state, &now).await?;
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
}
}
fn init_relay_states(relay_states: &mut RelayStates, this: &Controller) {
relay_states.clear();
for _ in 0..this.c.relay_count {
relay_states.push(None);
}
}
async fn calc_relay_states(
relay_states: &mut RelayStates,
this: &mut Controller,
app_state: &Addr<AppState>,
now: &EmgauwaNow,
) -> Result<(), EmgauwaError> {
let now = Local::now().time();
let now_pulse = Instant::now();
this.relays
.iter_mut()
.for_each(|relay| {
relay.reload_active_schedule(now.weekday);
.zip(relay_states.iter_mut())
.for_each(|(relay, state)| {
relay.is_on = Some(
relay.is_on(&now.time)
|| relay.check_pulsing(&now.instant).is_some(),
relay.active_schedule.is_on(&now) || relay.check_pulsing(&now_pulse).is_some(),
);
*state = relay.is_on;
});
app_state
.send(app_state::UpdateRelayStates {
relay_states: this.get_relay_states()
})
.await
.map_err(EmgauwaError::from)
utils::app_state_update_relays_on(app_state, relay_states.clone()).await
}
fn get_next_duration(
this: &mut Controller,
duration_override: &mut Option<Duration>,
now: &EmgauwaNow,
) -> Duration {
fn get_next_duration(this: &Controller, duration_override: &mut Option<Duration>) -> Duration {
if let Some(duration) = duration_override {
log::debug!("Duration override. Waiting for {}s", duration.as_secs());
return *duration;
}
let next_time = this
.check_next_time(now)
.unwrap_or(now.midnight);
let now = Local::now().time();
let now_in_s = now.num_seconds_from_midnight();
let next_timestamp = this
.get_next_time(&now)
.map_or(86400, |t| t.num_seconds_from_midnight());
// 86400 is the number of seconds in a day
// If the next timestamp is before the current time, we need to wait until the next day
let mut seconds_to_next = (86400 + next_time.num_seconds_from_midnight() - now.num_seconds_from_midnight()) % 86400;
if seconds_to_next == 0 {
seconds_to_next = 86400;
}
let duration_to_next = Duration::from_secs(seconds_to_next as u64);
let duration_to_next = Duration::from_secs((next_timestamp - now_in_s) as u64);
log::debug!(
"Next time: {}; Waiting for {}s",
next_time,
"Next timestamp: {}; Waiting for {}s",
next_timestamp,
duration_to_next.as_secs()
);
@ -149,13 +137,12 @@ async fn check_weekday(
app_state: &Addr<AppState>,
last_weekday: &mut Weekday,
changed: &mut bool,
now: &EmgauwaNow,
) -> Result<(), EmgauwaError> {
let current_weekday = now.weekday;
let current_weekday = emgauwa_common::utils::get_weekday();
if current_weekday.ne(last_weekday) {
log::debug!("Weekday changed");
*last_weekday = current_weekday;
AppState::trigger_reload(app_state).await?;
utils::app_state_reload(app_state).await?;
*changed = true;
}

View file

@ -1,9 +1,7 @@
use chrono::NaiveTime;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::settings;
use rppal_pfd::PiFaceDigital;
use serde_derive::Deserialize;
use crate::drivers;
#[derive(Clone, Debug, Deserialize)]
@ -28,7 +26,6 @@ pub struct Settings {
pub logging: settings::Logging,
pub name: String,
pub midnight: NaiveTime,
pub relays: Vec<Relay>,
}
@ -41,7 +38,6 @@ impl Default for Settings {
logging: settings::Logging::default(),
name: String::from("Emgauwa Controller"),
midnight: NaiveTime::default(),
relays: Vec::new(),
}
}
@ -75,11 +71,7 @@ impl Settings {
initial_states: Vec<bool>,
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
let mut drivers = Vec::new();
let result: Result<(), EmgauwaError> =
self.relays
.iter()
.zip(initial_states)
.try_for_each(|(relay, state)| {
let result: Result<(), EmgauwaError> = self.relays.iter().zip(initial_states.into_iter()).try_for_each(|(relay, state)| {
let driver = relay.make_driver(pfd, state)?;
drivers.push(driver);
Ok(())

66
src/utils.rs Normal file
View file

@ -0,0 +1,66 @@
use std::sync::Arc;
use actix::Addr;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::RelayStates;
use tokio::sync::Notify;
use crate::app_state;
use crate::app_state::AppState;
pub async fn app_state_get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
app_state
.send(app_state::GetThis {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_get_relay_notifier(
app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> {
app_state
.send(app_state::GetRelayNotifier {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_get_controller_notifier(
app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> {
app_state
.send(app_state::GetControllerNotifier {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
app_state
.send(app_state::Reload {})
.await
.map_err(EmgauwaError::from)?
}
pub async fn app_state_update_relays_on(
app_state: &Addr<AppState>,
relay_states: RelayStates,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::UpdateRelayStates { relay_states })
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_relay_pulse(
app_state: &Addr<AppState>,
relay_number: i64,
duration: Option<u32>,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::RelayPulse {
relay_number,
duration,
})
.await
.map_err(EmgauwaError::from)?
}

View file

@ -1,180 +0,0 @@
use actix::Addr;
use sqlx::{Pool, Sqlite};
use sqlx::pool::PoolConnection;
use tokio_tungstenite::tungstenite;
use tokio_tungstenite::tungstenite::Message;
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{Controller, Relay};
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
use crate::app_state::AppState;
use crate::app_state;
pub async fn handle_message(
pool: Pool<Sqlite>,
app_state: &Addr<AppState>,
message_result: Result<Message, tungstenite::Error>,
) {
let msg = match message_result {
Ok(msg) => msg,
Err(err) => {
log::error!("Error reading message: {}", err);
return;
}
};
if let Message::Text(text) = msg {
match serde_json::from_str(&text) {
Ok(action) => {
log::debug!("Received action: {:?}", action);
let mut pool_conn = match pool.acquire().await {
Ok(conn) => conn,
Err(err) => {
log::error!("Failed to acquire database connection: {:?}", err);
return;
}
};
let action_res = handle_action(&mut pool_conn, app_state, action).await;
if let Err(e) = action_res {
log::error!("Error handling action: {:?}", e);
}
}
Err(e) => {
log::error!("Error deserializing action: {:?}", e);
}
}
}
}
pub async fn handle_action(
conn: &mut PoolConnection<Sqlite>,
app_state: &Addr<AppState>,
action: ControllerWsAction,
) -> Result<(), EmgauwaError> {
let this = AppState::get_this(app_state).await?;
match action {
ControllerWsAction::Controller(controller) => {
handle_controller(conn, &this, controller).await?
}
ControllerWsAction::Relays(relays) => handle_relays(conn, app_state, &this, relays).await?,
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
ControllerWsAction::RelayPulse((relay_num, duration)) => {
handle_relay_pulse(app_state, relay_num, duration).await?
}
_ => return Ok(()),
};
AppState::trigger_reload(app_state).await
}
async fn handle_controller(
conn: &mut PoolConnection<Sqlite>,
this: &Controller,
controller: Controller,
) -> Result<(), EmgauwaError> {
if controller.c.uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during update",
)));
}
DbController::get_by_uid(conn, &controller.c.uid)
.await?
.ok_or(DatabaseError::NotFound)?
.update(conn, controller.c.name.as_str(), this.c.relay_count)
.await?;
Ok(())
}
async fn handle_schedules(
conn: &mut PoolConnection<Sqlite>,
schedules: Vec<DbSchedule>,
) -> Result<(), EmgauwaError> {
let mut handled_uids = vec![
// on and off schedules are always present and should not be updated
ScheduleUid::On,
ScheduleUid::Off,
];
for schedule in schedules {
if handled_uids.contains(&schedule.uid) {
continue;
}
handled_uids.push(schedule.uid.clone());
log::debug!("Handling schedule: {:?}", schedule);
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
if let Some(schedule_db) = schedule_db {
schedule_db
.update(conn, schedule.name.as_str(), &schedule.periods)
.await?;
} else {
DbSchedule::create(
conn,
schedule.uid.clone(),
schedule.name.as_str(),
&schedule.periods,
)
.await?;
}
}
Ok(())
}
async fn handle_relays(
conn: &mut PoolConnection<Sqlite>,
app_state: &Addr<AppState>,
this: &Controller,
relays: Vec<Relay>,
) -> Result<(), EmgauwaError> {
for relay in relays {
if relay.r.controller_uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during relay update",
)));
}
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
.await?
.ok_or(DatabaseError::NotFound)?;
db_relay.update(conn, relay.r.name.as_str()).await?;
handle_schedules(conn, relay.schedules.clone()).await?;
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
for schedule in &relay.schedules {
schedules.push(
DbSchedule::get_by_uid(conn, &schedule.uid)
.await?
.ok_or(DatabaseError::NotFound)?,
);
}
app_state
.send(app_state::RelayOverrideSchedule {
relay_number: relay.r.number,
schedule: relay.override_schedule.clone()
})
.await??;
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
}
Ok(())
}
async fn handle_relay_pulse(
app_state: &Addr<AppState>,
relay_number: i64,
duration: Option<u32>,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::RelayPulse {
relay_number,
duration,
})
.await?
}

View file

@ -1,18 +1,19 @@
mod handlers;
use actix::Addr;
use emgauwa_common::constants::WEBSOCKET_RETRY_TIMEOUT;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::types::ControllerWsAction;
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{Controller, Relay};
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
use futures::{future, pin_mut, SinkExt, StreamExt};
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use tokio::time;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use crate::app_state;
use tokio_tungstenite::{connect_async, tungstenite};
use crate::app_state::AppState;
use crate::ws::handlers::handle_message;
use crate::utils;
use crate::utils::{app_state_get_relay_notifier, app_state_get_this};
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
log::debug!("Spawned ws loop");
@ -43,7 +44,7 @@ async fn run_websocket(
let (mut write, read) = ws_stream.split();
let ws_action = ControllerWsAction::Register(AppState::get_this(app_state).await?);
let ws_action = ControllerWsAction::Register(app_state_get_this(app_state).await?);
let ws_action_json = serde_json::to_string(&ws_action)?;
if let Err(err) = write.send(Message::text(ws_action_json)).await {
@ -73,14 +74,11 @@ async fn read_app_state(
app_state: Addr<AppState>,
tx: futures_channel::mpsc::UnboundedSender<Message>,
) -> Result<(), EmgauwaError> {
let notifier = &*app_state
.send(app_state::GetRelayNotifier {})
.await?;
let notifier = &*app_state_get_relay_notifier(&app_state).await?;
loop {
notifier.notified().await;
log::debug!("Relay change detected");
let this = AppState::get_this(&app_state).await?;
let this = app_state_get_this(&app_state).await?;
let relay_states = this.get_relay_states();
let ws_action = ControllerWsAction::RelayStates((this.c.uid, relay_states));
@ -93,3 +91,157 @@ async fn read_app_state(
})?;
}
}
async fn handle_message(
pool: Pool<Sqlite>,
app_state: &Addr<AppState>,
message_result: Result<Message, tungstenite::Error>,
) {
let msg = match message_result {
Ok(msg) => msg,
Err(err) => {
log::error!("Error reading message: {}", err);
return;
}
};
if let Message::Text(text) = msg {
match serde_json::from_str(&text) {
Ok(action) => {
log::debug!("Received action: {:?}", action);
let mut pool_conn = match pool.acquire().await {
Ok(conn) => conn,
Err(err) => {
log::error!("Failed to acquire database connection: {:?}", err);
return;
}
};
let action_res = handle_action(&mut pool_conn, app_state, action).await;
if let Err(e) = action_res {
log::error!("Error handling action: {:?}", e);
}
}
Err(e) => {
log::error!("Error deserializing action: {:?}", e);
}
}
}
}
pub async fn handle_action(
conn: &mut PoolConnection<Sqlite>,
app_state: &Addr<AppState>,
action: ControllerWsAction,
) -> Result<(), EmgauwaError> {
let this = app_state_get_this(app_state).await?;
match action {
ControllerWsAction::Controller(controller) => {
handle_controller(conn, &this, controller).await?
}
ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?,
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
ControllerWsAction::RelayPulse((relay_num, duration)) => {
handle_relay_pulse(app_state, relay_num, duration).await?
}
_ => return Ok(()),
};
utils::app_state_reload(app_state).await
}
async fn handle_controller(
conn: &mut PoolConnection<Sqlite>,
this: &Controller,
controller: Controller,
) -> Result<(), EmgauwaError> {
if controller.c.uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during update",
)));
}
DbController::get_by_uid(conn, &controller.c.uid)
.await?
.ok_or(DatabaseError::NotFound)?
.update(conn, controller.c.name.as_str(), this.c.relay_count)
.await?;
Ok(())
}
async fn handle_schedules(
conn: &mut PoolConnection<Sqlite>,
schedules: Vec<DbSchedule>,
) -> Result<(), EmgauwaError> {
let mut handled_uids = vec![
// on and off schedules are always present and should not be updated
ScheduleUid::On,
ScheduleUid::Off,
];
for schedule in schedules {
if handled_uids.contains(&schedule.uid) {
continue;
}
handled_uids.push(schedule.uid.clone());
log::debug!("Handling schedule: {:?}", schedule);
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
if let Some(schedule_db) = schedule_db {
schedule_db
.update(conn, schedule.name.as_str(), &schedule.periods)
.await?;
} else {
DbSchedule::create(
conn,
schedule.uid.clone(),
schedule.name.as_str(),
&schedule.periods,
)
.await?;
}
}
Ok(())
}
async fn handle_relays(
conn: &mut PoolConnection<Sqlite>,
this: &Controller,
relays: Vec<Relay>,
) -> Result<(), EmgauwaError> {
for relay in relays {
if relay.controller.uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during relay update",
)));
}
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
.await?
.ok_or(DatabaseError::NotFound)?;
db_relay.update(conn, relay.r.name.as_str()).await?;
handle_schedules(conn, relay.schedules.clone()).await?;
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
for schedule in relay.schedules {
schedules.push(
DbSchedule::get_by_uid(conn, &schedule.uid)
.await?
.ok_or(DatabaseError::NotFound)?,
);
}
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
}
Ok(())
}
async fn handle_relay_pulse(
app_state: &Addr<AppState>,
relay_num: i64,
duration: Option<u32>,
) -> Result<(), EmgauwaError> {
utils::app_state_relay_pulse(app_state, relay_num, duration).await
}