diff --git a/emgauwa-controller/src/app_state.rs b/emgauwa-controller/src/app_state.rs index a1d31e9..8eb56fb 100644 --- a/emgauwa-controller/src/app_state.rs +++ b/emgauwa-controller/src/app_state.rs @@ -1,6 +1,8 @@ use std::sync::Arc; +use std::time::{Duration, Instant}; use actix::{Actor, Context, Handler, Message}; +use emgauwa_lib::constants; use emgauwa_lib::errors::EmgauwaError; use emgauwa_lib::models::Controller; use emgauwa_lib::types::RelayStates; @@ -8,6 +10,8 @@ use futures::executor::block_on; use sqlx::{Pool, Sqlite}; use tokio::sync::Notify; +use crate::settings::Settings; + #[derive(Message)] #[rtype(result = "Result<(), EmgauwaError>")] pub struct Reload {} @@ -18,6 +22,13 @@ pub struct UpdateRelayStates { pub relay_states: RelayStates, } +#[derive(Message)] +#[rtype(result = "Result<(), EmgauwaError>")] +pub struct RelayPulse { + pub relay_number: i64, + pub duration: Option, +} + #[derive(Message)] #[rtype(result = "Controller")] pub struct GetThis {} @@ -33,15 +44,17 @@ pub struct GetRelayNotifier {} pub struct AppState { pub pool: Pool, pub this: Controller, + pub settings: Settings, pub controller_notifier: Arc, pub relay_notifier: Arc, } impl AppState { - pub fn new(pool: Pool, this: Controller) -> AppState { + pub fn new(pool: Pool, this: Controller, settings: Settings) -> AppState { AppState { pool, this, + settings, controller_notifier: Arc::new(Notify::new()), relay_notifier: Arc::new(Notify::new()), } @@ -85,6 +98,40 @@ impl Handler for AppState { } } +impl Handler for AppState { + type Result = Result<(), EmgauwaError>; + + fn handle(&mut self, msg: RelayPulse, _ctx: &mut Self::Context) -> Self::Result { + let relay_num = msg.relay_number; + + let duration = Duration::from_secs( + match msg.duration { + None => { + self.settings + .get_relay(relay_num) + .ok_or(EmgauwaError::Other(String::from( + "Relay not found in settings", + )))? + .pulse + } + Some(dur) => Some(dur as u64), + } + .unwrap_or(constants::RELAY_PULSE_DURATION), + ); + let now = Instant::now(); + let until = now + duration; + + self.this.relay_pulse(relay_num, until)?; + log::debug!( + "Pulsing relay {} for {} seconds until {:?}", + relay_num, + duration.as_secs(), + until + ); + Ok(()) + } +} + impl Handler for AppState { type Result = Controller; diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index 119b69c..e6c329f 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -104,13 +104,12 @@ async fn main() -> Result<(), std::io::Error> { let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?; - let app_state = app_state::AppState::new(pool.clone(), this).start(); - let url = format!( "ws://{}:{}/api/v1/ws/controllers", settings.server.host, settings.server.port ); + let app_state = app_state::AppState::new(pool.clone(), this, settings).start(); let _ = tokio::join!( tokio::spawn(run_relays_loop(app_state.clone())), diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs index 2f40aa1..2bf2bb5 100644 --- a/emgauwa-controller/src/relay_loop.rs +++ b/emgauwa-controller/src/relay_loop.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use actix::Addr; use chrono::{Local, Timelike}; @@ -35,6 +35,8 @@ async fn run_relays(app_state: &Addr) -> Result<(), EmgauwaError> { init_relay_states(&mut relay_states, &this); calc_relay_states(&mut relay_states, &mut this, app_state).await?; + let mut duration_override = None; + loop { log::debug!( "Relay loop at {}: {}", @@ -44,9 +46,12 @@ async fn run_relays(app_state: &Addr) -> Result<(), EmgauwaError> { let notifier_future = notifier.notified(); pin_mut!(notifier_future); - let mut changed = timeout(get_next_duration(&this), &mut notifier_future) - .await - .is_ok(); + let mut changed = timeout( + get_next_duration(&this, &mut duration_override), + &mut notifier_future, + ) + .await + .is_ok(); check_weekday(app_state, &mut last_weekday, &mut changed).await?; @@ -55,6 +60,25 @@ async fn run_relays(app_state: &Addr) -> Result<(), EmgauwaError> { this = utils::app_state_get_this(app_state).await?; } + let now_pulse = Instant::now(); + duration_override = this + .relays + .iter_mut() + .filter_map(|relay| match relay.check_pulsing(&now_pulse) { + None => None, + Some(pulse) => { + let dur = pulse - now_pulse; + log::debug!( + "Pulsing relay {} for {}s until {:?} ", + relay.r.number, + dur.as_secs(), + pulse + ); + Some(dur) + } + }) + .min(); + calc_relay_states(&mut relay_states, &mut this, app_state).await?; } } @@ -72,18 +96,26 @@ async fn calc_relay_states( app_state: &Addr, ) -> Result<(), EmgauwaError> { let now = Local::now().time(); + let now_pulse = Instant::now(); this.relays .iter_mut() .zip(relay_states.iter_mut()) .for_each(|(relay, state)| { - relay.is_on = Some(relay.active_schedule.is_on(&now)); + relay.is_on = Some( + relay.active_schedule.is_on(&now) || relay.check_pulsing(&now_pulse).is_some(), + ); *state = relay.is_on; }); utils::app_state_update_relays_on(app_state, relay_states.clone()).await } -fn get_next_duration(this: &Controller) -> Duration { +fn get_next_duration(this: &Controller, duration_override: &mut Option) -> Duration { + if let Some(duration) = duration_override { + log::debug!("Duration override. Waiting for {}s", duration.as_secs()); + return *duration; + } + let now = Local::now().time(); let now_in_s = now.num_seconds_from_midnight(); let next_timestamp = this diff --git a/emgauwa-controller/src/settings.rs b/emgauwa-controller/src/settings.rs index 7330c38..0c4572e 100644 --- a/emgauwa-controller/src/settings.rs +++ b/emgauwa-controller/src/settings.rs @@ -13,6 +13,7 @@ pub struct Relay { pub number: Option, pub pin: u8, pub inverted: bool, + pub pulse: Option, } #[derive(Clone, Debug, Deserialize)] @@ -50,6 +51,7 @@ impl Default for Relay { name: String::from("Relay"), pin: 0, inverted: false, + pulse: None, } } } @@ -65,3 +67,9 @@ pub fn init() -> Result { Ok(settings) } + +impl Settings { + pub fn get_relay(&self, number: i64) -> Option<&Relay> { + self.relays.iter().find(|r| r.number == Some(number)) + } +} diff --git a/emgauwa-controller/src/utils.rs b/emgauwa-controller/src/utils.rs index f09af81..674a561 100644 --- a/emgauwa-controller/src/utils.rs +++ b/emgauwa-controller/src/utils.rs @@ -50,3 +50,17 @@ pub async fn app_state_update_relays_on( .await .map_err(EmgauwaError::from) } + +pub async fn app_state_relay_pulse( + app_state: &Addr, + relay_number: i64, + duration: Option, +) -> Result<(), EmgauwaError> { + app_state + .send(app_state::RelayPulse { + relay_number, + duration, + }) + .await + .map_err(EmgauwaError::from)? +} diff --git a/emgauwa-controller/src/ws/mod.rs b/emgauwa-controller/src/ws/mod.rs index 559dc2e..83f91b0 100644 --- a/emgauwa-controller/src/ws/mod.rs +++ b/emgauwa-controller/src/ws/mod.rs @@ -140,6 +140,9 @@ pub async fn handle_action( } ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?, ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?, + ControllerWsAction::RelayPulse((relay_num, duration)) => { + handle_relay_pulse(app_state, relay_num, duration).await? + } _ => return Ok(()), }; @@ -234,3 +237,11 @@ async fn handle_relays( Ok(()) } + +async fn handle_relay_pulse( + app_state: &Addr, + relay_num: i64, + duration: Option, +) -> Result<(), EmgauwaError> { + utils::app_state_relay_pulse(app_state, relay_num, duration).await +} diff --git a/emgauwa-core/src/handlers/v1/relays.rs b/emgauwa-core/src/handlers/v1/relays.rs index 93b0db3..dd7a3d8 100644 --- a/emgauwa-core/src/handlers/v1/relays.rs +++ b/emgauwa-core/src/handlers/v1/relays.rs @@ -1,9 +1,11 @@ use actix::Addr; -use actix_web::{get, put, web, HttpResponse}; +use actix_web::{get, post, put, web, HttpResponse}; use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbTag}; use emgauwa_lib::errors::{DatabaseError, EmgauwaError}; use emgauwa_lib::models::{convert_db_list, FromDbModel, Relay}; -use emgauwa_lib::types::{ControllerUid, ControllerWsAction, RequestRelayUpdate}; +use emgauwa_lib::types::{ + ControllerUid, ControllerWsAction, RequestRelayPulse, RequestRelayUpdate, +}; use emgauwa_lib::utils; use sqlx::{Pool, Sqlite}; @@ -149,3 +151,35 @@ pub async fn update_for_controller( Ok(HttpResponse::Ok().json(return_relay)) } + +#[post("/controllers/{controller_id}/relays/{relay_num}/pulse")] +pub async fn pulse( + pool: web::Data>, + app_state: web::Data>, + path: web::Path<(String, i64)>, + data: web::Json, +) -> Result { + let mut pool_conn = pool.acquire().await?; + + let (controller_uid, relay_num) = path.into_inner(); + let uid = ControllerUid::try_from(controller_uid.as_str())?; + + let controller = DbController::get_by_uid(&mut pool_conn, &uid) + .await? + .ok_or(DatabaseError::NotFound)?; + + let relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num) + .await? + .ok_or(DatabaseError::NotFound)?; + + let duration = data.duration.filter(|&d| d > 0); + + app_state + .send(app_state::Action { + controller_uid: uid, + action: ControllerWsAction::RelayPulse((relay.number, duration)), + }) + .await??; + + Ok(HttpResponse::Ok().finish()) // TODO add a message? +} diff --git a/emgauwa-core/src/main.rs b/emgauwa-core/src/main.rs index 5fca1e8..cc1bf87 100644 --- a/emgauwa-core/src/main.rs +++ b/emgauwa-core/src/main.rs @@ -96,6 +96,7 @@ async fn main() -> Result<(), std::io::Error> { .service(handlers::v1::relays::index_for_controller) .service(handlers::v1::relays::show_for_controller) .service(handlers::v1::relays::update_for_controller) + .service(handlers::v1::relays::pulse) .service(handlers::v1::schedules::index) .service(handlers::v1::schedules::tagged) .service(handlers::v1::schedules::show) diff --git a/emgauwa-lib/src/constants.rs b/emgauwa-lib/src/constants.rs index b4e93cb..92874a2 100644 --- a/emgauwa-lib/src/constants.rs +++ b/emgauwa-lib/src/constants.rs @@ -6,3 +6,5 @@ pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15); pub const WEBSOCKET_RETRY_TIMEOUT: Duration = Duration::from_secs(5); pub const RELAYS_RETRY_TIMEOUT: Duration = Duration::from_secs(5); + +pub const RELAY_PULSE_DURATION: u64 = 3; diff --git a/emgauwa-lib/src/db/model_utils.rs b/emgauwa-lib/src/db/model_utils.rs index 9420e4d..4116365 100644 --- a/emgauwa-lib/src/db/model_utils.rs +++ b/emgauwa-lib/src/db/model_utils.rs @@ -65,13 +65,13 @@ impl Period { let end_after_now = self.end.gt(now); let start_before_end = self.start.lt(&self.end); - return match (start_after_now, end_after_now, start_before_end) { + match (start_after_now, end_after_now, start_before_end) { (false, false, _) => None, // both before now (true, false, _) => Some(self.start), // only start after now (false, true, _) => Some(self.end), // only end after now (true, true, true) => Some(self.start), // both after now but start first (true, true, false) => Some(self.end), // both after now but end first - }; + } } } diff --git a/emgauwa-lib/src/models/controller.rs b/emgauwa-lib/src/models/controller.rs index e7983d7..96d7b00 100644 --- a/emgauwa-lib/src/models/controller.rs +++ b/emgauwa-lib/src/models/controller.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use actix::MessageResponse; use chrono::NaiveTime; use futures::executor::block_on; @@ -70,4 +72,15 @@ impl Controller { .filter_map(|r| r.active_schedule.get_next_time(now)) .min() } + + pub fn relay_pulse(&mut self, relay_num: i64, until: Instant) -> Result<(), EmgauwaError> { + let relay = self + .relays + .iter_mut() + .find(|r| r.r.number == relay_num) + .ok_or(EmgauwaError::Other(String::from("Relay not found")))?; + + relay.pulsing = Some(until); + Ok(()) + } } diff --git a/emgauwa-lib/src/models/relay.rs b/emgauwa-lib/src/models/relay.rs index 28199e9..2be2137 100644 --- a/emgauwa-lib/src/models/relay.rs +++ b/emgauwa-lib/src/models/relay.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use chrono::NaiveTime; use futures::executor::block_on; use serde_derive::{Deserialize, Serialize}; @@ -19,6 +21,10 @@ pub struct Relay { pub active_schedule: DbSchedule, pub is_on: Option, pub tags: Vec, + + // for internal use only. + #[serde(skip)] + pub pulsing: Option, } @@ -55,6 +61,7 @@ impl FromDbModel for Relay { active_schedule, is_on, tags, + pulsing: None, }) } } @@ -83,4 +90,18 @@ impl Relay { pub fn get_next_time(&self, now: &NaiveTime) -> Option { self.active_schedule.get_next_time(now) } + + pub fn check_pulsing(&mut self, now: &Instant) -> Option { + match self.pulsing { + Some(dur_instant) => { + if dur_instant.lt(now) { + self.pulsing = None; + None + } else { + Some(dur_instant) + } + } + None => None, + } + } } diff --git a/emgauwa-lib/src/types/mod.rs b/emgauwa-lib/src/types/mod.rs index f131e16..be2942b 100644 --- a/emgauwa-lib/src/types/mod.rs +++ b/emgauwa-lib/src/types/mod.rs @@ -25,4 +25,5 @@ pub enum ControllerWsAction { Relays(Vec), Controller(Controller), RelayStates((ControllerUid, RelayStates)), + RelayPulse((i64, Option)), } diff --git a/emgauwa-lib/src/types/request.rs b/emgauwa-lib/src/types/request.rs index c4d20e7..e1cfced 100644 --- a/emgauwa-lib/src/types/request.rs +++ b/emgauwa-lib/src/types/request.rs @@ -28,6 +28,11 @@ pub struct RequestRelayUpdate { pub tags: Option>, } +#[derive(Debug, Serialize, Deserialize)] +pub struct RequestRelayPulse { + pub duration: Option, +} + #[derive(Debug, Serialize, Deserialize)] pub struct RequestScheduleId { pub id: ScheduleUid,