use std::time::Duration; use actix::Addr; use chrono::Timelike; use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT; use emgauwa_common::errors::EmgauwaError; use emgauwa_common::models::Controller; use emgauwa_common::types::{EmgauwaNow, RelayState, RelayStates, Weekday}; use emgauwa_common::utils::printable_relay_states; use futures::pin_mut; use tokio::time; use tokio::time::timeout; use utils::app_state_get_controller_notifier; use crate::app_state::AppState; use crate::utils; pub async fn run_relays_loop(app_state: Addr<AppState>) { log::debug!("Spawned relays loop"); loop { let run_result = run_relays(&app_state).await; if let Err(err) = run_result { log::error!("Error running relays: {}", err); } time::sleep(RELAYS_RETRY_TIMEOUT).await; } } async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> { let notifier = &*app_state_get_controller_notifier(app_state).await?; let mut last_weekday = emgauwa_common::utils::get_weekday(); let mut this = utils::app_state_get_this(app_state).await?; let mut relay_states: RelayStates = Vec::new(); let now = EmgauwaNow::now(); init_relay_states(&mut relay_states, &this); calc_relay_states(&mut relay_states, &mut this, app_state, &now).await?; let mut duration_override = None; loop { let now = EmgauwaNow::now(); log::debug!( "Relay loop at {}: {}", now.time, printable_relay_states(&this.get_relay_states()) ); let notifier_future = notifier.notified(); pin_mut!(notifier_future); let mut changed = timeout( get_next_duration(&mut this, &mut duration_override, &now), &mut notifier_future, ) .await .is_ok(); check_weekday(app_state, &mut last_weekday, &mut changed, &now).await?; if changed { log::debug!("Reloading controller in relay loop"); this = utils::app_state_get_this(app_state).await?; } duration_override = this .relays .iter_mut() .filter_map(|relay| match relay.check_pulsing(&now.instant) { None => None, Some(pulse) => { let dur = pulse - now.instant; log::debug!( "Pulsing relay {} for {}s until {:?} ", relay.r.number, dur.as_secs(), pulse ); Some(dur) } }) .min(); calc_relay_states(&mut relay_states, &mut this, app_state, &now).await?; } } fn init_relay_states(relay_states: &mut RelayStates, this: &Controller) { relay_states.clear(); this.relays.iter().for_each(|r| { relay_states.push(RelayState { active_schedule: r.active_schedule.clone(), is_on: None }); }); } async fn calc_relay_states( relay_states: &mut RelayStates, this: &mut Controller, app_state: &Addr<AppState>, now: &EmgauwaNow, ) -> Result<(), EmgauwaError> { this.relays .iter_mut() .zip(relay_states.iter_mut()) .for_each(|(relay, state)| { relay.reload_active_schedule(now.weekday); relay.is_on = Some( relay.active_schedule.is_on(&now.time) || relay.check_pulsing(&now.instant).is_some(), ); state.is_on = relay.is_on; state.active_schedule = relay.active_schedule.clone(); }); utils::app_state_update_relays_on(app_state, relay_states.clone()).await } fn get_next_duration( this: &mut Controller, duration_override: &mut Option<Duration>, now: &EmgauwaNow, ) -> Duration { if let Some(duration) = duration_override { log::debug!("Duration override. Waiting for {}s", duration.as_secs()); return *duration; } let next_timestamp = this .check_next_time(now) .map_or(86400, |t| t.num_seconds_from_midnight()); let duration_to_next = Duration::from_secs((next_timestamp - now.time_in_s()) as u64); log::debug!( "Next timestamp: {}; Waiting for {}s", next_timestamp, duration_to_next.as_secs() ); duration_to_next } async fn check_weekday( app_state: &Addr<AppState>, last_weekday: &mut Weekday, changed: &mut bool, now: &EmgauwaNow, ) -> Result<(), EmgauwaError> { let current_weekday = now.weekday; if current_weekday.ne(last_weekday) { log::debug!("Weekday changed"); *last_weekday = current_weekday; utils::app_state_reload(app_state).await?; *changed = true; } Ok(()) }