From 02c613e0fd335adba7e0274e803cd83d1bb5bad6 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Thu, 2 May 2024 19:33:25 +0200 Subject: [PATCH] Revert "Add sql transactions" This caused the error "locked database". This reverts commit 9823511b6225e80a1da687762582ba18cfdc2b3c. --- src/app_state.rs | 12 ++-- src/handlers/v1/controllers.rs | 30 ++++----- src/handlers/v1/macros.rs | 58 ++++++++--------- src/handlers/v1/relays.rs | 66 +++++++++----------- src/handlers/v1/schedules.rs | 72 ++++++++++------------ src/handlers/v1/tags.rs | 27 +++----- src/handlers/v1/ws/controllers/handlers.rs | 23 ++++--- src/handlers/v1/ws/controllers/mod.rs | 15 ++++- src/main.rs | 6 +- 9 files changed, 138 insertions(+), 171 deletions(-) diff --git a/src/app_state.rs b/src/app_state.rs index f3eac5d..04747a2 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -63,10 +63,9 @@ impl AppState { } fn get_relays(&self) -> Result, EmgauwaError> { - let mut tx = block_on(self.pool.begin())?; - let db_controllers = block_on(DbController::get_all(&mut tx))?; - let mut controllers: Vec = convert_db_list(&mut tx, db_controllers)?; - block_on(tx.commit())?; + let mut pool_conn = block_on(self.pool.acquire())?; + let db_controllers = block_on(DbController::get_all(&mut pool_conn))?; + let mut controllers: Vec = convert_db_list(&mut pool_conn, db_controllers)?; self.connected_controllers .iter() @@ -114,11 +113,11 @@ impl Handler for AppState { type Result = Result<(), EmgauwaError>; fn handle(&mut self, msg: DisconnectController, _ctx: &mut Self::Context) -> Self::Result { - let mut tx = block_on(self.pool.begin())?; + let mut pool_conn = block_on(self.pool.acquire())?; if let Some((controller, address)) = self.connected_controllers.remove(&msg.controller_uid) { - if let Err(err) = block_on(controller.c.update_active(&mut tx, false)) { + if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) { log::error!( "Failed to mark controller {} as inactive: {:?}", controller.c.uid, @@ -129,7 +128,6 @@ impl Handler for AppState { //block_on(address.send(ControllerWsAction::Disconnect))??; address.do_send(ControllerWsAction::Disconnect); } - block_on(tx.commit())?; self.notify_relay_clients(); Ok(()) } diff --git a/src/handlers/v1/controllers.rs b/src/handlers/v1/controllers.rs index 90c39e6..3fdb931 100644 --- a/src/handlers/v1/controllers.rs +++ b/src/handlers/v1/controllers.rs @@ -11,13 +11,12 @@ use crate::app_state::AppState; #[get("/controllers")] pub async fn index(pool: web::Data>) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let db_controllers = DbController::get_all(&mut tx).await?; + let db_controllers = DbController::get_all(&mut pool_conn).await?; - let controllers: Vec = convert_db_list(&mut tx, db_controllers)?; + let controllers: Vec = convert_db_list(&mut pool_conn, db_controllers)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(controllers)) } @@ -26,18 +25,16 @@ pub async fn show( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let return_controller = Controller::from_db_model(&mut tx, controller)?; - - tx.commit().await?; + let return_controller = Controller::from_db_model(&mut pool_conn, controller)?; Ok(HttpResponse::Ok().json(return_controller)) } @@ -48,20 +45,20 @@ pub async fn update( path: web::Path<(String,)>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; let controller = controller - .update(&mut tx, data.name.as_str(), controller.relay_count) + .update(&mut pool_conn, data.name.as_str(), controller.relay_count) .await?; - let return_controller = Controller::from_db_model(&mut tx, controller)?; + let return_controller = Controller::from_db_model(&mut pool_conn, controller)?; app_state .send(app_state::Action { @@ -70,7 +67,6 @@ pub async fn update( }) .await??; - tx.commit().await?; Ok(HttpResponse::Ok().json(return_controller)) } @@ -80,7 +76,7 @@ pub async fn delete( app_state: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; @@ -91,8 +87,6 @@ pub async fn delete( }) .await??; - DbController::delete_by_uid(&mut tx, uid).await?; - - tx.commit().await?; + DbController::delete_by_uid(&mut pool_conn, uid).await?; Ok(HttpResponse::Ok().json("controller got deleted")) } diff --git a/src/handlers/v1/macros.rs b/src/handlers/v1/macros.rs index 9703c7f..0e3fc60 100644 --- a/src/handlers/v1/macros.rs +++ b/src/handlers/v1/macros.rs @@ -14,12 +14,11 @@ use crate::app_state::AppState; #[get("/macros")] pub async fn index(pool: web::Data>) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let db_macros = DbMacro::get_all(&mut tx).await?; - let macros: Vec = convert_db_list(&mut tx, db_macros)?; + let db_macros = DbMacro::get_all(&mut pool_conn).await?; + let macros: Vec = convert_db_list(&mut pool_conn, db_macros)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(macros)) } @@ -28,18 +27,16 @@ pub async fn show( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (macro_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(macro_uid.as_str())?; - let db_macro = DbMacro::get_by_uid(&mut tx, &uid) + let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let return_macro = Macro::from_db_model(&mut tx, db_macro)?; - - tx.commit().await?; + let return_macro = Macro::from_db_model(&mut pool_conn, db_macro)?; Ok(HttpResponse::Ok().json(return_macro)) } @@ -48,17 +45,15 @@ pub async fn add( pool: web::Data>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let new_macro = DbMacro::create(&mut tx, EmgauwaUid::default(), &data.name).await?; + let new_macro = DbMacro::create(&mut pool_conn, EmgauwaUid::default(), &data.name).await?; new_macro - .set_actions(&mut tx, data.actions.as_slice()) + .set_actions(&mut pool_conn, data.actions.as_slice()) .await?; - let return_macro = Macro::from_db_model(&mut tx, new_macro)?; - - tx.commit().await?; + let return_macro = Macro::from_db_model(&mut pool_conn, new_macro)?; Ok(HttpResponse::Created().json(return_macro)) } @@ -68,28 +63,26 @@ pub async fn update( path: web::Path<(String,)>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (macro_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(macro_uid.as_str())?; - let db_macro = DbMacro::get_by_uid(&mut tx, &uid) + let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; if let Some(name) = &data.name { - db_macro.update(&mut tx, name).await?; + db_macro.update(&mut pool_conn, name).await?; } if let Some(actions) = &data.actions { db_macro - .set_actions(&mut tx, actions.as_slice()) + .set_actions(&mut pool_conn, actions.as_slice()) .await?; } - let return_macro = Macro::from_db_model(&mut tx, db_macro)?; - - tx.commit().await?; + let return_macro = Macro::from_db_model(&mut pool_conn, db_macro)?; Ok(HttpResponse::Ok().json(return_macro)) } @@ -98,14 +91,12 @@ pub async fn delete( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (macro_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(macro_uid.as_str())?; - DbMacro::delete_by_uid(&mut tx, uid).await?; - - tx.commit().await?; + DbMacro::delete_by_uid(&mut pool_conn, uid).await?; Ok(HttpResponse::Ok().json("macro got deleted")) } @@ -116,27 +107,27 @@ pub async fn execute( path: web::Path<(String,)>, query: web::Query, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (macro_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(macro_uid.as_str())?; - let db_macro = DbMacro::get_by_uid(&mut tx, &uid) + let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; let actions_db = match query.weekday { - None => db_macro.get_actions(&mut tx).await?, + None => db_macro.get_actions(&mut pool_conn).await?, Some(weekday) => { db_macro - .get_actions_weekday(&mut tx, weekday) + .get_actions_weekday(&mut pool_conn, weekday) .await? } }; - let mut actions: Vec = convert_db_list(&mut tx, actions_db)?; + let mut actions: Vec = convert_db_list(&mut pool_conn, actions_db)?; for action in &actions { - action.execute(&mut tx).await?; + action.execute(&mut pool_conn).await?; } let affected_controller_uids: Vec = actions @@ -153,7 +144,7 @@ pub async fn execute( if affected_relay_ids.contains(&action.relay.r.id) { continue; } - action.relay.reload(&mut tx)?; + action.relay.reload(&mut pool_conn)?; affected_relays.push(action.relay.clone()); affected_relay_ids.push(action.relay.r.id); } @@ -166,6 +157,5 @@ pub async fn execute( .await??; } - tx.commit().await?; Ok(HttpResponse::Ok().finish()) // TODO add a message? } diff --git a/src/handlers/v1/relays.rs b/src/handlers/v1/relays.rs index 96c1e15..f07c081 100644 --- a/src/handlers/v1/relays.rs +++ b/src/handlers/v1/relays.rs @@ -14,13 +14,12 @@ use crate::app_state::AppState; #[get("/relays")] pub async fn index(pool: web::Data>) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let db_relays = DbRelay::get_all(&mut tx).await?; + let db_relays = DbRelay::get_all(&mut pool_conn).await?; - let relays: Vec = convert_db_list(&mut tx, db_relays)?; + let relays: Vec = convert_db_list(&mut pool_conn, db_relays)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(relays)) } @@ -29,17 +28,16 @@ pub async fn tagged( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (tag,) = path.into_inner(); - let tag_db = DbTag::get_by_tag(&mut tx, &tag) + let tag_db = DbTag::get_by_tag(&mut pool_conn, &tag) .await? .ok_or(DatabaseError::NotFound)?; - let db_relays = DbRelay::get_by_tag(&mut tx, &tag_db).await?; - let relays: Vec = convert_db_list(&mut tx, db_relays)?; + let db_relays = DbRelay::get_by_tag(&mut pool_conn, &tag_db).await?; + let relays: Vec = convert_db_list(&mut pool_conn, db_relays)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(relays)) } @@ -48,20 +46,18 @@ pub async fn index_for_controller( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid,) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let db_relays = controller.get_relays(&mut tx).await?; + let db_relays = controller.get_relays(&mut pool_conn).await?; - let relays: Vec = convert_db_list(&mut tx, db_relays)?; - - tx.commit().await?; + let relays: Vec = convert_db_list(&mut pool_conn, db_relays)?; Ok(HttpResponse::Ok().json(relays)) } @@ -70,22 +66,20 @@ pub async fn show_for_controller( pool: web::Data>, path: web::Path<(String, i64)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid, relay_num) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let relay = DbRelay::get_by_controller_and_num(&mut tx, &controller, relay_num) + let relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num) .await? .ok_or(DatabaseError::NotFound)?; - let return_relay = Relay::from_db_model(&mut tx, relay)?; - - tx.commit().await?; + let return_relay = Relay::from_db_model(&mut pool_conn, relay)?; Ok(HttpResponse::Ok().json(return_relay)) } @@ -96,32 +90,32 @@ pub async fn update_for_controller( path: web::Path<(String, i64)>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid, relay_num) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let mut relay = DbRelay::get_by_controller_and_num(&mut tx, &controller, relay_num) + let mut relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num) .await? .ok_or(DatabaseError::NotFound)?; if let Some(name) = &data.name { - relay = relay.update(&mut tx, name.as_str()).await?; + relay = relay.update(&mut pool_conn, name.as_str()).await?; } if let Some(schedule_uids) = &data.schedules { if schedule_uids.len() == 7 { let mut schedules = Vec::new(); for s_uid in schedule_uids { - schedules.push(s_uid.get_schedule(&mut tx).await?); + schedules.push(s_uid.get_schedule(&mut pool_conn).await?); } DbJunctionRelaySchedule::set_schedules( - &mut tx, + &mut pool_conn, &relay, schedules.iter().collect(), ) @@ -130,9 +124,9 @@ pub async fn update_for_controller( } if let Some(s_uid) = &data.active_schedule { - let schedule = s_uid.get_schedule(&mut tx).await?; + let schedule = s_uid.get_schedule(&mut pool_conn).await?; DbJunctionRelaySchedule::set_schedule( - &mut tx, + &mut pool_conn, &relay, &schedule, utils::get_weekday(), @@ -141,12 +135,12 @@ pub async fn update_for_controller( } if let Some(tags) = &data.tags { - relay.set_tags(&mut tx, tags.as_slice()).await?; + relay.set_tags(&mut pool_conn, tags.as_slice()).await?; } - let relay = relay.reload(&mut tx).await?; + let relay = relay.reload(&mut pool_conn).await?; - let return_relay = Relay::from_db_model(&mut tx, relay)?; + let return_relay = Relay::from_db_model(&mut pool_conn, relay)?; app_state .send(app_state::Action { @@ -155,7 +149,6 @@ pub async fn update_for_controller( }) .await??; - tx.commit().await?; Ok(HttpResponse::Ok().json(return_relay)) } @@ -166,16 +159,16 @@ pub async fn pulse( path: web::Path<(String, i64)>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (controller_uid, relay_num) = path.into_inner(); let uid = EmgauwaUid::try_from(controller_uid.as_str())?; - let controller = DbController::get_by_uid(&mut tx, &uid) + let controller = DbController::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let relay = DbRelay::get_by_controller_and_num(&mut tx, &controller, relay_num) + let relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num) .await? .ok_or(DatabaseError::NotFound)?; @@ -188,6 +181,5 @@ pub async fn pulse( }) .await??; - tx.commit().await?; Ok(HttpResponse::Ok().finish()) // TODO add a message? } diff --git a/src/handlers/v1/schedules.rs b/src/handlers/v1/schedules.rs index 548610e..8e329a1 100644 --- a/src/handlers/v1/schedules.rs +++ b/src/handlers/v1/schedules.rs @@ -7,7 +7,7 @@ use emgauwa_common::types::{ ControllerWsAction, RequestScheduleCreate, RequestScheduleUpdate, ScheduleUid, }; use itertools::Itertools; -use sqlx::Transaction; +use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use crate::app_state; @@ -15,12 +15,11 @@ use crate::app_state::AppState; #[get("/schedules")] pub async fn index(pool: web::Data>) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let db_schedules = DbSchedule::get_all(&mut tx).await?; - let schedules: Vec = convert_db_list(&mut tx, db_schedules)?; + let db_schedules = DbSchedule::get_all(&mut pool_conn).await?; + let schedules: Vec = convert_db_list(&mut pool_conn, db_schedules)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(schedules)) } @@ -29,17 +28,16 @@ pub async fn tagged( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (tag,) = path.into_inner(); - let tag_db = DbTag::get_by_tag(&mut tx, &tag) + let tag_db = DbTag::get_by_tag(&mut pool_conn, &tag) .await? .ok_or(DatabaseError::NotFound)?; - let db_schedules = DbSchedule::get_by_tag(&mut tx, &tag_db).await?; - let schedules: Vec = convert_db_list(&mut tx, db_schedules)?; + let db_schedules = DbSchedule::get_by_tag(&mut pool_conn, &tag_db).await?; + let schedules: Vec = convert_db_list(&mut pool_conn, db_schedules)?; - tx.commit().await?; Ok(HttpResponse::Ok().json(schedules)) } @@ -48,18 +46,16 @@ pub async fn show( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (schedule_uid,) = path.into_inner(); let uid = ScheduleUid::try_from(schedule_uid.as_str())?; - let schedule = DbSchedule::get_by_uid(&mut tx, &uid) + let schedule = DbSchedule::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; - let return_schedule = Schedule::from_db_model(&mut tx, schedule)?; - - tx.commit().await?; + let return_schedule = Schedule::from_db_model(&mut pool_conn, schedule)?; Ok(HttpResponse::Ok().json(return_schedule)) } @@ -68,10 +64,10 @@ pub async fn add( pool: web::Data>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let new_schedule = DbSchedule::create( - &mut tx, + &mut pool_conn, ScheduleUid::default(), &data.name, &data.periods, @@ -80,22 +76,20 @@ pub async fn add( if let Some(tags) = &data.tags { new_schedule - .set_tags(&mut tx, tags.as_slice()) + .set_tags(&mut pool_conn, tags.as_slice()) .await?; } - let return_schedule = Schedule::from_db_model(&mut tx, new_schedule)?; - - tx.commit().await?; + let return_schedule = Schedule::from_db_model(&mut pool_conn, new_schedule)?; Ok(HttpResponse::Created().json(return_schedule)) } async fn add_list_single( - tx: &mut Transaction<'_, Sqlite>, + conn: &mut PoolConnection, request_schedule: &RequestScheduleCreate, ) -> Result { let new_schedule = DbSchedule::create( - tx, + conn, ScheduleUid::default(), &request_schedule.name, &request_schedule.periods, @@ -103,7 +97,7 @@ async fn add_list_single( .await?; if let Some(tags) = &request_schedule.tags { - new_schedule.set_tags(tx, tags.as_slice()).await?; + new_schedule.set_tags(conn, tags.as_slice()).await?; } Ok(new_schedule) @@ -114,17 +108,15 @@ pub async fn add_list( pool: web::Data>, data: web::Json>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let mut db_schedules: Vec = Vec::new(); for s in data.iter() { - let new_s = add_list_single(&mut tx, s).await?; + let new_s = futures::executor::block_on(add_list_single(&mut pool_conn, s))?; db_schedules.push(new_s); } - let schedules: Vec = convert_db_list(&mut tx, db_schedules)?; - - tx.commit().await?; + let schedules: Vec = convert_db_list(&mut pool_conn, db_schedules)?; Ok(HttpResponse::Created().json(schedules)) } @@ -135,12 +127,12 @@ pub async fn update( path: web::Path<(String,)>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (schedule_uid,) = path.into_inner(); let uid = ScheduleUid::try_from(schedule_uid.as_str())?; - let schedule = DbSchedule::get_by_uid(&mut tx, &uid) + let schedule = DbSchedule::get_by_uid(&mut pool_conn, &uid) .await? .ok_or(DatabaseError::NotFound)?; @@ -154,13 +146,13 @@ pub async fn update( Some(period) => period, }; - let schedule = schedule.update(&mut tx, name, periods).await?; + let schedule = schedule.update(&mut pool_conn, name, periods).await?; if let Some(tags) = &data.tags { - schedule.set_tags(&mut tx, tags.as_slice()).await?; + schedule.set_tags(&mut pool_conn, tags.as_slice()).await?; } - let controller_ids: Vec = DbJunctionRelaySchedule::get_relays(&mut tx, &schedule) + let controller_ids: Vec = DbJunctionRelaySchedule::get_relays(&mut pool_conn, &schedule) .await? .into_iter() .map(|r| r.controller_id) @@ -168,7 +160,7 @@ pub async fn update( .collect(); for controller_id in controller_ids { - let controller = DbController::get(&mut tx, controller_id) + let controller = DbController::get(&mut pool_conn, controller_id) .await? .ok_or(DatabaseError::NotFound)?; app_state @@ -179,9 +171,7 @@ pub async fn update( .await??; } - let return_schedule = Schedule::from_db_model(&mut tx, schedule)?; - - tx.commit().await?; + let return_schedule = Schedule::from_db_model(&mut pool_conn, schedule)?; Ok(HttpResponse::Ok().json(return_schedule)) } @@ -190,6 +180,8 @@ pub async fn delete( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { + let mut pool_conn = pool.acquire().await?; + let (schedule_uid,) = path.into_inner(); let uid = ScheduleUid::try_from(schedule_uid.as_str())?; @@ -197,9 +189,7 @@ pub async fn delete( ScheduleUid::Off => Err(EmgauwaError::from(ApiError::ProtectedSchedule)), ScheduleUid::On => Err(EmgauwaError::from(ApiError::ProtectedSchedule)), ScheduleUid::Any(_) => { - let mut tx = pool.begin().await?; - DbSchedule::delete_by_uid(&mut tx, uid).await?; - tx.commit().await?; + DbSchedule::delete_by_uid(&mut pool_conn, uid).await?; Ok(HttpResponse::Ok().json("schedule got deleted")) } } diff --git a/src/handlers/v1/tags.rs b/src/handlers/v1/tags.rs index 6dfa221..cf4ef36 100644 --- a/src/handlers/v1/tags.rs +++ b/src/handlers/v1/tags.rs @@ -7,13 +7,12 @@ use sqlx::{Pool, Sqlite}; #[get("/tags")] pub async fn index(pool: web::Data>) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let db_tags = DbTag::get_all(&mut tx).await?; + let db_tags = DbTag::get_all(&mut pool_conn).await?; let tags: Vec = db_tags.iter().map(|t| t.tag.clone()).collect(); - tx.commit().await?; Ok(HttpResponse::Ok().json(tags)) } @@ -22,17 +21,15 @@ pub async fn show( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (tag_name,) = path.into_inner(); - let tag = DbTag::get_by_tag(&mut tx, &tag_name) + let tag = DbTag::get_by_tag(&mut pool_conn, &tag_name) .await? .ok_or(DatabaseError::NotFound)?; - let return_tag = Tag::from_db_model(&mut tx, tag)?; - - tx.commit().await?; + let return_tag = Tag::from_db_model(&mut pool_conn, tag)?; Ok(HttpResponse::Ok().json(return_tag)) } @@ -41,13 +38,11 @@ pub async fn delete( pool: web::Data>, path: web::Path<(String,)>, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; let (tag_name,) = path.into_inner(); - DbTag::delete_by_tag(&mut tx, &tag_name).await?; - - tx.commit().await?; + DbTag::delete_by_tag(&mut pool_conn, &tag_name).await?; Ok(HttpResponse::Ok().json("tag got deleted")) } @@ -56,13 +51,11 @@ pub async fn add( pool: web::Data>, data: web::Json, ) -> Result { - let mut tx = pool.begin().await?; + let mut pool_conn = pool.acquire().await?; - let new_tag = DbTag::create(&mut tx, &data.tag).await?; + let new_tag = DbTag::create(&mut pool_conn, &data.tag).await?; let cache = (Vec::new(), Vec::new()); // a new tag can't have any relays or schedules - let return_tag = Tag::from_db_model_cache(&mut tx, new_tag, cache)?; - - tx.commit().await?; + let return_tag = Tag::from_db_model_cache(&mut pool_conn, new_tag, cache)?; Ok(HttpResponse::Created().json(return_tag)) } diff --git a/src/handlers/v1/ws/controllers/handlers.rs b/src/handlers/v1/ws/controllers/handlers.rs index bb7ff50..9a01a7b 100644 --- a/src/handlers/v1/ws/controllers/handlers.rs +++ b/src/handlers/v1/ws/controllers/handlers.rs @@ -5,6 +5,8 @@ use emgauwa_common::models::{Controller, FromDbModel}; use emgauwa_common::types::{ControllerWsAction, EmgauwaUid, RelayStates}; use emgauwa_common::utils; use futures::executor::block_on; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; use crate::app_state::{Action, ConnectController, UpdateRelayStates}; use crate::handlers::v1::ws::controllers::ControllersWs; @@ -12,6 +14,7 @@ use crate::handlers::v1::ws::controllers::ControllersWs; impl ControllersWs { pub fn handle_register( &mut self, + conn: &mut PoolConnection, ctx: &mut ::Context, controller: Controller, ) -> Result<(), EmgauwaError> { @@ -20,19 +23,16 @@ impl ControllersWs { controller.c.name, controller.c.uid ); - - let mut tx = block_on(self.pool.begin())?; - let c = &controller.c; let controller_db = block_on(DbController::get_by_uid_or_create( - &mut tx, + conn, &c.uid, &c.name, c.relay_count, ))?; - block_on(controller_db.update_active(&mut tx, true))?; + block_on(controller_db.update_active(conn, true))?; // update only the relay count - block_on(controller_db.update(&mut tx, &controller_db.name, c.relay_count))?; + block_on(controller_db.update(conn, &controller_db.name, c.relay_count))?; for relay in &controller.relays { log::debug!( @@ -45,7 +45,7 @@ impl ControllersWs { } ); let (new_relay, created) = block_on(DbRelay::get_by_controller_and_num_or_create( - &mut tx, + conn, &controller_db, relay.r.number, &relay.r.name, @@ -54,7 +54,7 @@ impl ControllersWs { let mut relay_schedules = Vec::new(); for schedule in &relay.schedules { let (new_schedule, _) = block_on(DbSchedule::get_by_uid_or_create( - &mut tx, + conn, schedule.uid.clone(), &schedule.name, &schedule.periods, @@ -63,7 +63,7 @@ impl ControllersWs { } block_on(DbJunctionRelaySchedule::set_schedules( - &mut tx, + conn, &new_relay, relay_schedules.iter().collect(), ))?; @@ -71,9 +71,9 @@ impl ControllersWs { } let controller_uid = &controller.c.uid; - let controller_db = block_on(DbController::get_by_uid(&mut tx, controller_uid))? + let controller_db = block_on(DbController::get_by_uid(conn, controller_uid))? .ok_or(DatabaseError::InsertGetError)?; - let controller = Controller::from_db_model(&mut tx, controller_db)?; + let controller = Controller::from_db_model(conn, controller_db)?; let addr = ctx.address(); self.controller_uid = Some(controller_uid.clone()); @@ -91,7 +91,6 @@ impl ControllersWs { action: ControllerWsAction::Relays(controller.relays), }))??; - block_on(tx.commit())?; log::debug!("Done registering controller"); Ok(()) } diff --git a/src/handlers/v1/ws/controllers/mod.rs b/src/handlers/v1/ws/controllers/mod.rs index ac72561..2b39e5a 100644 --- a/src/handlers/v1/ws/controllers/mod.rs +++ b/src/handlers/v1/ws/controllers/mod.rs @@ -9,6 +9,7 @@ use emgauwa_common::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}; use emgauwa_common::errors::EmgauwaError; use emgauwa_common::types::{ControllerWsAction, EmgauwaUid}; use futures::executor::block_on; +use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use ws::Message; @@ -47,11 +48,12 @@ impl Actor for ControllersWs { impl ControllersWs { pub fn handle_action( &mut self, + conn: &mut PoolConnection, ctx: &mut ::Context, action: ControllerWsAction, ) { let action_res = match action { - ControllerWsAction::Register(controller) => self.handle_register(ctx, controller), + ControllerWsAction::Register(controller) => self.handle_register(conn, ctx, controller), ControllerWsAction::RelayStates((controller_uid, relay_states)) => { self.handle_relay_states(controller_uid, relay_states) } @@ -101,6 +103,15 @@ impl Handler for ControllersWs { impl StreamHandler> for ControllersWs { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let mut pool_conn = match block_on(self.pool.acquire()) { + Ok(conn) => conn, + Err(err) => { + log::error!("Failed to acquire database connection: {:?}", err); + ctx.stop(); + return; + } + }; + let msg = match msg { Err(_) => { ctx.stop(); @@ -119,7 +130,7 @@ impl StreamHandler> for ControllersWs { } Message::Text(text) => match serde_json::from_str(&text) { Ok(action) => { - self.handle_action(ctx, action); + self.handle_action(&mut pool_conn, ctx, action); } Err(e) => { log::error!("Error deserializing action: {:?}", e); diff --git a/src/main.rs b/src/main.rs index 7748bb3..0ed9f92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,11 +28,11 @@ async fn main() -> Result<(), std::io::Error> { let pool = emgauwa_common::db::init(&settings.database).await?; - let mut tx = pool.begin().await.map_err(EmgauwaError::from)?; - DbController::all_inactive(&mut tx) + let mut conn = pool.acquire().await.map_err(EmgauwaError::from)?; + DbController::all_inactive(&mut conn) .await .map_err(EmgauwaError::from)?; - tx.commit().await.map_err(EmgauwaError::from)?; + conn.close().await.map_err(EmgauwaError::from)?; let app_state_arbiter = Arbiter::with_tokio_rt(|| { tokio::runtime::Builder::new_multi_thread()