Compare commits
No commits in common. "5ee542b44b3c7f1d1d59a4dfe7cb0679d44e0f3f" and "6df724f2db8e2a33074132b8e4e1013c25836e9f" have entirely different histories.
5ee542b44b
...
6df724f2db
12 changed files with 316 additions and 355 deletions
BIN
Cargo.lock
generated
BIN
Cargo.lock
generated
Binary file not shown.
|
@ -1,4 +1,4 @@
|
||||||
amends "package://emgauwa.app/pkl/emgauwa@0.2.1#/controller.pkl"
|
amends "package://emgauwa.app/pkl/emgauwa@0.1.1#/controller.pkl"
|
||||||
|
|
||||||
logging {
|
logging {
|
||||||
level = "DEBUG"
|
level = "DEBUG"
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use actix::{Actor, Addr, Context, Handler, Message};
|
use actix::{Actor, Context, Handler, Message};
|
||||||
use emgauwa_common::constants;
|
use emgauwa_common::constants;
|
||||||
use emgauwa_common::db::DbSchedule;
|
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
use emgauwa_common::models::Controller;
|
use emgauwa_common::models::Controller;
|
||||||
use emgauwa_common::types::{EmgauwaNow, RelayStates};
|
use emgauwa_common::types::RelayStates;
|
||||||
use futures::executor::block_on;
|
use futures::executor::block_on;
|
||||||
use sqlx::{Pool, Sqlite};
|
use sqlx::{Pool, Sqlite};
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
@ -31,13 +30,6 @@ pub struct RelayPulse {
|
||||||
pub duration: Option<u32>,
|
pub duration: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
|
||||||
#[rtype(result = "Result<(), EmgauwaError>")]
|
|
||||||
pub struct RelayOverrideSchedule {
|
|
||||||
pub relay_number: i64,
|
|
||||||
pub schedule: Option<DbSchedule>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "Controller")]
|
#[rtype(result = "Controller")]
|
||||||
pub struct GetThis {}
|
pub struct GetThis {}
|
||||||
|
@ -83,20 +75,6 @@ impl AppState {
|
||||||
pub fn notify_relay_change(&self) {
|
pub fn notify_relay_change(&self) {
|
||||||
self.relay_notifier.notify_one();
|
self.relay_notifier.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn trigger_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
|
||||||
app_state
|
|
||||||
.send(Reload {})
|
|
||||||
.await
|
|
||||||
.map_err(EmgauwaError::from)?
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
|
|
||||||
app_state
|
|
||||||
.send(GetThis {})
|
|
||||||
.await
|
|
||||||
.map_err(EmgauwaError::from)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for AppState {
|
impl Actor for AppState {
|
||||||
|
@ -127,7 +105,7 @@ impl Handler<UpdateRelayStates> for AppState {
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.zip(msg.relay_states.iter())
|
.zip(msg.relay_states.iter())
|
||||||
.for_each(|(driver, state)| {
|
.for_each(|(driver, state)| {
|
||||||
if let Err(e) = driver.set(state.is_on.unwrap_or(false)) {
|
if let Err(e) = driver.set(state.unwrap_or(false)) {
|
||||||
log::error!("Error setting relay: {}", e);
|
log::error!("Error setting relay: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -170,35 +148,6 @@ impl Handler<RelayPulse> for AppState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler<RelayOverrideSchedule> for AppState {
|
|
||||||
type Result = Result<(), EmgauwaError>;
|
|
||||||
|
|
||||||
fn handle(&mut self, msg: RelayOverrideSchedule, _ctx: &mut Self::Context) -> Self::Result {
|
|
||||||
let relay_num = msg.relay_number;
|
|
||||||
let schedule = msg.schedule;
|
|
||||||
let weekday = EmgauwaNow::now(&self.settings.midnight).weekday;
|
|
||||||
|
|
||||||
let relay = self
|
|
||||||
.this
|
|
||||||
.relays
|
|
||||||
.iter_mut()
|
|
||||||
.find(|r| r.r.number == relay_num)
|
|
||||||
.ok_or(EmgauwaError::Other(String::from("Relay not found")))?;
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"Overriding schedule for relay {} to '{}' on day {}",
|
|
||||||
relay_num,
|
|
||||||
schedule.as_ref().map_or("NONE", |s| &s.name),
|
|
||||||
weekday
|
|
||||||
);
|
|
||||||
|
|
||||||
relay.override_schedule = schedule;
|
|
||||||
relay.override_schedule_weekday = weekday;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<GetThis> for AppState {
|
impl Handler<GetThis> for AppState {
|
||||||
type Result = Controller;
|
type Result = Controller;
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ pub use gpio::GpioDriver;
|
||||||
pub use null::NullDriver;
|
pub use null::NullDriver;
|
||||||
pub use piface::PiFaceDriver;
|
pub use piface::PiFaceDriver;
|
||||||
use serde::{Deserialize, Deserializer};
|
use serde::{Deserialize, Deserializer};
|
||||||
|
|
||||||
use crate::errors::EmgauwaControllerError;
|
use crate::errors::EmgauwaControllerError;
|
||||||
|
|
||||||
mod gpio;
|
mod gpio;
|
||||||
|
|
|
@ -11,11 +11,9 @@ pub struct PiFaceDriver {
|
||||||
|
|
||||||
impl PiFaceDriver {
|
impl PiFaceDriver {
|
||||||
pub fn new(pin: u8, pfd: &Option<PiFaceDigital>) -> Result<Self, EmgauwaControllerError> {
|
pub fn new(pin: u8, pfd: &Option<PiFaceDigital>) -> Result<Self, EmgauwaControllerError> {
|
||||||
let pfd = pfd
|
let pfd = pfd.as_ref().ok_or(EmgauwaControllerError::Hardware(String::from(
|
||||||
.as_ref()
|
"PiFaceDigital not initialized",
|
||||||
.ok_or(EmgauwaControllerError::Hardware(String::from(
|
)))?;
|
||||||
"PiFaceDigital not initialized",
|
|
||||||
)))?;
|
|
||||||
let pfd_pin = pfd.get_output_pin(pin)?;
|
let pfd_pin = pfd.get_output_pin(pin)?;
|
||||||
Ok(Self { pfd_pin })
|
Ok(Self { pfd_pin })
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
use std::fmt::{Display, Formatter};
|
use std::fmt::{Display, Formatter};
|
||||||
|
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
|
||||||
use rppal::gpio;
|
use rppal::gpio;
|
||||||
use rppal_mcp23s17::Mcp23s17Error;
|
use rppal_mcp23s17::Mcp23s17Error;
|
||||||
use rppal_pfd::PiFaceDigitalError;
|
use rppal_pfd::PiFaceDigitalError;
|
||||||
|
|
||||||
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum EmgauwaControllerError {
|
pub enum EmgauwaControllerError {
|
||||||
Hardware(String),
|
Hardware(String),
|
||||||
|
|
36
src/main.rs
36
src/main.rs
|
@ -3,7 +3,7 @@ use emgauwa_common::db;
|
||||||
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
use emgauwa_common::models::{Controller, FromDbModel};
|
use emgauwa_common::models::{Controller, FromDbModel};
|
||||||
use emgauwa_common::types::{EmgauwaNow, EmgauwaUid};
|
use emgauwa_common::types::EmgauwaUid;
|
||||||
use emgauwa_common::utils::{drop_privileges, init_logging};
|
use emgauwa_common::utils::{drop_privileges, init_logging};
|
||||||
use rppal_pfd::PiFaceDigital;
|
use rppal_pfd::PiFaceDigital;
|
||||||
use sqlx::pool::PoolConnection;
|
use sqlx::pool::PoolConnection;
|
||||||
|
@ -15,10 +15,11 @@ use crate::ws::run_ws_loop;
|
||||||
|
|
||||||
mod app_state;
|
mod app_state;
|
||||||
mod drivers;
|
mod drivers;
|
||||||
mod errors;
|
|
||||||
mod relay_loop;
|
mod relay_loop;
|
||||||
mod settings;
|
mod settings;
|
||||||
|
mod utils;
|
||||||
mod ws;
|
mod ws;
|
||||||
|
mod errors;
|
||||||
|
|
||||||
async fn create_this_controller(
|
async fn create_this_controller(
|
||||||
conn: &mut PoolConnection<Sqlite>,
|
conn: &mut PoolConnection<Sqlite>,
|
||||||
|
@ -61,7 +62,7 @@ async fn main() -> Result<(), std::io::Error> {
|
||||||
|
|
||||||
drop_privileges(&settings.permissions)?;
|
drop_privileges(&settings.permissions)?;
|
||||||
|
|
||||||
init_logging(&settings.logging)?;
|
init_logging(&settings.logging.level)?;
|
||||||
|
|
||||||
let pool = db::init(&settings.database, 5)
|
let pool = db::init(&settings.database, 5)
|
||||||
.await
|
.await
|
||||||
|
@ -79,10 +80,14 @@ async fn main() -> Result<(), std::io::Error> {
|
||||||
};
|
};
|
||||||
|
|
||||||
for relay in &settings.relays {
|
for relay in &settings.relays {
|
||||||
if DbRelay::get_by_controller_and_num(&mut conn, &db_controller, relay.number)
|
if DbRelay::get_by_controller_and_num(
|
||||||
.await
|
&mut conn,
|
||||||
.map_err(EmgauwaError::from)?
|
&db_controller,
|
||||||
.is_none()
|
relay.number,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)?
|
||||||
|
.is_none()
|
||||||
{
|
{
|
||||||
create_this_relay(&mut conn, &db_controller, relay)
|
create_this_relay(&mut conn, &db_controller, relay)
|
||||||
.await
|
.await
|
||||||
|
@ -95,17 +100,10 @@ async fn main() -> Result<(), std::io::Error> {
|
||||||
.await
|
.await
|
||||||
.map_err(EmgauwaError::from)?;
|
.map_err(EmgauwaError::from)?;
|
||||||
|
|
||||||
let mut this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
|
let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
|
||||||
|
|
||||||
let now = EmgauwaNow::now(&settings.midnight);
|
let now = chrono::Local::now().time();
|
||||||
let initial_states: Vec<bool> = this
|
let initial_states: Vec<bool> = this.relays.iter().map(|r| r.active_schedule.is_on(&now)).collect();
|
||||||
.relays
|
|
||||||
.iter_mut()
|
|
||||||
.map(|r| {
|
|
||||||
r.reload_active_schedule(now.weekday);
|
|
||||||
r.is_on(&now.time)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut pfd: Option<PiFaceDigital> = None;
|
let mut pfd: Option<PiFaceDigital> = None;
|
||||||
let drivers = settings.relays_make_drivers(&mut pfd, initial_states)?;
|
let drivers = settings.relays_make_drivers(&mut pfd, initial_states)?;
|
||||||
|
@ -115,12 +113,12 @@ async fn main() -> Result<(), std::io::Error> {
|
||||||
settings.server.host, settings.server.port
|
settings.server.host, settings.server.port
|
||||||
);
|
);
|
||||||
|
|
||||||
let app_state = app_state::AppState::new(pool.clone(), this, settings.clone(), drivers).start();
|
let app_state = app_state::AppState::new(pool.clone(), this, settings, drivers).start();
|
||||||
|
|
||||||
log::info!("Starting main loops");
|
log::info!("Starting main loops");
|
||||||
|
|
||||||
let _ = tokio::join!(
|
let _ = tokio::join!(
|
||||||
tokio::spawn(run_relays_loop(app_state.clone(), settings.clone())),
|
tokio::spawn(run_relays_loop(app_state.clone())),
|
||||||
tokio::spawn(run_ws_loop(pool.clone(), app_state.clone(), url)),
|
tokio::spawn(run_ws_loop(pool.clone(), app_state.clone(), url)),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -1,24 +1,24 @@
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use actix::Addr;
|
use actix::Addr;
|
||||||
use chrono::{Timelike, Weekday};
|
use chrono::{Local, Timelike};
|
||||||
use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT;
|
use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT;
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
use emgauwa_common::models::Controller;
|
use emgauwa_common::models::Controller;
|
||||||
use emgauwa_common::types::EmgauwaNow;
|
use emgauwa_common::types::{RelayStates, Weekday};
|
||||||
use emgauwa_common::utils::printable_relay_states;
|
use emgauwa_common::utils::printable_relay_states;
|
||||||
use futures::pin_mut;
|
use futures::pin_mut;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
use utils::app_state_get_controller_notifier;
|
||||||
|
|
||||||
use crate::app_state::AppState;
|
use crate::app_state::AppState;
|
||||||
use crate::app_state;
|
use crate::utils;
|
||||||
use crate::settings::Settings;
|
|
||||||
|
|
||||||
pub async fn run_relays_loop(app_state: Addr<AppState>, settings: Settings) {
|
pub async fn run_relays_loop(app_state: Addr<AppState>) {
|
||||||
log::debug!("Spawned relays loop");
|
log::debug!("Spawned relays loop");
|
||||||
loop {
|
loop {
|
||||||
let run_result = run_relays(&app_state, &settings).await;
|
let run_result = run_relays(&app_state).await;
|
||||||
if let Err(err) = run_result {
|
if let Err(err) = run_result {
|
||||||
log::error!("Error running relays: {}", err);
|
log::error!("Error running relays: {}", err);
|
||||||
}
|
}
|
||||||
|
@ -26,54 +26,48 @@ pub async fn run_relays_loop(app_state: Addr<AppState>, settings: Settings) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_relays(app_state: &Addr<AppState>, settings: &Settings) -> Result<(), EmgauwaError> {
|
async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
||||||
let notifier = &*app_state
|
let notifier = &*app_state_get_controller_notifier(app_state).await?;
|
||||||
.send(app_state::GetControllerNotifier {})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let mut last_weekday = EmgauwaNow::now(&settings.midnight).weekday;
|
let mut last_weekday = emgauwa_common::utils::get_weekday();
|
||||||
let mut this = AppState::get_this(app_state).await?;
|
let mut this = utils::app_state_get_this(app_state).await?;
|
||||||
|
let mut relay_states: RelayStates = Vec::new();
|
||||||
|
init_relay_states(&mut relay_states, &this);
|
||||||
|
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
|
||||||
|
|
||||||
let mut duration_override = None;
|
let mut duration_override = None;
|
||||||
|
|
||||||
let now = EmgauwaNow::now(&settings.midnight);
|
|
||||||
calc_relay_states(&mut this, app_state, &now).await?;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let now = EmgauwaNow::now(&settings.midnight);
|
log::debug!(
|
||||||
|
"Relay loop at {}: {}",
|
||||||
|
Local::now().naive_local().time(),
|
||||||
|
printable_relay_states(&this.get_relay_states())
|
||||||
|
);
|
||||||
|
|
||||||
let notifier_future = notifier.notified();
|
let notifier_future = notifier.notified();
|
||||||
pin_mut!(notifier_future);
|
pin_mut!(notifier_future);
|
||||||
let mut changed = timeout(
|
let mut changed = timeout(
|
||||||
get_next_duration(&mut this, &mut duration_override, &now),
|
get_next_duration(&this, &mut duration_override),
|
||||||
&mut notifier_future,
|
&mut notifier_future,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.is_ok();
|
.is_ok();
|
||||||
|
|
||||||
let now = EmgauwaNow::now(&settings.midnight);
|
check_weekday(app_state, &mut last_weekday, &mut changed).await?;
|
||||||
|
|
||||||
check_weekday(app_state, &mut last_weekday, &mut changed, &now).await?;
|
|
||||||
|
|
||||||
if changed {
|
if changed {
|
||||||
log::debug!("Reloading controller in relay loop");
|
log::debug!("Reloading controller in relay loop");
|
||||||
this = AppState::get_this(app_state).await?;
|
this = utils::app_state_get_this(app_state).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug!(
|
let now_pulse = Instant::now();
|
||||||
"Relay loop at {} on {}: {}",
|
|
||||||
now.time,
|
|
||||||
now.weekday.to_string(),
|
|
||||||
printable_relay_states(&this.get_relay_states())
|
|
||||||
);
|
|
||||||
|
|
||||||
duration_override = this
|
duration_override = this
|
||||||
.relays
|
.relays
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.filter_map(|relay| match relay.check_pulsing(&now.instant) {
|
.filter_map(|relay| match relay.check_pulsing(&now_pulse) {
|
||||||
None => None,
|
None => None,
|
||||||
Some(pulse) => {
|
Some(pulse) => {
|
||||||
let dur = pulse - now.instant;
|
let dur = pulse - now_pulse;
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"Pulsing relay {} for {}s until {:?} ",
|
"Pulsing relay {} for {}s until {:?} ",
|
||||||
relay.r.number,
|
relay.r.number,
|
||||||
|
@ -85,60 +79,54 @@ async fn run_relays(app_state: &Addr<AppState>, settings: &Settings) -> Result<(
|
||||||
})
|
})
|
||||||
.min();
|
.min();
|
||||||
|
|
||||||
calc_relay_states(&mut this, app_state, &now).await?;
|
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn init_relay_states(relay_states: &mut RelayStates, this: &Controller) {
|
||||||
|
relay_states.clear();
|
||||||
|
for _ in 0..this.c.relay_count {
|
||||||
|
relay_states.push(None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn calc_relay_states(
|
async fn calc_relay_states(
|
||||||
|
relay_states: &mut RelayStates,
|
||||||
this: &mut Controller,
|
this: &mut Controller,
|
||||||
app_state: &Addr<AppState>,
|
app_state: &Addr<AppState>,
|
||||||
now: &EmgauwaNow,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
) -> Result<(), EmgauwaError> {
|
||||||
|
let now = Local::now().time();
|
||||||
|
let now_pulse = Instant::now();
|
||||||
|
|
||||||
this.relays
|
this.relays
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.for_each(|relay| {
|
.zip(relay_states.iter_mut())
|
||||||
relay.reload_active_schedule(now.weekday);
|
.for_each(|(relay, state)| {
|
||||||
relay.is_on = Some(
|
relay.is_on = Some(
|
||||||
relay.is_on(&now.time)
|
relay.active_schedule.is_on(&now) || relay.check_pulsing(&now_pulse).is_some(),
|
||||||
|| relay.check_pulsing(&now.instant).is_some(),
|
|
||||||
);
|
);
|
||||||
|
*state = relay.is_on;
|
||||||
});
|
});
|
||||||
|
utils::app_state_update_relays_on(app_state, relay_states.clone()).await
|
||||||
app_state
|
|
||||||
.send(app_state::UpdateRelayStates {
|
|
||||||
relay_states: this.get_relay_states()
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(EmgauwaError::from)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_next_duration(
|
fn get_next_duration(this: &Controller, duration_override: &mut Option<Duration>) -> Duration {
|
||||||
this: &mut Controller,
|
|
||||||
duration_override: &mut Option<Duration>,
|
|
||||||
now: &EmgauwaNow,
|
|
||||||
) -> Duration {
|
|
||||||
if let Some(duration) = duration_override {
|
if let Some(duration) = duration_override {
|
||||||
log::debug!("Duration override. Waiting for {}s", duration.as_secs());
|
log::debug!("Duration override. Waiting for {}s", duration.as_secs());
|
||||||
return *duration;
|
return *duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
let next_time = this
|
let now = Local::now().time();
|
||||||
.check_next_time(now)
|
let now_in_s = now.num_seconds_from_midnight();
|
||||||
.unwrap_or(now.midnight);
|
let next_timestamp = this
|
||||||
|
.get_next_time(&now)
|
||||||
|
.map_or(86400, |t| t.num_seconds_from_midnight());
|
||||||
|
|
||||||
// 86400 is the number of seconds in a day
|
let duration_to_next = Duration::from_secs((next_timestamp - now_in_s) as u64);
|
||||||
// If the next timestamp is before the current time, we need to wait until the next day
|
|
||||||
let mut seconds_to_next = (86400 + next_time.num_seconds_from_midnight() - now.num_seconds_from_midnight()) % 86400;
|
|
||||||
if seconds_to_next == 0 {
|
|
||||||
seconds_to_next = 86400;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
let duration_to_next = Duration::from_secs(seconds_to_next as u64);
|
|
||||||
|
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"Next time: {}; Waiting for {}s",
|
"Next timestamp: {}; Waiting for {}s",
|
||||||
next_time,
|
next_timestamp,
|
||||||
duration_to_next.as_secs()
|
duration_to_next.as_secs()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -149,13 +137,12 @@ async fn check_weekday(
|
||||||
app_state: &Addr<AppState>,
|
app_state: &Addr<AppState>,
|
||||||
last_weekday: &mut Weekday,
|
last_weekday: &mut Weekday,
|
||||||
changed: &mut bool,
|
changed: &mut bool,
|
||||||
now: &EmgauwaNow,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
) -> Result<(), EmgauwaError> {
|
||||||
let current_weekday = now.weekday;
|
let current_weekday = emgauwa_common::utils::get_weekday();
|
||||||
if current_weekday.ne(last_weekday) {
|
if current_weekday.ne(last_weekday) {
|
||||||
log::debug!("Weekday changed");
|
log::debug!("Weekday changed");
|
||||||
*last_weekday = current_weekday;
|
*last_weekday = current_weekday;
|
||||||
AppState::trigger_reload(app_state).await?;
|
utils::app_state_reload(app_state).await?;
|
||||||
*changed = true;
|
*changed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
use chrono::NaiveTime;
|
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
use emgauwa_common::settings;
|
use emgauwa_common::settings;
|
||||||
use rppal_pfd::PiFaceDigital;
|
use rppal_pfd::PiFaceDigital;
|
||||||
use serde_derive::Deserialize;
|
use serde_derive::Deserialize;
|
||||||
|
|
||||||
use crate::drivers;
|
use crate::drivers;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
@ -28,7 +26,6 @@ pub struct Settings {
|
||||||
pub logging: settings::Logging,
|
pub logging: settings::Logging,
|
||||||
|
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub midnight: NaiveTime,
|
|
||||||
pub relays: Vec<Relay>,
|
pub relays: Vec<Relay>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +38,6 @@ impl Default for Settings {
|
||||||
logging: settings::Logging::default(),
|
logging: settings::Logging::default(),
|
||||||
|
|
||||||
name: String::from("Emgauwa Controller"),
|
name: String::from("Emgauwa Controller"),
|
||||||
midnight: NaiveTime::default(),
|
|
||||||
relays: Vec::new(),
|
relays: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,17 +69,13 @@ impl Settings {
|
||||||
&self,
|
&self,
|
||||||
pfd: &mut Option<PiFaceDigital>,
|
pfd: &mut Option<PiFaceDigital>,
|
||||||
initial_states: Vec<bool>,
|
initial_states: Vec<bool>,
|
||||||
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
|
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
|
||||||
let mut drivers = Vec::new();
|
let mut drivers = Vec::new();
|
||||||
let result: Result<(), EmgauwaError> =
|
let result: Result<(), EmgauwaError> = self.relays.iter().zip(initial_states.into_iter()).try_for_each(|(relay, state)| {
|
||||||
self.relays
|
let driver = relay.make_driver(pfd, state)?;
|
||||||
.iter()
|
drivers.push(driver);
|
||||||
.zip(initial_states)
|
Ok(())
|
||||||
.try_for_each(|(relay, state)| {
|
});
|
||||||
let driver = relay.make_driver(pfd, state)?;
|
|
||||||
drivers.push(driver);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
result?;
|
result?;
|
||||||
Ok(drivers)
|
Ok(drivers)
|
||||||
}
|
}
|
||||||
|
|
66
src/utils.rs
Normal file
66
src/utils.rs
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use actix::Addr;
|
||||||
|
use emgauwa_common::errors::EmgauwaError;
|
||||||
|
use emgauwa_common::models::Controller;
|
||||||
|
use emgauwa_common::types::RelayStates;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
|
use crate::app_state;
|
||||||
|
use crate::app_state::AppState;
|
||||||
|
|
||||||
|
pub async fn app_state_get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::GetThis {})
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn app_state_get_relay_notifier(
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
) -> Result<Arc<Notify>, EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::GetRelayNotifier {})
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn app_state_get_controller_notifier(
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
) -> Result<Arc<Notify>, EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::GetControllerNotifier {})
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::Reload {})
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)?
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn app_state_update_relays_on(
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
relay_states: RelayStates,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::UpdateRelayStates { relay_states })
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn app_state_relay_pulse(
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
relay_number: i64,
|
||||||
|
duration: Option<u32>,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
app_state
|
||||||
|
.send(app_state::RelayPulse {
|
||||||
|
relay_number,
|
||||||
|
duration,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(EmgauwaError::from)?
|
||||||
|
}
|
|
@ -1,180 +0,0 @@
|
||||||
use actix::Addr;
|
|
||||||
use sqlx::{Pool, Sqlite};
|
|
||||||
use sqlx::pool::PoolConnection;
|
|
||||||
use tokio_tungstenite::tungstenite;
|
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
|
||||||
|
|
||||||
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
|
||||||
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
|
|
||||||
use emgauwa_common::models::{Controller, Relay};
|
|
||||||
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
|
|
||||||
|
|
||||||
use crate::app_state::AppState;
|
|
||||||
use crate::app_state;
|
|
||||||
|
|
||||||
pub async fn handle_message(
|
|
||||||
pool: Pool<Sqlite>,
|
|
||||||
app_state: &Addr<AppState>,
|
|
||||||
message_result: Result<Message, tungstenite::Error>,
|
|
||||||
) {
|
|
||||||
let msg = match message_result {
|
|
||||||
Ok(msg) => msg,
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Error reading message: {}", err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Message::Text(text) = msg {
|
|
||||||
match serde_json::from_str(&text) {
|
|
||||||
Ok(action) => {
|
|
||||||
log::debug!("Received action: {:?}", action);
|
|
||||||
let mut pool_conn = match pool.acquire().await {
|
|
||||||
Ok(conn) => conn,
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Failed to acquire database connection: {:?}", err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let action_res = handle_action(&mut pool_conn, app_state, action).await;
|
|
||||||
if let Err(e) = action_res {
|
|
||||||
log::error!("Error handling action: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::error!("Error deserializing action: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle_action(
|
|
||||||
conn: &mut PoolConnection<Sqlite>,
|
|
||||||
app_state: &Addr<AppState>,
|
|
||||||
action: ControllerWsAction,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
|
||||||
let this = AppState::get_this(app_state).await?;
|
|
||||||
|
|
||||||
match action {
|
|
||||||
ControllerWsAction::Controller(controller) => {
|
|
||||||
handle_controller(conn, &this, controller).await?
|
|
||||||
}
|
|
||||||
ControllerWsAction::Relays(relays) => handle_relays(conn, app_state, &this, relays).await?,
|
|
||||||
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
|
|
||||||
ControllerWsAction::RelayPulse((relay_num, duration)) => {
|
|
||||||
handle_relay_pulse(app_state, relay_num, duration).await?
|
|
||||||
}
|
|
||||||
_ => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
AppState::trigger_reload(app_state).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_controller(
|
|
||||||
conn: &mut PoolConnection<Sqlite>,
|
|
||||||
this: &Controller,
|
|
||||||
controller: Controller,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
|
||||||
if controller.c.uid != this.c.uid {
|
|
||||||
return Err(EmgauwaError::Other(String::from(
|
|
||||||
"Controller UID mismatch during update",
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
DbController::get_by_uid(conn, &controller.c.uid)
|
|
||||||
.await?
|
|
||||||
.ok_or(DatabaseError::NotFound)?
|
|
||||||
.update(conn, controller.c.name.as_str(), this.c.relay_count)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_schedules(
|
|
||||||
conn: &mut PoolConnection<Sqlite>,
|
|
||||||
schedules: Vec<DbSchedule>,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
|
||||||
let mut handled_uids = vec![
|
|
||||||
// on and off schedules are always present and should not be updated
|
|
||||||
ScheduleUid::On,
|
|
||||||
ScheduleUid::Off,
|
|
||||||
];
|
|
||||||
for schedule in schedules {
|
|
||||||
if handled_uids.contains(&schedule.uid) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
handled_uids.push(schedule.uid.clone());
|
|
||||||
|
|
||||||
log::debug!("Handling schedule: {:?}", schedule);
|
|
||||||
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
|
|
||||||
|
|
||||||
if let Some(schedule_db) = schedule_db {
|
|
||||||
schedule_db
|
|
||||||
.update(conn, schedule.name.as_str(), &schedule.periods)
|
|
||||||
.await?;
|
|
||||||
} else {
|
|
||||||
DbSchedule::create(
|
|
||||||
conn,
|
|
||||||
schedule.uid.clone(),
|
|
||||||
schedule.name.as_str(),
|
|
||||||
&schedule.periods,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_relays(
|
|
||||||
conn: &mut PoolConnection<Sqlite>,
|
|
||||||
app_state: &Addr<AppState>,
|
|
||||||
this: &Controller,
|
|
||||||
relays: Vec<Relay>,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
|
||||||
for relay in relays {
|
|
||||||
if relay.r.controller_uid != this.c.uid {
|
|
||||||
return Err(EmgauwaError::Other(String::from(
|
|
||||||
"Controller UID mismatch during relay update",
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
|
|
||||||
.await?
|
|
||||||
.ok_or(DatabaseError::NotFound)?;
|
|
||||||
|
|
||||||
db_relay.update(conn, relay.r.name.as_str()).await?;
|
|
||||||
|
|
||||||
handle_schedules(conn, relay.schedules.clone()).await?;
|
|
||||||
|
|
||||||
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
|
|
||||||
for schedule in &relay.schedules {
|
|
||||||
schedules.push(
|
|
||||||
DbSchedule::get_by_uid(conn, &schedule.uid)
|
|
||||||
.await?
|
|
||||||
.ok_or(DatabaseError::NotFound)?,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
app_state
|
|
||||||
.send(app_state::RelayOverrideSchedule {
|
|
||||||
relay_number: relay.r.number,
|
|
||||||
schedule: relay.override_schedule.clone()
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_relay_pulse(
|
|
||||||
app_state: &Addr<AppState>,
|
|
||||||
relay_number: i64,
|
|
||||||
duration: Option<u32>,
|
|
||||||
) -> Result<(), EmgauwaError> {
|
|
||||||
app_state
|
|
||||||
.send(app_state::RelayPulse {
|
|
||||||
relay_number,
|
|
||||||
duration,
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
}
|
|
178
src/ws/mod.rs
178
src/ws/mod.rs
|
@ -1,18 +1,19 @@
|
||||||
mod handlers;
|
|
||||||
|
|
||||||
use actix::Addr;
|
use actix::Addr;
|
||||||
use emgauwa_common::constants::WEBSOCKET_RETRY_TIMEOUT;
|
use emgauwa_common::constants::WEBSOCKET_RETRY_TIMEOUT;
|
||||||
use emgauwa_common::errors::EmgauwaError;
|
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
|
||||||
use emgauwa_common::types::ControllerWsAction;
|
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
|
||||||
|
use emgauwa_common::models::{Controller, Relay};
|
||||||
|
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
|
||||||
use futures::{future, pin_mut, SinkExt, StreamExt};
|
use futures::{future, pin_mut, SinkExt, StreamExt};
|
||||||
|
use sqlx::pool::PoolConnection;
|
||||||
use sqlx::{Pool, Sqlite};
|
use sqlx::{Pool, Sqlite};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio_tungstenite::connect_async;
|
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
use crate::app_state;
|
use tokio_tungstenite::{connect_async, tungstenite};
|
||||||
|
|
||||||
use crate::app_state::AppState;
|
use crate::app_state::AppState;
|
||||||
use crate::ws::handlers::handle_message;
|
use crate::utils;
|
||||||
|
use crate::utils::{app_state_get_relay_notifier, app_state_get_this};
|
||||||
|
|
||||||
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
|
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
|
||||||
log::debug!("Spawned ws loop");
|
log::debug!("Spawned ws loop");
|
||||||
|
@ -43,7 +44,7 @@ async fn run_websocket(
|
||||||
|
|
||||||
let (mut write, read) = ws_stream.split();
|
let (mut write, read) = ws_stream.split();
|
||||||
|
|
||||||
let ws_action = ControllerWsAction::Register(AppState::get_this(app_state).await?);
|
let ws_action = ControllerWsAction::Register(app_state_get_this(app_state).await?);
|
||||||
|
|
||||||
let ws_action_json = serde_json::to_string(&ws_action)?;
|
let ws_action_json = serde_json::to_string(&ws_action)?;
|
||||||
if let Err(err) = write.send(Message::text(ws_action_json)).await {
|
if let Err(err) = write.send(Message::text(ws_action_json)).await {
|
||||||
|
@ -73,14 +74,11 @@ async fn read_app_state(
|
||||||
app_state: Addr<AppState>,
|
app_state: Addr<AppState>,
|
||||||
tx: futures_channel::mpsc::UnboundedSender<Message>,
|
tx: futures_channel::mpsc::UnboundedSender<Message>,
|
||||||
) -> Result<(), EmgauwaError> {
|
) -> Result<(), EmgauwaError> {
|
||||||
let notifier = &*app_state
|
let notifier = &*app_state_get_relay_notifier(&app_state).await?;
|
||||||
.send(app_state::GetRelayNotifier {})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
notifier.notified().await;
|
notifier.notified().await;
|
||||||
log::debug!("Relay change detected");
|
log::debug!("Relay change detected");
|
||||||
let this = AppState::get_this(&app_state).await?;
|
let this = app_state_get_this(&app_state).await?;
|
||||||
let relay_states = this.get_relay_states();
|
let relay_states = this.get_relay_states();
|
||||||
let ws_action = ControllerWsAction::RelayStates((this.c.uid, relay_states));
|
let ws_action = ControllerWsAction::RelayStates((this.c.uid, relay_states));
|
||||||
|
|
||||||
|
@ -93,3 +91,157 @@ async fn read_app_state(
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_message(
|
||||||
|
pool: Pool<Sqlite>,
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
message_result: Result<Message, tungstenite::Error>,
|
||||||
|
) {
|
||||||
|
let msg = match message_result {
|
||||||
|
Ok(msg) => msg,
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Error reading message: {}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Message::Text(text) = msg {
|
||||||
|
match serde_json::from_str(&text) {
|
||||||
|
Ok(action) => {
|
||||||
|
log::debug!("Received action: {:?}", action);
|
||||||
|
let mut pool_conn = match pool.acquire().await {
|
||||||
|
Ok(conn) => conn,
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Failed to acquire database connection: {:?}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let action_res = handle_action(&mut pool_conn, app_state, action).await;
|
||||||
|
if let Err(e) = action_res {
|
||||||
|
log::error!("Error handling action: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Error deserializing action: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_action(
|
||||||
|
conn: &mut PoolConnection<Sqlite>,
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
action: ControllerWsAction,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
let this = app_state_get_this(app_state).await?;
|
||||||
|
|
||||||
|
match action {
|
||||||
|
ControllerWsAction::Controller(controller) => {
|
||||||
|
handle_controller(conn, &this, controller).await?
|
||||||
|
}
|
||||||
|
ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?,
|
||||||
|
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
|
||||||
|
ControllerWsAction::RelayPulse((relay_num, duration)) => {
|
||||||
|
handle_relay_pulse(app_state, relay_num, duration).await?
|
||||||
|
}
|
||||||
|
_ => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
utils::app_state_reload(app_state).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_controller(
|
||||||
|
conn: &mut PoolConnection<Sqlite>,
|
||||||
|
this: &Controller,
|
||||||
|
controller: Controller,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
if controller.c.uid != this.c.uid {
|
||||||
|
return Err(EmgauwaError::Other(String::from(
|
||||||
|
"Controller UID mismatch during update",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
DbController::get_by_uid(conn, &controller.c.uid)
|
||||||
|
.await?
|
||||||
|
.ok_or(DatabaseError::NotFound)?
|
||||||
|
.update(conn, controller.c.name.as_str(), this.c.relay_count)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_schedules(
|
||||||
|
conn: &mut PoolConnection<Sqlite>,
|
||||||
|
schedules: Vec<DbSchedule>,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
let mut handled_uids = vec![
|
||||||
|
// on and off schedules are always present and should not be updated
|
||||||
|
ScheduleUid::On,
|
||||||
|
ScheduleUid::Off,
|
||||||
|
];
|
||||||
|
for schedule in schedules {
|
||||||
|
if handled_uids.contains(&schedule.uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
handled_uids.push(schedule.uid.clone());
|
||||||
|
|
||||||
|
log::debug!("Handling schedule: {:?}", schedule);
|
||||||
|
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
|
||||||
|
|
||||||
|
if let Some(schedule_db) = schedule_db {
|
||||||
|
schedule_db
|
||||||
|
.update(conn, schedule.name.as_str(), &schedule.periods)
|
||||||
|
.await?;
|
||||||
|
} else {
|
||||||
|
DbSchedule::create(
|
||||||
|
conn,
|
||||||
|
schedule.uid.clone(),
|
||||||
|
schedule.name.as_str(),
|
||||||
|
&schedule.periods,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_relays(
|
||||||
|
conn: &mut PoolConnection<Sqlite>,
|
||||||
|
this: &Controller,
|
||||||
|
relays: Vec<Relay>,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
for relay in relays {
|
||||||
|
if relay.controller.uid != this.c.uid {
|
||||||
|
return Err(EmgauwaError::Other(String::from(
|
||||||
|
"Controller UID mismatch during relay update",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
|
||||||
|
.await?
|
||||||
|
.ok_or(DatabaseError::NotFound)?;
|
||||||
|
|
||||||
|
db_relay.update(conn, relay.r.name.as_str()).await?;
|
||||||
|
|
||||||
|
handle_schedules(conn, relay.schedules.clone()).await?;
|
||||||
|
|
||||||
|
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
|
||||||
|
for schedule in relay.schedules {
|
||||||
|
schedules.push(
|
||||||
|
DbSchedule::get_by_uid(conn, &schedule.uid)
|
||||||
|
.await?
|
||||||
|
.ok_or(DatabaseError::NotFound)?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_relay_pulse(
|
||||||
|
app_state: &Addr<AppState>,
|
||||||
|
relay_num: i64,
|
||||||
|
duration: Option<u32>,
|
||||||
|
) -> Result<(), EmgauwaError> {
|
||||||
|
utils::app_state_relay_pulse(app_state, relay_num, duration).await
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue