Refactor initialization of on/off schedule
This commit is contained in:
		
							parent
							
								
									5b54f40ec0
								
							
						
					
					
						commit
						a90ea25b87
					
				
					 4 changed files with 47 additions and 58 deletions
				
			
		| 
						 | 
					@ -27,7 +27,7 @@ async fn create_this_controller(
 | 
				
			||||||
		conn,
 | 
							conn,
 | 
				
			||||||
		&ControllerUid::default(),
 | 
							&ControllerUid::default(),
 | 
				
			||||||
		&settings.name,
 | 
							&settings.name,
 | 
				
			||||||
		i64::try_from(settings.relays.len()).expect("Too many relays"),
 | 
							settings.relays.len() as i64,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	.await
 | 
						.await
 | 
				
			||||||
	.expect("Failed to create controller")
 | 
						.expect("Failed to create controller")
 | 
				
			||||||
| 
						 | 
					@ -91,15 +91,15 @@ async fn main() {
 | 
				
			||||||
	tokio::spawn(run_relay_loop(settings));
 | 
						tokio::spawn(run_relay_loop(settings));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	loop {
 | 
						loop {
 | 
				
			||||||
 | 
							log::info!(
 | 
				
			||||||
 | 
								"Trying to connect in {} seconds...",
 | 
				
			||||||
 | 
								WEBSOCKET_RETRY_TIMEOUT.as_secs()
 | 
				
			||||||
 | 
							);
 | 
				
			||||||
		time::sleep(WEBSOCKET_RETRY_TIMEOUT).await;
 | 
							time::sleep(WEBSOCKET_RETRY_TIMEOUT).await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		let connect_result = connect_async(&url).await;
 | 
							let connect_result = connect_async(&url).await;
 | 
				
			||||||
		if let Err(err) = connect_result {
 | 
							if let Err(err) = connect_result {
 | 
				
			||||||
			log::warn!(
 | 
								log::warn!("Failed to connect to websocket: {}", err,);
 | 
				
			||||||
				"Failed to connect to websocket: {}. Retrying in {} seconds...",
 | 
					 | 
				
			||||||
				err,
 | 
					 | 
				
			||||||
				WEBSOCKET_RETRY_TIMEOUT.as_secs()
 | 
					 | 
				
			||||||
			);
 | 
					 | 
				
			||||||
			continue;
 | 
								continue;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		let (ws_stream, _) = connect_result.unwrap();
 | 
							let (ws_stream, _) = connect_result.unwrap();
 | 
				
			||||||
| 
						 | 
					@ -115,10 +115,7 @@ async fn main() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		read_handler.await;
 | 
							read_handler.await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		log::warn!(
 | 
							log::warn!("Lost connection to websocket");
 | 
				
			||||||
			"Lost connection to websocket. Retrying in {} seconds...",
 | 
					 | 
				
			||||||
			WEBSOCKET_RETRY_TIMEOUT.as_secs()
 | 
					 | 
				
			||||||
		);
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -70,7 +70,13 @@ pub async fn add(
 | 
				
			||||||
) -> Result<HttpResponse, ApiError> {
 | 
					) -> Result<HttpResponse, ApiError> {
 | 
				
			||||||
	let mut pool_conn = pool.acquire().await?;
 | 
						let mut pool_conn = pool.acquire().await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	let new_schedule = DbSchedule::create(&mut pool_conn, &data.name, &data.periods).await?;
 | 
						let new_schedule = DbSchedule::create(
 | 
				
			||||||
 | 
							&mut pool_conn,
 | 
				
			||||||
 | 
							ScheduleUid::default(),
 | 
				
			||||||
 | 
							&data.name,
 | 
				
			||||||
 | 
							&data.periods,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						.await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	new_schedule
 | 
						new_schedule
 | 
				
			||||||
		.set_tags(&mut pool_conn, data.tags.as_slice())
 | 
							.set_tags(&mut pool_conn, data.tags.as_slice())
 | 
				
			||||||
| 
						 | 
					@ -84,8 +90,13 @@ async fn add_list_single(
 | 
				
			||||||
	conn: &mut PoolConnection<Sqlite>,
 | 
						conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
	request_schedule: &RequestSchedule,
 | 
						request_schedule: &RequestSchedule,
 | 
				
			||||||
) -> Result<DbSchedule, DatabaseError> {
 | 
					) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
	let new_schedule =
 | 
						let new_schedule = DbSchedule::create(
 | 
				
			||||||
		DbSchedule::create(conn, &request_schedule.name, &request_schedule.periods).await?;
 | 
							conn,
 | 
				
			||||||
 | 
							ScheduleUid::default(),
 | 
				
			||||||
 | 
							&request_schedule.name,
 | 
				
			||||||
 | 
							&request_schedule.periods,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						.await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	new_schedule
 | 
						new_schedule
 | 
				
			||||||
		.set_tags(conn, request_schedule.tags.as_slice())
 | 
							.set_tags(conn, request_schedule.tags.as_slice())
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,14 +1,9 @@
 | 
				
			||||||
use std::str::FromStr;
 | 
					use std::str::FromStr;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use log::{info, trace};
 | 
					 | 
				
			||||||
use sqlx::migrate::Migrator;
 | 
					use sqlx::migrate::Migrator;
 | 
				
			||||||
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
 | 
					use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
 | 
				
			||||||
use sqlx::{Pool, Sqlite};
 | 
					use sqlx::{Pool, Sqlite};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::db::errors::DatabaseError;
 | 
					 | 
				
			||||||
use crate::db::model_utils::Period;
 | 
					 | 
				
			||||||
use crate::types::ScheduleUid;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
mod controllers;
 | 
					mod controllers;
 | 
				
			||||||
pub mod errors;
 | 
					pub mod errors;
 | 
				
			||||||
mod model_utils;
 | 
					mod model_utils;
 | 
				
			||||||
| 
						 | 
					@ -24,36 +19,10 @@ pub use tag::DbTag;
 | 
				
			||||||
static MIGRATOR: Migrator = sqlx::migrate!("../migrations"); // defaults to "./migrations"
 | 
					static MIGRATOR: Migrator = sqlx::migrate!("../migrations"); // defaults to "./migrations"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn run_migrations(pool: &Pool<Sqlite>) {
 | 
					pub async fn run_migrations(pool: &Pool<Sqlite>) {
 | 
				
			||||||
	info!("Running migrations");
 | 
						log::info!("Running migrations");
 | 
				
			||||||
	MIGRATOR.run(pool).await.expect("Failed to run migrations.");
 | 
						MIGRATOR.run(pool).await.expect("Failed to run migrations.");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn init_schedule(
 | 
					 | 
				
			||||||
	pool: &Pool<Sqlite>,
 | 
					 | 
				
			||||||
	uid: &ScheduleUid,
 | 
					 | 
				
			||||||
	name: &str,
 | 
					 | 
				
			||||||
	periods: DbPeriods,
 | 
					 | 
				
			||||||
) -> Result<(), DatabaseError> {
 | 
					 | 
				
			||||||
	trace!("Initializing schedule {:?}", name);
 | 
					 | 
				
			||||||
	match DbSchedule::get_by_uid(&mut pool.acquire().await.unwrap(), uid).await? {
 | 
					 | 
				
			||||||
		Some(_) => Ok(()),
 | 
					 | 
				
			||||||
		None => {
 | 
					 | 
				
			||||||
			trace!("Schedule {:?} not found, inserting", name);
 | 
					 | 
				
			||||||
			sqlx::query_as!(
 | 
					 | 
				
			||||||
				DbSchedule,
 | 
					 | 
				
			||||||
				"INSERT INTO schedules (uid, name, periods) VALUES (?, ?, ?) RETURNING *",
 | 
					 | 
				
			||||||
				uid,
 | 
					 | 
				
			||||||
				name,
 | 
					 | 
				
			||||||
				periods,
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			.fetch_optional(pool)
 | 
					 | 
				
			||||||
			.await?
 | 
					 | 
				
			||||||
			.ok_or(DatabaseError::InsertGetError)
 | 
					 | 
				
			||||||
			.map(|_| ())
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
pub async fn init(db: &str) -> Pool<Sqlite> {
 | 
					pub async fn init(db: &str) -> Pool<Sqlite> {
 | 
				
			||||||
	let options = SqliteConnectOptions::from_str(db)
 | 
						let options = SqliteConnectOptions::from_str(db)
 | 
				
			||||||
		.expect("Error parsing database path")
 | 
							.expect("Error parsing database path")
 | 
				
			||||||
| 
						 | 
					@ -68,18 +37,14 @@ pub async fn init(db: &str) -> Pool<Sqlite> {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	run_migrations(&pool).await;
 | 
						run_migrations(&pool).await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	init_schedule(&pool, &ScheduleUid::Off, "Off", DbPeriods(vec![]))
 | 
						let mut pool_conn = pool.acquire().await.unwrap();
 | 
				
			||||||
		.await
 | 
					 | 
				
			||||||
		.expect("Error initializing schedule Off");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	init_schedule(
 | 
						DbSchedule::get_on(&mut pool_conn)
 | 
				
			||||||
		&pool,
 | 
							.await
 | 
				
			||||||
		&ScheduleUid::On,
 | 
							.expect("Failed to init 'on' schedule");
 | 
				
			||||||
		"On",
 | 
						DbSchedule::get_off(&mut pool_conn)
 | 
				
			||||||
		DbPeriods(vec![Period::new_on()]),
 | 
							.await
 | 
				
			||||||
	)
 | 
							.expect("Failed to init 'off' schedule");
 | 
				
			||||||
	.await
 | 
					 | 
				
			||||||
	.expect("Error initializing schedule On");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pool
 | 
						pool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -88,14 +88,14 @@ impl DbSchedule {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pub async fn create(
 | 
						pub async fn create(
 | 
				
			||||||
		conn: &mut PoolConnection<Sqlite>,
 | 
							conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
 | 
							new_uid: ScheduleUid,
 | 
				
			||||||
		new_name: &str,
 | 
							new_name: &str,
 | 
				
			||||||
		new_periods: &DbPeriods,
 | 
							new_periods: &DbPeriods,
 | 
				
			||||||
	) -> Result<DbSchedule, DatabaseError> {
 | 
						) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
		let uid = ScheduleUid::default();
 | 
					 | 
				
			||||||
		sqlx::query_as!(
 | 
							sqlx::query_as!(
 | 
				
			||||||
			DbSchedule,
 | 
								DbSchedule,
 | 
				
			||||||
			"INSERT INTO schedules (uid, name, periods) VALUES (?, ?, ?) RETURNING *",
 | 
								"INSERT INTO schedules (uid, name, periods) VALUES (?, ?, ?) RETURNING *",
 | 
				
			||||||
			uid,
 | 
								new_uid,
 | 
				
			||||||
			new_name,
 | 
								new_name,
 | 
				
			||||||
			new_periods,
 | 
								new_periods,
 | 
				
			||||||
		)
 | 
							)
 | 
				
			||||||
| 
						 | 
					@ -104,6 +104,22 @@ impl DbSchedule {
 | 
				
			||||||
		.ok_or(DatabaseError::InsertGetError)
 | 
							.ok_or(DatabaseError::InsertGetError)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pub async fn get_on(conn: &mut PoolConnection<Sqlite>) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
 | 
							if let Some(schedule) = DbSchedule::get_by_uid(conn, &ScheduleUid::On).await? {
 | 
				
			||||||
 | 
								return Ok(schedule);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							let periods = DbPeriods(vec![Period::new_on()]);
 | 
				
			||||||
 | 
							Self::create(conn, ScheduleUid::On, "On", &periods).await
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pub async fn get_off(conn: &mut PoolConnection<Sqlite>) -> Result<DbSchedule, DatabaseError> {
 | 
				
			||||||
 | 
							if let Some(schedule) = DbSchedule::get_by_uid(conn, &ScheduleUid::Off).await? {
 | 
				
			||||||
 | 
								return Ok(schedule);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							let periods = DbPeriods(vec![]);
 | 
				
			||||||
 | 
							Self::create(conn, ScheduleUid::Off, "Off", &periods).await
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pub async fn update(
 | 
						pub async fn update(
 | 
				
			||||||
		&self,
 | 
							&self,
 | 
				
			||||||
		conn: &mut PoolConnection<Sqlite>,
 | 
							conn: &mut PoolConnection<Sqlite>,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue