Add app_state reload util and add handler for weekday change
This commit is contained in:
		
							parent
							
								
									97d9222a39
								
							
						
					
					
						commit
						6414083af0
					
				
					 5 changed files with 51 additions and 38 deletions
				
			
		| 
						 | 
					@ -27,13 +27,22 @@ async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
 | 
				
			||||||
	let default_duration = Duration::new(10, 0);
 | 
						let default_duration = Duration::new(10, 0);
 | 
				
			||||||
	let notifier = &*app_state_get_notifier(app_state).await?;
 | 
						let notifier = &*app_state_get_notifier(app_state).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						let mut last_weekday = emgauwa_lib::utils::get_weekday();
 | 
				
			||||||
	let mut this = utils::app_state_get_this(app_state).await?;
 | 
						let mut this = utils::app_state_get_this(app_state).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	loop {
 | 
						loop {
 | 
				
			||||||
		let notifier_future = notifier.notified();
 | 
							let notifier_future = notifier.notified();
 | 
				
			||||||
		pin_mut!(notifier_future);
 | 
							pin_mut!(notifier_future);
 | 
				
			||||||
		let timeout_result = timeout(default_duration, &mut notifier_future).await;
 | 
							let timeout_result = timeout(default_duration, &mut notifier_future).await;
 | 
				
			||||||
		let changed = timeout_result.is_ok();
 | 
							let mut changed = timeout_result.is_ok();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							let current_weekday = emgauwa_lib::utils::get_weekday();
 | 
				
			||||||
 | 
							if current_weekday != last_weekday {
 | 
				
			||||||
 | 
								log::debug!("Weekday changed");
 | 
				
			||||||
 | 
								last_weekday = current_weekday;
 | 
				
			||||||
 | 
								utils::app_state_reload(app_state).await?;
 | 
				
			||||||
 | 
								changed = true;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if changed {
 | 
							if changed {
 | 
				
			||||||
			log::debug!("Reloading controller in relay loop");
 | 
								log::debug!("Reloading controller in relay loop");
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -23,3 +23,10 @@ pub async fn app_state_get_notifier(
 | 
				
			||||||
		.await
 | 
							.await
 | 
				
			||||||
		.map_err(EmgauwaError::from)
 | 
							.map_err(EmgauwaError::from)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
 | 
				
			||||||
 | 
						app_state
 | 
				
			||||||
 | 
							.send(app_state::Reload {})
 | 
				
			||||||
 | 
							.await
 | 
				
			||||||
 | 
							.map_err(EmgauwaError::from)?
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -11,9 +11,9 @@ use tokio::time;
 | 
				
			||||||
use tokio_tungstenite::tungstenite::Message;
 | 
					use tokio_tungstenite::tungstenite::Message;
 | 
				
			||||||
use tokio_tungstenite::{connect_async, tungstenite};
 | 
					use tokio_tungstenite::{connect_async, tungstenite};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::app_state;
 | 
					 | 
				
			||||||
use crate::app_state::AppState;
 | 
					use crate::app_state::AppState;
 | 
				
			||||||
use crate::utils::app_state_get_this;
 | 
					use crate::utils::app_state_get_this;
 | 
				
			||||||
 | 
					use crate::{app_state, utils};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
 | 
					pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
 | 
				
			||||||
	log::debug!("Spawned ws loop");
 | 
						log::debug!("Spawned ws loop");
 | 
				
			||||||
| 
						 | 
					@ -109,19 +109,18 @@ pub async fn handle_action(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	match action {
 | 
						match action {
 | 
				
			||||||
		ControllerWsAction::Controller(controller) => {
 | 
							ControllerWsAction::Controller(controller) => {
 | 
				
			||||||
			handle_controller(conn, app_state, &this, controller).await
 | 
								handle_controller(conn, &this, controller).await?
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		ControllerWsAction::Relays(relays) => handle_relays(conn, app_state, &this, relays).await,
 | 
					 | 
				
			||||||
		ControllerWsAction::Schedules(schedules) => {
 | 
					 | 
				
			||||||
			handle_schedules(conn, app_state, schedules).await
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		_ => Ok(()),
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?,
 | 
				
			||||||
 | 
							ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
 | 
				
			||||||
 | 
							_ => return Ok(()),
 | 
				
			||||||
 | 
						};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						utils::app_state_reload(app_state).await
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn handle_controller(
 | 
					async fn handle_controller(
 | 
				
			||||||
	conn: &mut PoolConnection<Sqlite>,
 | 
						conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
	app_state: &Addr<AppState>,
 | 
					 | 
				
			||||||
	this: &Controller,
 | 
						this: &Controller,
 | 
				
			||||||
	controller: Controller,
 | 
						controller: Controller,
 | 
				
			||||||
) -> Result<(), EmgauwaError> {
 | 
					) -> Result<(), EmgauwaError> {
 | 
				
			||||||
| 
						 | 
					@ -136,14 +135,11 @@ async fn handle_controller(
 | 
				
			||||||
		.update(conn, controller.c.name.as_str(), this.c.relay_count)
 | 
							.update(conn, controller.c.name.as_str(), this.c.relay_count)
 | 
				
			||||||
		.await?;
 | 
							.await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	app_state.send(app_state::Reload {}).await??;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	Ok(())
 | 
						Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn handle_schedules(
 | 
					async fn handle_schedules(
 | 
				
			||||||
	conn: &mut PoolConnection<Sqlite>,
 | 
						conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
	app_state: &Addr<AppState>,
 | 
					 | 
				
			||||||
	schedules: Vec<DbSchedule>,
 | 
						schedules: Vec<DbSchedule>,
 | 
				
			||||||
) -> Result<(), EmgauwaError> {
 | 
					) -> Result<(), EmgauwaError> {
 | 
				
			||||||
	let mut handled_uids = vec![
 | 
						let mut handled_uids = vec![
 | 
				
			||||||
| 
						 | 
					@ -175,14 +171,11 @@ async fn handle_schedules(
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	app_state.send(app_state::Reload {}).await??;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	Ok(())
 | 
						Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn handle_relays(
 | 
					async fn handle_relays(
 | 
				
			||||||
	conn: &mut PoolConnection<Sqlite>,
 | 
						conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
	app_state: &Addr<AppState>,
 | 
					 | 
				
			||||||
	this: &Controller,
 | 
						this: &Controller,
 | 
				
			||||||
	relays: Vec<Relay>,
 | 
						relays: Vec<Relay>,
 | 
				
			||||||
) -> Result<(), EmgauwaError> {
 | 
					) -> Result<(), EmgauwaError> {
 | 
				
			||||||
| 
						 | 
					@ -198,7 +191,7 @@ async fn handle_relays(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		db_relay.update(conn, relay.r.name.as_str()).await?;
 | 
							db_relay.update(conn, relay.r.name.as_str()).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		handle_schedules(conn, app_state, relay.schedules.clone()).await?;
 | 
							handle_schedules(conn, relay.schedules.clone()).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
 | 
							let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
 | 
				
			||||||
		for schedule in relay.schedules {
 | 
							for schedule in relay.schedules {
 | 
				
			||||||
| 
						 | 
					@ -212,7 +205,5 @@ async fn handle_relays(
 | 
				
			||||||
		DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
 | 
							DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	app_state.send(app_state::Reload {}).await??;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	Ok(())
 | 
						Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,11 +1,14 @@
 | 
				
			||||||
use std::ops::DerefMut;
 | 
					use std::ops::DerefMut;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use futures::executor::block_on;
 | 
				
			||||||
use serde_derive::{Deserialize, Serialize};
 | 
					use serde_derive::{Deserialize, Serialize};
 | 
				
			||||||
use sqlx::pool::PoolConnection;
 | 
					use sqlx::pool::PoolConnection;
 | 
				
			||||||
use sqlx::Sqlite;
 | 
					use sqlx::Sqlite;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::db::{DbController, DbJunctionTag, DbTag};
 | 
					use crate::db::{DbController, DbJunctionRelaySchedule, DbJunctionTag, DbSchedule, DbTag};
 | 
				
			||||||
use crate::errors::DatabaseError;
 | 
					use crate::errors::DatabaseError;
 | 
				
			||||||
 | 
					use crate::types::Weekday;
 | 
				
			||||||
 | 
					use crate::utils;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
					#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
				
			||||||
pub struct DbRelay {
 | 
					pub struct DbRelay {
 | 
				
			||||||
| 
						 | 
					@ -162,4 +165,14 @@ impl DbRelay {
 | 
				
			||||||
			.await?
 | 
								.await?
 | 
				
			||||||
			.ok_or(DatabaseError::NotFound)
 | 
								.ok_or(DatabaseError::NotFound)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pub async fn get_active_schedule(
 | 
				
			||||||
 | 
							&self,
 | 
				
			||||||
 | 
							conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
 | 
						) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
 | 
							let weekday = utils::get_weekday();
 | 
				
			||||||
 | 
							DbJunctionRelaySchedule::get_schedule(conn, &self, weekday as Weekday)
 | 
				
			||||||
 | 
								.await?
 | 
				
			||||||
 | 
								.ok_or(DatabaseError::NotFound)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,8 +6,7 @@ use sqlx::Sqlite;
 | 
				
			||||||
use crate::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
 | 
					use crate::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
 | 
				
			||||||
use crate::errors::DatabaseError;
 | 
					use crate::errors::DatabaseError;
 | 
				
			||||||
use crate::models::FromDbModel;
 | 
					use crate::models::FromDbModel;
 | 
				
			||||||
use crate::types::{ControllerUid, Weekday};
 | 
					use crate::types::ControllerUid;
 | 
				
			||||||
use crate::utils;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
 | 
					#[derive(Serialize, Deserialize, Debug, Clone)]
 | 
				
			||||||
pub struct Relay {
 | 
					pub struct Relay {
 | 
				
			||||||
| 
						 | 
					@ -42,14 +41,7 @@ impl FromDbModel for Relay {
 | 
				
			||||||
		let controller_id = cache.uid.clone();
 | 
							let controller_id = cache.uid.clone();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?;
 | 
							let schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &db_model))?;
 | 
				
			||||||
 | 
							let active_schedule = block_on(db_model.get_active_schedule(conn))?;
 | 
				
			||||||
		let weekday = utils::get_weekday();
 | 
					 | 
				
			||||||
		let active_schedule = block_on(DbJunctionRelaySchedule::get_schedule(
 | 
					 | 
				
			||||||
			conn,
 | 
					 | 
				
			||||||
			&db_model,
 | 
					 | 
				
			||||||
			weekday as Weekday,
 | 
					 | 
				
			||||||
		))?
 | 
					 | 
				
			||||||
		.ok_or(DatabaseError::NotFound)?;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		Ok(Relay {
 | 
							Ok(Relay {
 | 
				
			||||||
			r: db_model,
 | 
								r: db_model,
 | 
				
			||||||
| 
						 | 
					@ -66,15 +58,16 @@ impl Relay {
 | 
				
			||||||
	pub fn reload(&mut self, conn: &mut PoolConnection<Sqlite>) -> Result<(), DatabaseError> {
 | 
						pub fn reload(&mut self, conn: &mut PoolConnection<Sqlite>) -> Result<(), DatabaseError> {
 | 
				
			||||||
		self.r = block_on(self.r.reload(conn))?;
 | 
							self.r = block_on(self.r.reload(conn))?;
 | 
				
			||||||
		self.schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &self.r))?;
 | 
							self.schedules = block_on(DbJunctionRelaySchedule::get_schedules(conn, &self.r))?;
 | 
				
			||||||
 | 
							self.reload_active_schedule(conn)?;
 | 
				
			||||||
		let weekday = utils::get_weekday();
 | 
					 | 
				
			||||||
		self.active_schedule = block_on(DbJunctionRelaySchedule::get_schedule(
 | 
					 | 
				
			||||||
			conn,
 | 
					 | 
				
			||||||
			&self.r,
 | 
					 | 
				
			||||||
			weekday as Weekday,
 | 
					 | 
				
			||||||
		))?
 | 
					 | 
				
			||||||
		.ok_or(DatabaseError::NotFound)?;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		Ok(())
 | 
							Ok(())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pub fn reload_active_schedule(
 | 
				
			||||||
 | 
							&mut self,
 | 
				
			||||||
 | 
							conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
 | 
						) -> Result<(), DatabaseError> {
 | 
				
			||||||
 | 
							self.active_schedule = block_on(self.r.get_active_schedule(conn))?;
 | 
				
			||||||
 | 
							Ok(())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue