From 6459804e1f9e8a9cf78f7e254e3843226b35c304 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Tue, 28 Nov 2023 00:19:15 +0100 Subject: [PATCH] Improve active handling for controllers --- .gitattributes | 1 + Makefile | 8 ++- emgauwa-controller/src/main.rs | 3 +- emgauwa-core/src/main.rs | 9 +++ emgauwa-lib/src/db/controllers.rs | 36 ++++++++--- emgauwa-lib/src/handlers/v1/ws/controllers.rs | 64 ++++++++++++------- migrations/20231120000000_init.up.sql | 1 + 7 files changed, 87 insertions(+), 35 deletions(-) diff --git a/.gitattributes b/.gitattributes index 718cf25..d722d84 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,3 +1,4 @@ * text=auto Cargo.lock -diff +.sqlx/query-*.json -diff diff --git a/Makefile b/Makefile index c5184dd..98b1e63 100644 --- a/Makefile +++ b/Makefile @@ -3,10 +3,16 @@ build: cargo build sqlx: - rm ./emgauwa-dev.sqlite + rm ./emgauwa-dev.sqlite || true cargo sqlx database create cargo sqlx migrate run cargo sqlx prepare --workspace build-rpi: cross build --target arm-unknown-linux-gnueabihf + +clean-db: + rm ./emgauwa-dev.sqlite || true + rm ./emgauwa-core.sqlite || true + rm ./emgauwa-controller.sqlite || true + $(MAKE) sqlx diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index ec47a93..a9de829 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -28,7 +28,6 @@ async fn create_this_controller( &ControllerUid::default(), &settings.name, i64::try_from(settings.relays.len()).expect("Too many relays"), - true, ) .await .expect("Failed to create controller") @@ -86,7 +85,7 @@ async fn main() { .collect(); let db_controller = db_controller - .update(&mut conn, &db_controller.name, db_relays.len() as i64, true) + .update(&mut conn, &db_controller.name, db_relays.len() as i64) .await .unwrap(); diff --git a/emgauwa-core/src/main.rs b/emgauwa-core/src/main.rs index f07aca9..f41b27c 100644 --- a/emgauwa-core/src/main.rs +++ b/emgauwa-core/src/main.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use crate::utils::drop_privileges; use actix_web::middleware::TrailingSlash; use actix_web::{middleware, web, App, HttpServer}; +use emgauwa_lib::db::DbController; use emgauwa_lib::handlers; use log::{trace, LevelFilter}; use simple_logger::SimpleLogger; @@ -32,6 +33,14 @@ async fn main() -> std::io::Result<()> { let pool = emgauwa_lib::db::init(&settings.database).await; + // This block is to ensure that the connection is dropped after use. + { + let mut conn = pool.acquire().await.unwrap(); + DbController::all_inactive(&mut conn) + .await + .expect("Error setting all controllers inactive"); + } + log::info!("Starting server on {}:{}", settings.host, settings.port); HttpServer::new(move || { let cors = Cors::default().allow_any_method().allow_any_header(); diff --git a/emgauwa-lib/src/db/controllers.rs b/emgauwa-lib/src/db/controllers.rs index ee33800..bc35dda 100644 --- a/emgauwa-lib/src/db/controllers.rs +++ b/emgauwa-lib/src/db/controllers.rs @@ -58,11 +58,10 @@ impl DbController { uid: &ControllerUid, new_name: &str, new_relay_count: i64, - new_active: bool, ) -> Result { match DbController::get_by_uid(conn, uid).await? { Some(tag) => Ok(tag), - None => DbController::create(conn, uid, new_name, new_relay_count, new_active).await, + None => DbController::create(conn, uid, new_name, new_relay_count).await, } } @@ -94,15 +93,13 @@ impl DbController { new_uid: &ControllerUid, new_name: &str, new_relay_count: i64, - new_active: bool, ) -> Result { sqlx::query_as!( DbController, - "INSERT INTO controllers (uid, name, relay_count, active) VALUES (?, ?, ?, ?) RETURNING *", + "INSERT INTO controllers (uid, name, relay_count) VALUES (?, ?, ?) RETURNING *", new_uid, new_name, new_relay_count, - new_active, ) .fetch_optional(conn.deref_mut()) .await? @@ -114,12 +111,28 @@ impl DbController { conn: &mut PoolConnection, new_name: &str, new_relay_count: i64, + ) -> Result { + sqlx::query!( + "UPDATE controllers SET name = ?, relay_count = ? WHERE id = ?", + new_name, + new_relay_count, + self.id, + ) + .execute(conn.deref_mut()) + .await?; + + Self::get(conn, self.id) + .await? + .ok_or(DatabaseError::UpdateGetError) + } + + pub async fn update_active( + &self, + conn: &mut PoolConnection, new_active: bool, ) -> Result { sqlx::query!( - "UPDATE controllers SET name = ?, relay_count = ?, active = ? WHERE id = ?", - new_name, - new_relay_count, + "UPDATE controllers SET active = ? WHERE id = ?", new_active, self.id, ) @@ -130,4 +143,11 @@ impl DbController { .await? .ok_or(DatabaseError::UpdateGetError) } + + pub async fn all_inactive(conn: &mut PoolConnection) -> Result<(), DatabaseError> { + sqlx::query!("UPDATE controllers SET active = 0") + .execute(conn.deref_mut()) + .await?; + Ok(()) + } } diff --git a/emgauwa-lib/src/handlers/v1/ws/controllers.rs b/emgauwa-lib/src/handlers/v1/ws/controllers.rs index 60b7174..bfbceb7 100644 --- a/emgauwa-lib/src/handlers/v1/ws/controllers.rs +++ b/emgauwa-lib/src/handlers/v1/ws/controllers.rs @@ -18,10 +18,18 @@ pub enum ControllerWsAction { struct ControllerWs { pub pool: Pool, + pub controller: Option, } impl Actor for ControllerWs { type Context = ws::WebsocketContext; + + fn stopped(&mut self, _ctx: &mut Self::Context) { + if let Some(controller) = &self.controller { + let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); + futures::executor::block_on(controller.update_active(&mut pool_conn, false)).unwrap(); + } + } } impl StreamHandler> for ControllerWs { @@ -32,7 +40,8 @@ impl StreamHandler> for ControllerWs { Ok(Message::Ping(msg)) => ctx.pong(&msg), Ok(Message::Text(text)) => { let action: ControllerWsAction = serde_json::from_str(&text).unwrap(); - let action_res = futures::executor::block_on(handle_action(&mut pool_conn, action)); + let action_res = + futures::executor::block_on(self.handle_action(&mut pool_conn, action)); if let Err(e) = action_res { log::error!("Error handling action: {:?}", e); ctx.text(serde_json::to_string(&e).unwrap()); @@ -47,33 +56,39 @@ impl StreamHandler> for ControllerWs { } } -pub async fn handle_action( - conn: &mut PoolConnection, - action: ControllerWsAction, -) -> Result<(), DatabaseError> { - match action { - ControllerWsAction::Register(controller) => { - log::info!("Registering controller: {:?}", controller); - let c = &controller.controller; - let controller_db = - DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count, c.active) +impl ControllerWs { + pub async fn handle_action( + &mut self, + conn: &mut PoolConnection, + action: ControllerWsAction, + ) -> Result<(), DatabaseError> { + match action { + ControllerWsAction::Register(controller) => { + log::info!("Registering controller: {:?}", controller); + let c = &controller.controller; + let controller_db = + DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count) + .await?; + controller_db.update_active(conn, true).await?; + + println!("Controller: {:?}", controller_db); + + for relay in &controller.relays { + let r = &relay.relay; + let relay_db = DbRelay::get_by_controller_and_num_or_create( + conn, + &controller_db, + r.number, + &r.name, + ) .await?; + println!("Controller relay: {:?}", relay_db); + } - println!("Controller: {:?}", controller_db); + self.controller = Some(controller_db); - for relay in &controller.relays { - let r = &relay.relay; - let relay_db = DbRelay::get_by_controller_and_num_or_create( - conn, - &controller_db, - r.number, - &r.name, - ) - .await?; - println!("Controller relay: {:?}", relay_db); + Ok(()) } - - Ok(()) } } } @@ -87,6 +102,7 @@ pub async fn index( let resp = ws::start( ControllerWs { pool: pool.get_ref().clone(), + controller: None, }, &req, stream, diff --git a/migrations/20231120000000_init.up.sql b/migrations/20231120000000_init.up.sql index 108c5a6..e370f41 100644 --- a/migrations/20231120000000_init.up.sql +++ b/migrations/20231120000000_init.up.sql @@ -18,6 +18,7 @@ CREATE TABLE controllers active BOOLEAN NOT NULL + DEFAULT false ); CREATE TABLE relays