Compare commits
No commits in common. "b065c8dd972df43516d49a57db228f1ad2ba6648" and "6df724f2db8e2a33074132b8e4e1013c25836e9f" have entirely different histories.
b065c8dd97
...
6df724f2db
11 changed files with 301 additions and 323 deletions
BIN
Cargo.lock
generated
BIN
Cargo.lock
generated
Binary file not shown.
|
@ -1,12 +1,11 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix::{Actor, Addr, Context, Handler, Message};
|
||||
use actix::{Actor, Context, Handler, Message};
|
||||
use emgauwa_common::constants;
|
||||
use emgauwa_common::db::DbSchedule;
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
use emgauwa_common::models::Controller;
|
||||
use emgauwa_common::types::{RelayStates, Weekday};
|
||||
use emgauwa_common::types::RelayStates;
|
||||
use futures::executor::block_on;
|
||||
use sqlx::{Pool, Sqlite};
|
||||
use tokio::sync::Notify;
|
||||
|
@ -31,14 +30,6 @@ pub struct RelayPulse {
|
|||
pub duration: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(), EmgauwaError>")]
|
||||
pub struct RelayOverrideSchedule {
|
||||
pub relay_number: i64,
|
||||
pub schedule: Option<DbSchedule>,
|
||||
pub weekday: Weekday,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Controller")]
|
||||
pub struct GetThis {}
|
||||
|
@ -84,20 +75,6 @@ impl AppState {
|
|||
pub fn notify_relay_change(&self) {
|
||||
self.relay_notifier.notify_one();
|
||||
}
|
||||
|
||||
pub async fn trigger_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
||||
app_state
|
||||
.send(Reload {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)?
|
||||
}
|
||||
|
||||
pub async fn get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
|
||||
app_state
|
||||
.send(GetThis {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for AppState {
|
||||
|
@ -128,7 +105,7 @@ impl Handler<UpdateRelayStates> for AppState {
|
|||
.iter_mut()
|
||||
.zip(msg.relay_states.iter())
|
||||
.for_each(|(driver, state)| {
|
||||
if let Err(e) = driver.set(state.is_on.unwrap_or(false)) {
|
||||
if let Err(e) = driver.set(state.unwrap_or(false)) {
|
||||
log::error!("Error setting relay: {}", e);
|
||||
}
|
||||
});
|
||||
|
@ -171,35 +148,6 @@ impl Handler<RelayPulse> for AppState {
|
|||
}
|
||||
}
|
||||
|
||||
impl Handler<RelayOverrideSchedule> for AppState {
|
||||
type Result = Result<(), EmgauwaError>;
|
||||
|
||||
fn handle(&mut self, msg: RelayOverrideSchedule, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let relay_num = msg.relay_number;
|
||||
let schedule = msg.schedule;
|
||||
let weekday = msg.weekday;
|
||||
|
||||
let relay = self
|
||||
.this
|
||||
.relays
|
||||
.iter_mut()
|
||||
.find(|r| r.r.number == relay_num)
|
||||
.ok_or(EmgauwaError::Other(String::from("Relay not found")))?;
|
||||
|
||||
log::debug!(
|
||||
"Overriding schedule for relay {} to '{}' on day {}",
|
||||
relay_num,
|
||||
schedule.as_ref().map_or("NONE", |s| &s.name),
|
||||
weekday
|
||||
);
|
||||
|
||||
relay.override_schedule = schedule;
|
||||
relay.override_schedule_weekday = weekday;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<GetThis> for AppState {
|
||||
type Result = Controller;
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ pub use gpio::GpioDriver;
|
|||
pub use null::NullDriver;
|
||||
pub use piface::PiFaceDriver;
|
||||
use serde::{Deserialize, Deserializer};
|
||||
|
||||
use crate::errors::EmgauwaControllerError;
|
||||
|
||||
mod gpio;
|
||||
|
|
|
@ -11,11 +11,9 @@ pub struct PiFaceDriver {
|
|||
|
||||
impl PiFaceDriver {
|
||||
pub fn new(pin: u8, pfd: &Option<PiFaceDigital>) -> Result<Self, EmgauwaControllerError> {
|
||||
let pfd = pfd
|
||||
.as_ref()
|
||||
.ok_or(EmgauwaControllerError::Hardware(String::from(
|
||||
"PiFaceDigital not initialized",
|
||||
)))?;
|
||||
let pfd = pfd.as_ref().ok_or(EmgauwaControllerError::Hardware(String::from(
|
||||
"PiFaceDigital not initialized",
|
||||
)))?;
|
||||
let pfd_pin = pfd.get_output_pin(pin)?;
|
||||
Ok(Self { pfd_pin })
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
use rppal::gpio;
|
||||
use rppal_mcp23s17::Mcp23s17Error;
|
||||
use rppal_pfd::PiFaceDigitalError;
|
||||
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum EmgauwaControllerError {
|
||||
Hardware(String),
|
||||
|
|
21
src/main.rs
21
src/main.rs
|
@ -15,10 +15,11 @@ use crate::ws::run_ws_loop;
|
|||
|
||||
mod app_state;
|
||||
mod drivers;
|
||||
mod errors;
|
||||
mod relay_loop;
|
||||
mod settings;
|
||||
mod utils;
|
||||
mod ws;
|
||||
mod errors;
|
||||
|
||||
async fn create_this_controller(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
|
@ -79,10 +80,14 @@ async fn main() -> Result<(), std::io::Error> {
|
|||
};
|
||||
|
||||
for relay in &settings.relays {
|
||||
if DbRelay::get_by_controller_and_num(&mut conn, &db_controller, relay.number)
|
||||
.await
|
||||
.map_err(EmgauwaError::from)?
|
||||
.is_none()
|
||||
if DbRelay::get_by_controller_and_num(
|
||||
&mut conn,
|
||||
&db_controller,
|
||||
relay.number,
|
||||
)
|
||||
.await
|
||||
.map_err(EmgauwaError::from)?
|
||||
.is_none()
|
||||
{
|
||||
create_this_relay(&mut conn, &db_controller, relay)
|
||||
.await
|
||||
|
@ -98,11 +103,7 @@ async fn main() -> Result<(), std::io::Error> {
|
|||
let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
|
||||
|
||||
let now = chrono::Local::now().time();
|
||||
let initial_states: Vec<bool> = this
|
||||
.relays
|
||||
.iter()
|
||||
.map(|r| r.active_schedule.is_on(&now))
|
||||
.collect();
|
||||
let initial_states: Vec<bool> = this.relays.iter().map(|r| r.active_schedule.is_on(&now)).collect();
|
||||
|
||||
let mut pfd: Option<PiFaceDigital> = None;
|
||||
let drivers = settings.relays_make_drivers(&mut pfd, initial_states)?;
|
||||
|
|
|
@ -1,18 +1,19 @@
|
|||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix::Addr;
|
||||
use chrono::Timelike;
|
||||
use chrono::{Local, Timelike};
|
||||
use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT;
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
use emgauwa_common::models::Controller;
|
||||
use emgauwa_common::types::{EmgauwaNow, Weekday};
|
||||
use emgauwa_common::types::{RelayStates, Weekday};
|
||||
use emgauwa_common::utils::printable_relay_states;
|
||||
use futures::pin_mut;
|
||||
use tokio::time;
|
||||
use tokio::time::timeout;
|
||||
use utils::app_state_get_controller_notifier;
|
||||
|
||||
use crate::app_state::AppState;
|
||||
use crate::app_state;
|
||||
use crate::utils;
|
||||
|
||||
pub async fn run_relays_loop(app_state: Addr<AppState>) {
|
||||
log::debug!("Spawned relays loop");
|
||||
|
@ -26,49 +27,47 @@ pub async fn run_relays_loop(app_state: Addr<AppState>) {
|
|||
}
|
||||
|
||||
async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
||||
let notifier = &*app_state
|
||||
.send(app_state::GetControllerNotifier {})
|
||||
.await?;
|
||||
let notifier = &*app_state_get_controller_notifier(app_state).await?;
|
||||
|
||||
let mut last_weekday = emgauwa_common::utils::get_weekday();
|
||||
let mut this = AppState::get_this(app_state).await?;
|
||||
let mut this = utils::app_state_get_this(app_state).await?;
|
||||
let mut relay_states: RelayStates = Vec::new();
|
||||
init_relay_states(&mut relay_states, &this);
|
||||
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
|
||||
|
||||
let mut duration_override = None;
|
||||
|
||||
loop {
|
||||
let now = EmgauwaNow::now();
|
||||
log::debug!(
|
||||
"Relay loop at {}: {}",
|
||||
Local::now().naive_local().time(),
|
||||
printable_relay_states(&this.get_relay_states())
|
||||
);
|
||||
|
||||
let notifier_future = notifier.notified();
|
||||
pin_mut!(notifier_future);
|
||||
let mut changed = timeout(
|
||||
get_next_duration(&mut this, &mut duration_override, &now),
|
||||
get_next_duration(&this, &mut duration_override),
|
||||
&mut notifier_future,
|
||||
)
|
||||
.await
|
||||
.is_ok();
|
||||
|
||||
let now = EmgauwaNow::now();
|
||||
|
||||
check_weekday(app_state, &mut last_weekday, &mut changed, &now).await?;
|
||||
check_weekday(app_state, &mut last_weekday, &mut changed).await?;
|
||||
|
||||
if changed {
|
||||
log::debug!("Reloading controller in relay loop");
|
||||
this = AppState::get_this(app_state).await?;
|
||||
this = utils::app_state_get_this(app_state).await?;
|
||||
}
|
||||
|
||||
log::debug!(
|
||||
"Relay loop at {}: {}",
|
||||
now.time,
|
||||
printable_relay_states(&this.get_relay_states())
|
||||
);
|
||||
|
||||
let now_pulse = Instant::now();
|
||||
duration_override = this
|
||||
.relays
|
||||
.iter_mut()
|
||||
.filter_map(|relay| match relay.check_pulsing(&now.instant) {
|
||||
.filter_map(|relay| match relay.check_pulsing(&now_pulse) {
|
||||
None => None,
|
||||
Some(pulse) => {
|
||||
let dur = pulse - now.instant;
|
||||
let dur = pulse - now_pulse;
|
||||
log::debug!(
|
||||
"Pulsing relay {} for {}s until {:?} ",
|
||||
relay.r.number,
|
||||
|
@ -80,48 +79,50 @@ async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
|||
})
|
||||
.min();
|
||||
|
||||
calc_relay_states(&mut this, app_state, &now).await?;
|
||||
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
|
||||
}
|
||||
}
|
||||
|
||||
fn init_relay_states(relay_states: &mut RelayStates, this: &Controller) {
|
||||
relay_states.clear();
|
||||
for _ in 0..this.c.relay_count {
|
||||
relay_states.push(None);
|
||||
}
|
||||
}
|
||||
|
||||
async fn calc_relay_states(
|
||||
relay_states: &mut RelayStates,
|
||||
this: &mut Controller,
|
||||
app_state: &Addr<AppState>,
|
||||
now: &EmgauwaNow,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let now = Local::now().time();
|
||||
let now_pulse = Instant::now();
|
||||
|
||||
this.relays
|
||||
.iter_mut()
|
||||
.for_each(|relay| {
|
||||
relay.reload_active_schedule(now.weekday);
|
||||
.zip(relay_states.iter_mut())
|
||||
.for_each(|(relay, state)| {
|
||||
relay.is_on = Some(
|
||||
relay.active_schedule.is_on(&now.time)
|
||||
|| relay.check_pulsing(&now.instant).is_some(),
|
||||
relay.active_schedule.is_on(&now) || relay.check_pulsing(&now_pulse).is_some(),
|
||||
);
|
||||
*state = relay.is_on;
|
||||
});
|
||||
|
||||
app_state
|
||||
.send(app_state::UpdateRelayStates {
|
||||
relay_states: this.get_relay_states()
|
||||
})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
utils::app_state_update_relays_on(app_state, relay_states.clone()).await
|
||||
}
|
||||
|
||||
fn get_next_duration(
|
||||
this: &mut Controller,
|
||||
duration_override: &mut Option<Duration>,
|
||||
now: &EmgauwaNow,
|
||||
) -> Duration {
|
||||
fn get_next_duration(this: &Controller, duration_override: &mut Option<Duration>) -> Duration {
|
||||
if let Some(duration) = duration_override {
|
||||
log::debug!("Duration override. Waiting for {}s", duration.as_secs());
|
||||
return *duration;
|
||||
}
|
||||
|
||||
let now = Local::now().time();
|
||||
let now_in_s = now.num_seconds_from_midnight();
|
||||
let next_timestamp = this
|
||||
.check_next_time(now)
|
||||
.get_next_time(&now)
|
||||
.map_or(86400, |t| t.num_seconds_from_midnight());
|
||||
|
||||
let duration_to_next = Duration::from_secs((next_timestamp - now.time_in_s()) as u64);
|
||||
let duration_to_next = Duration::from_secs((next_timestamp - now_in_s) as u64);
|
||||
|
||||
log::debug!(
|
||||
"Next timestamp: {}; Waiting for {}s",
|
||||
|
@ -136,13 +137,12 @@ async fn check_weekday(
|
|||
app_state: &Addr<AppState>,
|
||||
last_weekday: &mut Weekday,
|
||||
changed: &mut bool,
|
||||
now: &EmgauwaNow,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let current_weekday = now.weekday;
|
||||
let current_weekday = emgauwa_common::utils::get_weekday();
|
||||
if current_weekday.ne(last_weekday) {
|
||||
log::debug!("Weekday changed");
|
||||
*last_weekday = current_weekday;
|
||||
AppState::trigger_reload(app_state).await?;
|
||||
utils::app_state_reload(app_state).await?;
|
||||
*changed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ use emgauwa_common::errors::EmgauwaError;
|
|||
use emgauwa_common::settings;
|
||||
use rppal_pfd::PiFaceDigital;
|
||||
use serde_derive::Deserialize;
|
||||
|
||||
use crate::drivers;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
|
@ -70,17 +69,13 @@ impl Settings {
|
|||
&self,
|
||||
pfd: &mut Option<PiFaceDigital>,
|
||||
initial_states: Vec<bool>,
|
||||
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
|
||||
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
|
||||
let mut drivers = Vec::new();
|
||||
let result: Result<(), EmgauwaError> =
|
||||
self.relays
|
||||
.iter()
|
||||
.zip(initial_states)
|
||||
.try_for_each(|(relay, state)| {
|
||||
let driver = relay.make_driver(pfd, state)?;
|
||||
drivers.push(driver);
|
||||
Ok(())
|
||||
});
|
||||
let result: Result<(), EmgauwaError> = self.relays.iter().zip(initial_states.into_iter()).try_for_each(|(relay, state)| {
|
||||
let driver = relay.make_driver(pfd, state)?;
|
||||
drivers.push(driver);
|
||||
Ok(())
|
||||
});
|
||||
result?;
|
||||
Ok(drivers)
|
||||
}
|
||||
|
|
66
src/utils.rs
Normal file
66
src/utils.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use actix::Addr;
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
use emgauwa_common::models::Controller;
|
||||
use emgauwa_common::types::RelayStates;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::app_state;
|
||||
use crate::app_state::AppState;
|
||||
|
||||
pub async fn app_state_get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::GetThis {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
}
|
||||
|
||||
pub async fn app_state_get_relay_notifier(
|
||||
app_state: &Addr<AppState>,
|
||||
) -> Result<Arc<Notify>, EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::GetRelayNotifier {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
}
|
||||
|
||||
pub async fn app_state_get_controller_notifier(
|
||||
app_state: &Addr<AppState>,
|
||||
) -> Result<Arc<Notify>, EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::GetControllerNotifier {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
}
|
||||
|
||||
pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::Reload {})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)?
|
||||
}
|
||||
|
||||
pub async fn app_state_update_relays_on(
|
||||
app_state: &Addr<AppState>,
|
||||
relay_states: RelayStates,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::UpdateRelayStates { relay_states })
|
||||
.await
|
||||
.map_err(EmgauwaError::from)
|
||||
}
|
||||
|
||||
pub async fn app_state_relay_pulse(
|
||||
app_state: &Addr<AppState>,
|
||||
relay_number: i64,
|
||||
duration: Option<u32>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::RelayPulse {
|
||||
relay_number,
|
||||
duration,
|
||||
})
|
||||
.await
|
||||
.map_err(EmgauwaError::from)?
|
||||
}
|
|
@ -1,181 +0,0 @@
|
|||
use actix::Addr;
|
||||
use sqlx::{Pool, Sqlite};
|
||||
use sqlx::pool::PoolConnection;
|
||||
use tokio_tungstenite::tungstenite;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
||||
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
|
||||
use emgauwa_common::models::{Controller, Relay};
|
||||
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
|
||||
|
||||
use crate::app_state::AppState;
|
||||
use crate::app_state;
|
||||
|
||||
pub async fn handle_message(
|
||||
pool: Pool<Sqlite>,
|
||||
app_state: &Addr<AppState>,
|
||||
message_result: Result<Message, tungstenite::Error>,
|
||||
) {
|
||||
let msg = match message_result {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
log::error!("Error reading message: {}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Message::Text(text) = msg {
|
||||
match serde_json::from_str(&text) {
|
||||
Ok(action) => {
|
||||
log::debug!("Received action: {:?}", action);
|
||||
let mut pool_conn = match pool.acquire().await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
log::error!("Failed to acquire database connection: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let action_res = handle_action(&mut pool_conn, app_state, action).await;
|
||||
if let Err(e) = action_res {
|
||||
log::error!("Error handling action: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Error deserializing action: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_action(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
app_state: &Addr<AppState>,
|
||||
action: ControllerWsAction,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let this = AppState::get_this(app_state).await?;
|
||||
|
||||
match action {
|
||||
ControllerWsAction::Controller(controller) => {
|
||||
handle_controller(conn, &this, controller).await?
|
||||
}
|
||||
ControllerWsAction::Relays(relays) => handle_relays(conn, app_state, &this, relays).await?,
|
||||
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
|
||||
ControllerWsAction::RelayPulse((relay_num, duration)) => {
|
||||
handle_relay_pulse(app_state, relay_num, duration).await?
|
||||
}
|
||||
_ => return Ok(()),
|
||||
};
|
||||
|
||||
AppState::trigger_reload(app_state).await
|
||||
}
|
||||
|
||||
async fn handle_controller(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
this: &Controller,
|
||||
controller: Controller,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
if controller.c.uid != this.c.uid {
|
||||
return Err(EmgauwaError::Other(String::from(
|
||||
"Controller UID mismatch during update",
|
||||
)));
|
||||
}
|
||||
DbController::get_by_uid(conn, &controller.c.uid)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?
|
||||
.update(conn, controller.c.name.as_str(), this.c.relay_count)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_schedules(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
schedules: Vec<DbSchedule>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let mut handled_uids = vec![
|
||||
// on and off schedules are always present and should not be updated
|
||||
ScheduleUid::On,
|
||||
ScheduleUid::Off,
|
||||
];
|
||||
for schedule in schedules {
|
||||
if handled_uids.contains(&schedule.uid) {
|
||||
continue;
|
||||
}
|
||||
handled_uids.push(schedule.uid.clone());
|
||||
|
||||
log::debug!("Handling schedule: {:?}", schedule);
|
||||
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
|
||||
|
||||
if let Some(schedule_db) = schedule_db {
|
||||
schedule_db
|
||||
.update(conn, schedule.name.as_str(), &schedule.periods)
|
||||
.await?;
|
||||
} else {
|
||||
DbSchedule::create(
|
||||
conn,
|
||||
schedule.uid.clone(),
|
||||
schedule.name.as_str(),
|
||||
&schedule.periods,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_relays(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
app_state: &Addr<AppState>,
|
||||
this: &Controller,
|
||||
relays: Vec<Relay>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
for relay in relays {
|
||||
if relay.controller.uid != this.c.uid {
|
||||
return Err(EmgauwaError::Other(String::from(
|
||||
"Controller UID mismatch during relay update",
|
||||
)));
|
||||
}
|
||||
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?;
|
||||
|
||||
db_relay.update(conn, relay.r.name.as_str()).await?;
|
||||
|
||||
handle_schedules(conn, relay.schedules.clone()).await?;
|
||||
|
||||
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
|
||||
for schedule in &relay.schedules {
|
||||
schedules.push(
|
||||
DbSchedule::get_by_uid(conn, &schedule.uid)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?,
|
||||
);
|
||||
}
|
||||
|
||||
app_state
|
||||
.send(app_state::RelayOverrideSchedule {
|
||||
relay_number: relay.r.number,
|
||||
schedule: relay.override_schedule.clone(),
|
||||
weekday: relay.override_schedule_weekday,
|
||||
})
|
||||
.await??;
|
||||
|
||||
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_relay_pulse(
|
||||
app_state: &Addr<AppState>,
|
||||
relay_number: i64,
|
||||
duration: Option<u32>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
app_state
|
||||
.send(app_state::RelayPulse {
|
||||
relay_number,
|
||||
duration,
|
||||
})
|
||||
.await?
|
||||
}
|
178
src/ws/mod.rs
178
src/ws/mod.rs
|
@ -1,18 +1,19 @@
|
|||
mod handlers;
|
||||
|
||||
use actix::Addr;
|
||||
use emgauwa_common::constants::WEBSOCKET_RETRY_TIMEOUT;
|
||||
use emgauwa_common::errors::EmgauwaError;
|
||||
use emgauwa_common::types::ControllerWsAction;
|
||||
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
||||
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
|
||||
use emgauwa_common::models::{Controller, Relay};
|
||||
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
|
||||
use futures::{future, pin_mut, SinkExt, StreamExt};
|
||||
use sqlx::pool::PoolConnection;
|
||||
use sqlx::{Pool, Sqlite};
|
||||
use tokio::time;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use crate::app_state;
|
||||
use tokio_tungstenite::{connect_async, tungstenite};
|
||||
|
||||
use crate::app_state::AppState;
|
||||
use crate::ws::handlers::handle_message;
|
||||
use crate::utils;
|
||||
use crate::utils::{app_state_get_relay_notifier, app_state_get_this};
|
||||
|
||||
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
|
||||
log::debug!("Spawned ws loop");
|
||||
|
@ -43,7 +44,7 @@ async fn run_websocket(
|
|||
|
||||
let (mut write, read) = ws_stream.split();
|
||||
|
||||
let ws_action = ControllerWsAction::Register(AppState::get_this(app_state).await?);
|
||||
let ws_action = ControllerWsAction::Register(app_state_get_this(app_state).await?);
|
||||
|
||||
let ws_action_json = serde_json::to_string(&ws_action)?;
|
||||
if let Err(err) = write.send(Message::text(ws_action_json)).await {
|
||||
|
@ -73,14 +74,11 @@ async fn read_app_state(
|
|||
app_state: Addr<AppState>,
|
||||
tx: futures_channel::mpsc::UnboundedSender<Message>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let notifier = &*app_state
|
||||
.send(app_state::GetRelayNotifier {})
|
||||
.await?;
|
||||
|
||||
let notifier = &*app_state_get_relay_notifier(&app_state).await?;
|
||||
loop {
|
||||
notifier.notified().await;
|
||||
log::debug!("Relay change detected");
|
||||
let this = AppState::get_this(&app_state).await?;
|
||||
let this = app_state_get_this(&app_state).await?;
|
||||
let relay_states = this.get_relay_states();
|
||||
let ws_action = ControllerWsAction::RelayStates((this.c.uid, relay_states));
|
||||
|
||||
|
@ -93,3 +91,157 @@ async fn read_app_state(
|
|||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_message(
|
||||
pool: Pool<Sqlite>,
|
||||
app_state: &Addr<AppState>,
|
||||
message_result: Result<Message, tungstenite::Error>,
|
||||
) {
|
||||
let msg = match message_result {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
log::error!("Error reading message: {}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Message::Text(text) = msg {
|
||||
match serde_json::from_str(&text) {
|
||||
Ok(action) => {
|
||||
log::debug!("Received action: {:?}", action);
|
||||
let mut pool_conn = match pool.acquire().await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
log::error!("Failed to acquire database connection: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let action_res = handle_action(&mut pool_conn, app_state, action).await;
|
||||
if let Err(e) = action_res {
|
||||
log::error!("Error handling action: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Error deserializing action: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_action(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
app_state: &Addr<AppState>,
|
||||
action: ControllerWsAction,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let this = app_state_get_this(app_state).await?;
|
||||
|
||||
match action {
|
||||
ControllerWsAction::Controller(controller) => {
|
||||
handle_controller(conn, &this, controller).await?
|
||||
}
|
||||
ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?,
|
||||
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
|
||||
ControllerWsAction::RelayPulse((relay_num, duration)) => {
|
||||
handle_relay_pulse(app_state, relay_num, duration).await?
|
||||
}
|
||||
_ => return Ok(()),
|
||||
};
|
||||
|
||||
utils::app_state_reload(app_state).await
|
||||
}
|
||||
|
||||
async fn handle_controller(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
this: &Controller,
|
||||
controller: Controller,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
if controller.c.uid != this.c.uid {
|
||||
return Err(EmgauwaError::Other(String::from(
|
||||
"Controller UID mismatch during update",
|
||||
)));
|
||||
}
|
||||
DbController::get_by_uid(conn, &controller.c.uid)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?
|
||||
.update(conn, controller.c.name.as_str(), this.c.relay_count)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_schedules(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
schedules: Vec<DbSchedule>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
let mut handled_uids = vec![
|
||||
// on and off schedules are always present and should not be updated
|
||||
ScheduleUid::On,
|
||||
ScheduleUid::Off,
|
||||
];
|
||||
for schedule in schedules {
|
||||
if handled_uids.contains(&schedule.uid) {
|
||||
continue;
|
||||
}
|
||||
handled_uids.push(schedule.uid.clone());
|
||||
|
||||
log::debug!("Handling schedule: {:?}", schedule);
|
||||
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
|
||||
|
||||
if let Some(schedule_db) = schedule_db {
|
||||
schedule_db
|
||||
.update(conn, schedule.name.as_str(), &schedule.periods)
|
||||
.await?;
|
||||
} else {
|
||||
DbSchedule::create(
|
||||
conn,
|
||||
schedule.uid.clone(),
|
||||
schedule.name.as_str(),
|
||||
&schedule.periods,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_relays(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
this: &Controller,
|
||||
relays: Vec<Relay>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
for relay in relays {
|
||||
if relay.controller.uid != this.c.uid {
|
||||
return Err(EmgauwaError::Other(String::from(
|
||||
"Controller UID mismatch during relay update",
|
||||
)));
|
||||
}
|
||||
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?;
|
||||
|
||||
db_relay.update(conn, relay.r.name.as_str()).await?;
|
||||
|
||||
handle_schedules(conn, relay.schedules.clone()).await?;
|
||||
|
||||
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
|
||||
for schedule in relay.schedules {
|
||||
schedules.push(
|
||||
DbSchedule::get_by_uid(conn, &schedule.uid)
|
||||
.await?
|
||||
.ok_or(DatabaseError::NotFound)?,
|
||||
);
|
||||
}
|
||||
|
||||
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_relay_pulse(
|
||||
app_state: &Addr<AppState>,
|
||||
relay_num: i64,
|
||||
duration: Option<u32>,
|
||||
) -> Result<(), EmgauwaError> {
|
||||
utils::app_state_relay_pulse(app_state, relay_num, duration).await
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue