Add feature to import missing schedules
This commit is contained in:
		
							parent
							
								
									6400b7745c
								
							
						
					
					
						commit
						c8f40284ef
					
				
					 6 changed files with 81 additions and 26 deletions
				
			
		| 
						 | 
					@ -2,6 +2,7 @@ use std::time::Duration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use chrono::Local;
 | 
					use chrono::Local;
 | 
				
			||||||
use tokio::time;
 | 
					use tokio::time;
 | 
				
			||||||
 | 
					use tokio::time::Instant;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::settings::Settings;
 | 
					use crate::settings::Settings;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -9,8 +10,8 @@ use crate::settings::Settings;
 | 
				
			||||||
pub async fn run_relay_loop(settings: Settings) {
 | 
					pub async fn run_relay_loop(settings: Settings) {
 | 
				
			||||||
	let default_duration = Duration::from_millis(1000);
 | 
						let default_duration = Duration::from_millis(1000);
 | 
				
			||||||
	loop {
 | 
						loop {
 | 
				
			||||||
		let next_timestamp = Local::now().naive_local().time() + default_duration;
 | 
							let next_timestamp = Instant::now() + default_duration;
 | 
				
			||||||
		time::sleep(default_duration).await;
 | 
							time::sleep_until(next_timestamp).await;
 | 
				
			||||||
		log::debug!("Relay loop: {}", next_timestamp)
 | 
							log::debug!("Relay loop: {}", Local::now().naive_local().time())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -5,9 +5,10 @@ use actix_web_actors::ws;
 | 
				
			||||||
use actix_web_actors::ws::ProtocolError;
 | 
					use actix_web_actors::ws::ProtocolError;
 | 
				
			||||||
use emgauwa_lib::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
 | 
					use emgauwa_lib::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
 | 
				
			||||||
use emgauwa_lib::db::errors::DatabaseError;
 | 
					use emgauwa_lib::db::errors::DatabaseError;
 | 
				
			||||||
use emgauwa_lib::db::{DbController, DbRelay};
 | 
					use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
 | 
				
			||||||
use emgauwa_lib::models::{Controller, FromDbModel};
 | 
					use emgauwa_lib::models::{Controller, FromDbModel};
 | 
				
			||||||
use emgauwa_lib::types::{ConnectedControllersType, ControllerUid, ControllerWsAction};
 | 
					use emgauwa_lib::types::{ConnectedControllersType, ControllerUid, ControllerWsAction};
 | 
				
			||||||
 | 
					use futures::executor::block_on;
 | 
				
			||||||
use sqlx::pool::PoolConnection;
 | 
					use sqlx::pool::PoolConnection;
 | 
				
			||||||
use sqlx::{Pool, Sqlite};
 | 
					use sqlx::{Pool, Sqlite};
 | 
				
			||||||
use ws::Message;
 | 
					use ws::Message;
 | 
				
			||||||
| 
						 | 
					@ -28,12 +29,17 @@ impl Actor for ControllerWs {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fn stopped(&mut self, _ctx: &mut Self::Context) {
 | 
						fn stopped(&mut self, _ctx: &mut Self::Context) {
 | 
				
			||||||
		if let Some(controller_uid) = &self.controller_uid {
 | 
							if let Some(controller_uid) = &self.controller_uid {
 | 
				
			||||||
			let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap();
 | 
								let mut pool_conn = block_on(self.pool.acquire()).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			let mut data = self.connected_controllers.lock().unwrap();
 | 
								let mut data = self.connected_controllers.lock().unwrap();
 | 
				
			||||||
			if let Some(controller) = data.remove(controller_uid) {
 | 
								if let Some(controller) = data.remove(controller_uid) {
 | 
				
			||||||
				futures::executor::block_on(controller.c.update_active(&mut pool_conn, false))
 | 
									if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) {
 | 
				
			||||||
					.unwrap();
 | 
										log::error!(
 | 
				
			||||||
 | 
											"Failed to mark controller {} as inactive: {:?}",
 | 
				
			||||||
 | 
											controller.c.uid,
 | 
				
			||||||
 | 
											err
 | 
				
			||||||
 | 
										)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -49,24 +55,47 @@ impl ControllerWs {
 | 
				
			||||||
			ControllerWsAction::Register(controller) => {
 | 
								ControllerWsAction::Register(controller) => {
 | 
				
			||||||
				log::info!("Registering controller: {:?}", controller);
 | 
									log::info!("Registering controller: {:?}", controller);
 | 
				
			||||||
				let c = &controller.c;
 | 
									let c = &controller.c;
 | 
				
			||||||
				let controller_db = futures::executor::block_on(
 | 
									let controller_db = block_on(DbController::get_by_uid_or_create(
 | 
				
			||||||
					DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count),
 | 
										conn,
 | 
				
			||||||
				)?;
 | 
										&c.uid,
 | 
				
			||||||
				futures::executor::block_on(controller_db.update_active(conn, true))?;
 | 
										&c.name,
 | 
				
			||||||
 | 
										c.relay_count,
 | 
				
			||||||
 | 
									))?;
 | 
				
			||||||
 | 
									block_on(controller_db.update_active(conn, true))?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				for relay in &controller.relays {
 | 
									for relay in &controller.relays {
 | 
				
			||||||
					let r = &relay.r;
 | 
										let (new_relay, created) =
 | 
				
			||||||
					futures::executor::block_on(DbRelay::get_by_controller_and_num_or_create(
 | 
											block_on(DbRelay::get_by_controller_and_num_or_create(
 | 
				
			||||||
							conn,
 | 
												conn,
 | 
				
			||||||
							&controller_db,
 | 
												&controller_db,
 | 
				
			||||||
						r.number,
 | 
												relay.r.number,
 | 
				
			||||||
						&r.name,
 | 
												&relay.r.name,
 | 
				
			||||||
						))?;
 | 
											))?;
 | 
				
			||||||
 | 
										if created {
 | 
				
			||||||
 | 
											let mut relay_schedules = Vec::new();
 | 
				
			||||||
 | 
											for schedule in &relay.schedules {
 | 
				
			||||||
 | 
												let (new_schedule, _) = block_on(DbSchedule::get_by_uid_or_create(
 | 
				
			||||||
 | 
													conn,
 | 
				
			||||||
 | 
													schedule.uid.clone(),
 | 
				
			||||||
 | 
													&schedule.name,
 | 
				
			||||||
 | 
													&schedule.periods,
 | 
				
			||||||
 | 
												))?;
 | 
				
			||||||
 | 
												relay_schedules.push(new_schedule);
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				let controller = Controller::from_db_model(conn, controller_db)?;
 | 
											block_on(DbJunctionRelaySchedule::set_schedules(
 | 
				
			||||||
 | 
												conn,
 | 
				
			||||||
 | 
												&new_relay,
 | 
				
			||||||
 | 
												relay_schedules.iter().collect(),
 | 
				
			||||||
 | 
											))?;
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				let controller_uid = &controller.c.uid;
 | 
									let controller_uid = &controller.c.uid;
 | 
				
			||||||
 | 
									let controller_db = block_on(DbController::get_by_uid(conn, controller_uid))?
 | 
				
			||||||
 | 
										.ok_or(DatabaseError::InsertGetError)?;
 | 
				
			||||||
 | 
									let controller = Controller::from_db_model(conn, controller_db)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				self.controller_uid = Some(controller_uid.clone());
 | 
									self.controller_uid = Some(controller_uid.clone());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				let mut data = self.connected_controllers.lock().unwrap();
 | 
									let mut data = self.connected_controllers.lock().unwrap();
 | 
				
			||||||
| 
						 | 
					@ -97,7 +126,7 @@ impl ControllerWs {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
 | 
					impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
 | 
				
			||||||
	fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
 | 
						fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
 | 
				
			||||||
		let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap();
 | 
							let mut pool_conn = block_on(self.pool.acquire()).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		let msg = match msg {
 | 
							let msg = match msg {
 | 
				
			||||||
			Err(_) => {
 | 
								Err(_) => {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -13,7 +13,7 @@ pub enum DatabaseError {
 | 
				
			||||||
	Protected,
 | 
						Protected,
 | 
				
			||||||
	UpdateError,
 | 
						UpdateError,
 | 
				
			||||||
	UpdateGetError,
 | 
						UpdateGetError,
 | 
				
			||||||
	Unknown,
 | 
						Unknown(String),
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl DatabaseError {
 | 
					impl DatabaseError {
 | 
				
			||||||
| 
						 | 
					@ -53,7 +53,7 @@ impl From<&DatabaseError> for String {
 | 
				
			||||||
			DatabaseError::UpdateGetError => {
 | 
								DatabaseError::UpdateGetError => {
 | 
				
			||||||
				"error on retrieving updated model from database (your entry was saved)"
 | 
									"error on retrieving updated model from database (your entry was saved)"
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			DatabaseError::Unknown => "unknown error",
 | 
								DatabaseError::Unknown(_) => "unknown error",
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -68,7 +68,7 @@ impl From<Error> for DatabaseError {
 | 
				
			||||||
	fn from(value: Error) -> Self {
 | 
						fn from(value: Error) -> Self {
 | 
				
			||||||
		match value {
 | 
							match value {
 | 
				
			||||||
			Error::RowNotFound => DatabaseError::NotFound,
 | 
								Error::RowNotFound => DatabaseError::NotFound,
 | 
				
			||||||
			_ => DatabaseError::Unknown,
 | 
								_ => DatabaseError::Unknown(value.to_string()),
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -56,10 +56,13 @@ impl DbRelay {
 | 
				
			||||||
		controller: &DbController,
 | 
							controller: &DbController,
 | 
				
			||||||
		number: i64,
 | 
							number: i64,
 | 
				
			||||||
		new_name: &str,
 | 
							new_name: &str,
 | 
				
			||||||
	) -> Result<DbRelay, DatabaseError> {
 | 
						) -> Result<(DbRelay, bool), DatabaseError> {
 | 
				
			||||||
		match DbRelay::get_by_controller_and_num(conn, controller, number).await? {
 | 
							match DbRelay::get_by_controller_and_num(conn, controller, number).await? {
 | 
				
			||||||
			Some(relay) => Ok(relay),
 | 
								Some(relay) => Ok((relay, false)),
 | 
				
			||||||
			None => DbRelay::create(conn, new_name, number, controller).await,
 | 
								None => {
 | 
				
			||||||
 | 
									let relay = DbRelay::create(conn, new_name, number, controller).await?;
 | 
				
			||||||
 | 
									Ok((relay, true))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -104,6 +104,21 @@ impl DbSchedule {
 | 
				
			||||||
		.ok_or(DatabaseError::InsertGetError)
 | 
							.ok_or(DatabaseError::InsertGetError)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pub async fn get_by_uid_or_create(
 | 
				
			||||||
 | 
							conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
 | 
							uid: ScheduleUid,
 | 
				
			||||||
 | 
							name: &str,
 | 
				
			||||||
 | 
							periods: &DbPeriods,
 | 
				
			||||||
 | 
						) -> Result<(DbSchedule, bool), DatabaseError> {
 | 
				
			||||||
 | 
							match DbSchedule::get_by_uid(conn, &uid).await? {
 | 
				
			||||||
 | 
								Some(schedule) => Ok((schedule, false)),
 | 
				
			||||||
 | 
								None => {
 | 
				
			||||||
 | 
									let schedule = DbSchedule::create(conn, uid, name, periods).await?;
 | 
				
			||||||
 | 
									Ok((schedule, true))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pub async fn get_on(conn: &mut PoolConnection<Sqlite>) -> Result<DbSchedule, DatabaseError> {
 | 
						pub async fn get_on(conn: &mut PoolConnection<Sqlite>) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
		if let Some(schedule) = DbSchedule::get_by_uid(conn, &ScheduleUid::On).await? {
 | 
							if let Some(schedule) = DbSchedule::get_by_uid(conn, &ScheduleUid::On).await? {
 | 
				
			||||||
			return Ok(schedule);
 | 
								return Ok(schedule);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,3 +1,4 @@
 | 
				
			||||||
 | 
					use std::fmt::{Display, Formatter};
 | 
				
			||||||
use std::str::FromStr;
 | 
					use std::str::FromStr;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
 | 
					use serde::{Deserialize, Deserializer, Serialize, Serializer};
 | 
				
			||||||
| 
						 | 
					@ -17,6 +18,12 @@ impl Default for ControllerUid {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Display for ControllerUid {
 | 
				
			||||||
 | 
						fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 | 
				
			||||||
 | 
							write!(f, "{}", String::from(self))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Serialize for ControllerUid {
 | 
					impl Serialize for ControllerUid {
 | 
				
			||||||
	fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 | 
						fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 | 
				
			||||||
	where
 | 
						where
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue