From c8f40284efe8128ced8a17ff47351f6da7abd456 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Thu, 30 Nov 2023 02:40:28 +0100 Subject: [PATCH] Add feature to import missing schedules --- emgauwa-controller/src/relay_loop.rs | 7 ++- .../src/handlers/v1/ws/controllers.rs | 63 ++++++++++++++----- emgauwa-lib/src/db/errors.rs | 6 +- emgauwa-lib/src/db/relays.rs | 9 ++- emgauwa-lib/src/db/schedules.rs | 15 +++++ emgauwa-lib/src/types/controller_uid.rs | 7 +++ 6 files changed, 81 insertions(+), 26 deletions(-) diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs index 612fd6a..b9ad749 100644 --- a/emgauwa-controller/src/relay_loop.rs +++ b/emgauwa-controller/src/relay_loop.rs @@ -2,6 +2,7 @@ use std::time::Duration; use chrono::Local; use tokio::time; +use tokio::time::Instant; use crate::settings::Settings; @@ -9,8 +10,8 @@ use crate::settings::Settings; pub async fn run_relay_loop(settings: Settings) { let default_duration = Duration::from_millis(1000); loop { - let next_timestamp = Local::now().naive_local().time() + default_duration; - time::sleep(default_duration).await; - log::debug!("Relay loop: {}", next_timestamp) + let next_timestamp = Instant::now() + default_duration; + time::sleep_until(next_timestamp).await; + log::debug!("Relay loop: {}", Local::now().naive_local().time()) } } diff --git a/emgauwa-core/src/handlers/v1/ws/controllers.rs b/emgauwa-core/src/handlers/v1/ws/controllers.rs index d4e6bf7..bf9933f 100644 --- a/emgauwa-core/src/handlers/v1/ws/controllers.rs +++ b/emgauwa-core/src/handlers/v1/ws/controllers.rs @@ -5,9 +5,10 @@ use actix_web_actors::ws; use actix_web_actors::ws::ProtocolError; use emgauwa_lib::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}; use emgauwa_lib::db::errors::DatabaseError; -use emgauwa_lib::db::{DbController, DbRelay}; +use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule}; use emgauwa_lib::models::{Controller, FromDbModel}; use emgauwa_lib::types::{ConnectedControllersType, ControllerUid, ControllerWsAction}; +use futures::executor::block_on; use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use ws::Message; @@ -28,12 +29,17 @@ impl Actor for ControllerWs { fn stopped(&mut self, _ctx: &mut Self::Context) { if let Some(controller_uid) = &self.controller_uid { - let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); + let mut pool_conn = block_on(self.pool.acquire()).unwrap(); let mut data = self.connected_controllers.lock().unwrap(); if let Some(controller) = data.remove(controller_uid) { - futures::executor::block_on(controller.c.update_active(&mut pool_conn, false)) - .unwrap(); + if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) { + log::error!( + "Failed to mark controller {} as inactive: {:?}", + controller.c.uid, + err + ) + } } } } @@ -49,24 +55,47 @@ impl ControllerWs { ControllerWsAction::Register(controller) => { log::info!("Registering controller: {:?}", controller); let c = &controller.c; - let controller_db = futures::executor::block_on( - DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count), - )?; - futures::executor::block_on(controller_db.update_active(conn, true))?; + let controller_db = block_on(DbController::get_by_uid_or_create( + conn, + &c.uid, + &c.name, + c.relay_count, + ))?; + block_on(controller_db.update_active(conn, true))?; for relay in &controller.relays { - let r = &relay.r; - futures::executor::block_on(DbRelay::get_by_controller_and_num_or_create( - conn, - &controller_db, - r.number, - &r.name, - ))?; + let (new_relay, created) = + block_on(DbRelay::get_by_controller_and_num_or_create( + conn, + &controller_db, + relay.r.number, + &relay.r.name, + ))?; + if created { + let mut relay_schedules = Vec::new(); + for schedule in &relay.schedules { + let (new_schedule, _) = block_on(DbSchedule::get_by_uid_or_create( + conn, + schedule.uid.clone(), + &schedule.name, + &schedule.periods, + ))?; + relay_schedules.push(new_schedule); + } + + block_on(DbJunctionRelaySchedule::set_schedules( + conn, + &new_relay, + relay_schedules.iter().collect(), + ))?; + } } + let controller_uid = &controller.c.uid; + let controller_db = block_on(DbController::get_by_uid(conn, controller_uid))? + .ok_or(DatabaseError::InsertGetError)?; let controller = Controller::from_db_model(conn, controller_db)?; - let controller_uid = &controller.c.uid; self.controller_uid = Some(controller_uid.clone()); let mut data = self.connected_controllers.lock().unwrap(); @@ -97,7 +126,7 @@ impl ControllerWs { impl StreamHandler> for ControllerWs { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); + let mut pool_conn = block_on(self.pool.acquire()).unwrap(); let msg = match msg { Err(_) => { diff --git a/emgauwa-lib/src/db/errors.rs b/emgauwa-lib/src/db/errors.rs index d486c36..c221f11 100644 --- a/emgauwa-lib/src/db/errors.rs +++ b/emgauwa-lib/src/db/errors.rs @@ -13,7 +13,7 @@ pub enum DatabaseError { Protected, UpdateError, UpdateGetError, - Unknown, + Unknown(String), } impl DatabaseError { @@ -53,7 +53,7 @@ impl From<&DatabaseError> for String { DatabaseError::UpdateGetError => { "error on retrieving updated model from database (your entry was saved)" } - DatabaseError::Unknown => "unknown error", + DatabaseError::Unknown(_) => "unknown error", }) } } @@ -68,7 +68,7 @@ impl From for DatabaseError { fn from(value: Error) -> Self { match value { Error::RowNotFound => DatabaseError::NotFound, - _ => DatabaseError::Unknown, + _ => DatabaseError::Unknown(value.to_string()), } } } diff --git a/emgauwa-lib/src/db/relays.rs b/emgauwa-lib/src/db/relays.rs index ec6fe60..04456a7 100644 --- a/emgauwa-lib/src/db/relays.rs +++ b/emgauwa-lib/src/db/relays.rs @@ -56,10 +56,13 @@ impl DbRelay { controller: &DbController, number: i64, new_name: &str, - ) -> Result { + ) -> Result<(DbRelay, bool), DatabaseError> { match DbRelay::get_by_controller_and_num(conn, controller, number).await? { - Some(relay) => Ok(relay), - None => DbRelay::create(conn, new_name, number, controller).await, + Some(relay) => Ok((relay, false)), + None => { + let relay = DbRelay::create(conn, new_name, number, controller).await?; + Ok((relay, true)) + } } } diff --git a/emgauwa-lib/src/db/schedules.rs b/emgauwa-lib/src/db/schedules.rs index caf74d6..102df08 100644 --- a/emgauwa-lib/src/db/schedules.rs +++ b/emgauwa-lib/src/db/schedules.rs @@ -104,6 +104,21 @@ impl DbSchedule { .ok_or(DatabaseError::InsertGetError) } + pub async fn get_by_uid_or_create( + conn: &mut PoolConnection, + uid: ScheduleUid, + name: &str, + periods: &DbPeriods, + ) -> Result<(DbSchedule, bool), DatabaseError> { + match DbSchedule::get_by_uid(conn, &uid).await? { + Some(schedule) => Ok((schedule, false)), + None => { + let schedule = DbSchedule::create(conn, uid, name, periods).await?; + Ok((schedule, true)) + } + } + } + pub async fn get_on(conn: &mut PoolConnection) -> Result { if let Some(schedule) = DbSchedule::get_by_uid(conn, &ScheduleUid::On).await? { return Ok(schedule); diff --git a/emgauwa-lib/src/types/controller_uid.rs b/emgauwa-lib/src/types/controller_uid.rs index 35793fb..7ab51bc 100644 --- a/emgauwa-lib/src/types/controller_uid.rs +++ b/emgauwa-lib/src/types/controller_uid.rs @@ -1,3 +1,4 @@ +use std::fmt::{Display, Formatter}; use std::str::FromStr; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -17,6 +18,12 @@ impl Default for ControllerUid { } } +impl Display for ControllerUid { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", String::from(self)) + } +} + impl Serialize for ControllerUid { fn serialize(&self, serializer: S) -> Result where