From 6536ff07927ff732a306ad68a2fb45ae4c56768b Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Tue, 28 Nov 2023 20:20:12 +0100 Subject: [PATCH] Add connected controllers hashmap for controller-ws --- Cargo.lock | Bin 74753 -> 65712 bytes emgauwa-controller/Cargo.toml | 2 +- emgauwa-controller/src/main.rs | 42 ++--- emgauwa-core/src/main.rs | 9 +- emgauwa-lib/Cargo.toml | 2 +- emgauwa-lib/src/constants.rs | 4 + emgauwa-lib/src/db/controllers.rs | 16 +- emgauwa-lib/src/db/relays.rs | 29 ++-- emgauwa-lib/src/handlers/v1/controllers.rs | 26 +++ emgauwa-lib/src/handlers/v1/mod.rs | 1 + emgauwa-lib/src/handlers/v1/relays.rs | 135 --------------- emgauwa-lib/src/handlers/v1/schedules.rs | 14 +- emgauwa-lib/src/handlers/v1/ws/controllers.rs | 155 +++++++++++------- emgauwa-lib/src/handlers/v1/ws/mod.rs | 30 ++++ emgauwa-lib/src/models/mod.rs | 31 +++- emgauwa-lib/src/types/controller_uid.rs | 2 +- emgauwa-lib/src/types/mod.rs | 7 + 17 files changed, 245 insertions(+), 260 deletions(-) create mode 100644 emgauwa-lib/src/handlers/v1/controllers.rs diff --git a/Cargo.lock b/Cargo.lock index 7313ed16d450275aacd3bc5f191c9584d6f73d69..d8a4a3f523e35cf57b4b3a27911714d818110fdd 100644 GIT binary patch delta 352 zcmV-m0iXVX$ON!~1h8}qvw91q1hXX&Zw#_1*#WbiDzFB#t1=x4v$s4k8j}ZE6{GuE z3$t}+q6V`rZ>7qcDjp%1fv|AYaz!vO&t1D9|E0S34D1pxsAm*@!rz_(uw0iqU-DT#3a delta 4836 zcma)AYm8h~9Z%`fcH7biZD-r=QkSZbn$B?E50UN?BNBzQ+nNvz0_RzVZfDBwY@soR zhZrR>k#eF567hlf!Klzo(4xfHez8UqMF}A>(Nq!>`Cw8L6TTq+?w#GbyA<%#&fd9m z&pH49@AbdWKRopCxuNHK>sF1EmTbv>Z{4#)3!isv>aJfM?s1(?nl86bx(P$A0iNbAC&^*B(zZv+ZtY z{D$$}tVPTDtEVO-*9PlGYbjQbN(YuH)yc8Ii6honqeUd11+T2giAh0tFj6ZXNU*7F zzP)kN|FK2hqAhZXy*FojGp?5opV+;(oZP>)9NE03JbuOYtLlxX2U3{pb!V3BoVIj1 zvuICH2N!#8_DKEjB^!`kweLB;=h7D_+e?xn%ya?5QW{LtWOho`&VO*b!j zAl5p0^5A5qUS%0|%uM^ptT4i6lTuJBg&c4iN(~h;YbUDv1Xac{ZiP_VYpu%EhQ@=7 z+dWQOcE03;JzQM8LeSt+ySwPbbs1R8%>T$&|F=7b>pPYl4u#_LH=*ty#&pyoVw}I1T&gx z2AxPYnjk138&Z%maV3Q%IT0=l1(q|_EK9;5Br#+V7ylVnHm_-Hy_6zv2 z-Y$l+IEG)G%#R#ASmjF-wKW!ZGGllEMXmC^%Y9eRQP%$k<<3tm}1; zwnMYVXL)GEO;SnV0TPU(kQL1m;7FZ{B2hRNjige1VzdfqyE4RbSc40!TG)Y|^^@=3 zb!yeN1<_c%e_Fb04oxV>fft@k3=lMh7&PO71dOl*VKeiAO0F4Mmn+xkDLt{Zl&eJstCeZ$xEH+^{5@D)pJx7@kb zO+c}$+`K3zv&W?KG3ZIvXVoNob`A z4q{MF4ww-yG_jdwLE%Hd9+!^7cO!U&6hvhZ$_K1>u(5H$=I6Z>5oHYAUw!4#bTr&E zGeFJ2EpXnQYc4AOsr|c$s;!m7H>1SdaGXv_>VlUL_u!*}b8~o_4aUO$q>G4ETDc7M zO<)4`PAUTrNWN0{O-s>Rho$$}NhPi$&YB+HFcA1H^^U%4uX@!@vY8l*md(qgX&8Gr%4Nt_tF?utZcy-lYr} zTSSja^d#`8aNGcU4Xvxn*?V7I1E=Sv#b|l?xvka3 z{%?*P7+SwX^k4h5n;8QWJa4MlnxFqEfx+bAH$+%M~#1$T;SkL?B;piT&k62Z`ENN1od>Xf+k z-cvLy0O94-)!Y03>FpURuUxZb__&*%OHJWxVT?nl0 zZe6)AKJmn5TLwvPF#+oIH7Pu@CWA8tr!C|Vb>x`LKtiNao^?_T8?7R1h@gaqxN~kW zFF~x4Q7dI?6!OTr3E%GJX?J)QIfJ)Yy|wqtqSI|3>Km&bLqf`{&uy&HwB_q>ZWyX! zW})W}+>jhL$CY+4UYapNCfE=Uf)E#D7Whaqss(yrTENDw$}n&NJ}VZ>)hkBIKcCvX z9fRY{OlOe!7t^Nv_WPsN5X)O;0nO##&s^+%rl*7~w@?$CtIuyU0HLKl*| z6pX|EjCC9Wi+oV$G%Ap0IdY`+*?0^xaCL#N1mXgcS$XRV!>jIZcigdd+5E%N{6|%C(!WDi5Byy#L*ouED@wk80&>kByYS{Cd?p=v&@8H{9QK?&;MF)BH=nzj;M} z{U4v&II_TxW$IGyyW`*_NX8OoPZlHGGAaS42wi>F#3KqsiQ`j{9`s?*5+S5!ifBh+ zw2;6$IsM@LmsT_ev%<19>WDs>(P{WbmY{$^_y(5)QWG;-F~B?n zP~o^mXjgU+&-T2&Yh`)x*2a*)_);#g1N*RP9~ksNWt(Y69-{_qHpUW2h9XSGY~YF* z`Mq}03d)Ba(L)Z8xY}jQ2nbePrf;OqMUhbw7m8D`u#KifRqnX7>}|Mde#-KE5`8!BlTd6(aplmFUMu6t%`(gmm} z`p*&IaKwnnhL`~%#!$y>%IGSls{ncCB6FV*O|r@!l|m5HUR4O$@vggASmp*zSz=5Z zfwT^|(UfS2ta6d;G+vNMp{Pp+6wG+H1EEWJae@~M)N+LDjjxZEO>eI1U-`z = settings - .relays - .iter() - .map(|relay| { - futures::executor::block_on(async { - match DbRelay::get_by_controller_and_num( - &mut conn, - &db_controller, - relay.number.unwrap(), - ) - .await - .expect("Failed to get relay from database") - { - None => create_this_relay(&mut conn, &db_controller, relay).await, - Some(relay) => relay, - } - }) - }) - .collect(); + for relay in &settings.relays { + if DbRelay::get_by_controller_and_num(&mut conn, &db_controller, relay.number.unwrap()) + .await + .expect("Failed to get relay from database") + .is_none() + { + create_this_relay(&mut conn, &db_controller, relay).await; + } + } let db_controller = db_controller - .update(&mut conn, &db_controller.name, db_relays.len() as i64) + .update(&mut conn, &db_controller.name, settings.relays.len() as i64) .await .unwrap(); - let relays = convert_db_list(&mut conn, db_relays).expect("Failed to convert relays"); - - let this = models::Controller { - controller: db_controller, - relays, - }; + let this = Controller::from_db_model(&mut conn, db_controller) + .expect("Failed to convert database models"); let url = format!( "ws://{}:{}/api/v1/ws/controllers", diff --git a/emgauwa-core/src/main.rs b/emgauwa-core/src/main.rs index f41b27c..e2e797e 100644 --- a/emgauwa-core/src/main.rs +++ b/emgauwa-core/src/main.rs @@ -1,12 +1,15 @@ use actix_cors::Cors; +use std::collections::HashMap; use std::net::TcpListener; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use crate::utils::drop_privileges; use actix_web::middleware::TrailingSlash; use actix_web::{middleware, web, App, HttpServer}; use emgauwa_lib::db::DbController; use emgauwa_lib::handlers; +use emgauwa_lib::types::ConnectedControllersType; use log::{trace, LevelFilter}; use simple_logger::SimpleLogger; @@ -41,6 +44,8 @@ async fn main() -> std::io::Result<()> { .expect("Error setting all controllers inactive"); } + let connected_controllers: ConnectedControllersType = Arc::new(Mutex::new(HashMap::new())); + log::info!("Starting server on {}:{}", settings.host, settings.port); HttpServer::new(move || { let cors = Cors::default().allow_any_method().allow_any_header(); @@ -59,6 +64,8 @@ async fn main() -> std::io::Result<()> { .wrap(middleware::NormalizePath::new(TrailingSlash::Trim)) .app_data(web::JsonConfig::default().error_handler(handlers::json_error_handler)) .app_data(web::Data::new(pool.clone())) + .app_data(web::Data::new(connected_controllers.clone())) + .service(handlers::v1::controllers::index) .service(handlers::v1::relays::index) .service(handlers::v1::schedules::index) .service(handlers::v1::schedules::tagged) @@ -67,7 +74,7 @@ async fn main() -> std::io::Result<()> { .service(handlers::v1::schedules::add_list) .service(handlers::v1::schedules::update) .service(handlers::v1::schedules::delete) - .service(handlers::v1::ws::controllers::index) + .service(handlers::v1::ws::ws_controllers) }) .listen(listener)? .run() diff --git a/emgauwa-lib/Cargo.toml b/emgauwa-lib/Cargo.toml index f79008e..c3bac8f 100644 --- a/emgauwa-lib/Cargo.toml +++ b/emgauwa-lib/Cargo.toml @@ -18,7 +18,7 @@ config = "0.13" chrono = { version = "0.4", features = ["serde"] } -sqlx = { version = "0.7", features = ["sqlite", "runtime-async-std", "macros", "chrono"] } +sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] } libsqlite3-sys = { version = "*", features = ["bundled"] } log = "0.4" diff --git a/emgauwa-lib/src/constants.rs b/emgauwa-lib/src/constants.rs index 649705b..e6d9657 100644 --- a/emgauwa-lib/src/constants.rs +++ b/emgauwa-lib/src/constants.rs @@ -1 +1,5 @@ +use std::time::Duration; + pub const DEFAULT_PORT: u16 = 4419; +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15); diff --git a/emgauwa-lib/src/db/controllers.rs b/emgauwa-lib/src/db/controllers.rs index bc35dda..38325d5 100644 --- a/emgauwa-lib/src/db/controllers.rs +++ b/emgauwa-lib/src/db/controllers.rs @@ -5,7 +5,7 @@ use sqlx::pool::PoolConnection; use sqlx::Sqlite; use crate::db::errors::DatabaseError; -use crate::db::DbTag; +use crate::db::{DbRelay, DbTag}; use crate::types::ControllerUid; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -144,6 +144,20 @@ impl DbController { .ok_or(DatabaseError::UpdateGetError) } + pub async fn get_relays( + &self, + conn: &mut PoolConnection, + ) -> Result, DatabaseError> { + sqlx::query_as!( + DbRelay, + "SELECT * FROM relays WHERE controller_id = ?", + self.id + ) + .fetch_all(conn.deref_mut()) + .await + .map_err(DatabaseError::from) + } + pub async fn all_inactive(conn: &mut PoolConnection) -> Result<(), DatabaseError> { sqlx::query!("UPDATE controllers SET active = 0") .execute(conn.deref_mut()) diff --git a/emgauwa-lib/src/db/relays.rs b/emgauwa-lib/src/db/relays.rs index 0142ebf..310d020 100644 --- a/emgauwa-lib/src/db/relays.rs +++ b/emgauwa-lib/src/db/relays.rs @@ -20,20 +20,20 @@ pub struct DbRelay { impl DbRelay { pub async fn get_all(conn: &mut PoolConnection) -> Result, DatabaseError> { - Ok(sqlx::query_as!(DbRelay, "SELECT * FROM relays") + sqlx::query_as!(DbRelay, "SELECT * FROM relays") .fetch_all(conn.deref_mut()) - .await?) + .await + .map_err(DatabaseError::from) } pub async fn get( conn: &mut PoolConnection, id: i64, ) -> Result, DatabaseError> { - Ok( - sqlx::query_as!(DbRelay, "SELECT * FROM relays WHERE id = ?", id) - .fetch_optional(conn.deref_mut()) - .await?, - ) + sqlx::query_as!(DbRelay, "SELECT * FROM relays WHERE id = ?", id) + .fetch_optional(conn.deref_mut()) + .await + .map_err(DatabaseError::from) } pub async fn get_by_controller_and_num( @@ -41,14 +41,15 @@ impl DbRelay { controller: &DbController, number: i64, ) -> Result, DatabaseError> { - Ok(sqlx::query_as!( + sqlx::query_as!( DbRelay, "SELECT * FROM relays WHERE controller_id = ? AND number = ?", controller.id, number ) .fetch_optional(conn.deref_mut()) - .await?) + .await + .map_err(DatabaseError::from) } pub async fn get_by_controller_and_num_or_create( @@ -67,9 +68,10 @@ impl DbRelay { conn: &mut PoolConnection, tag: &DbTag, ) -> Result, DatabaseError> { - Ok(sqlx::query_as!(DbRelay, "SELECT schedule.* FROM relays AS schedule INNER JOIN junction_tag ON junction_tag.schedule_id = schedule.id WHERE junction_tag.tag_id = ?", tag.id) + sqlx::query_as!(DbRelay, "SELECT schedule.* FROM relays AS schedule INNER JOIN junction_tag ON junction_tag.schedule_id = schedule.id WHERE junction_tag.tag_id = ?", tag.id) .fetch_all(conn.deref_mut()) - .await?) + .await + .map_err(DatabaseError::from) } pub async fn create( @@ -135,9 +137,10 @@ impl DbRelay { &self, conn: &mut PoolConnection, ) -> Result, DatabaseError> { - Ok(sqlx::query_scalar!("SELECT tag FROM tags INNER JOIN junction_tag ON junction_tag.tag_id = tags.id WHERE junction_tag.relay_id = ?", self.id) + sqlx::query_scalar!("SELECT tag FROM tags INNER JOIN junction_tag ON junction_tag.tag_id = tags.id WHERE junction_tag.relay_id = ?", self.id) .fetch_all(conn.deref_mut()) - .await?) + .await + .map_err(DatabaseError::from) } pub async fn set_tags( diff --git a/emgauwa-lib/src/handlers/v1/controllers.rs b/emgauwa-lib/src/handlers/v1/controllers.rs new file mode 100644 index 0000000..0333ba6 --- /dev/null +++ b/emgauwa-lib/src/handlers/v1/controllers.rs @@ -0,0 +1,26 @@ +use actix_web::{get, web, HttpResponse}; + +use sqlx::{Pool, Sqlite}; + +use crate::db::DbController; + +use crate::handlers::errors::ApiError; +use crate::models::{convert_db_list, Controller}; +use crate::types::ConnectedControllersType; + +#[get("/api/v1/controllers")] +pub async fn index( + pool: web::Data>, + connected_controllers: web::Data, +) -> Result { + let mut pool_conn = pool.acquire().await?; + + let db_controllers = DbController::get_all(&mut pool_conn).await?; + + let controllers: Vec = convert_db_list(&mut pool_conn, db_controllers)?; + + let data = connected_controllers.lock().unwrap(); + println!("{:?}", *data); + + Ok(HttpResponse::Ok().json(controllers)) +} diff --git a/emgauwa-lib/src/handlers/v1/mod.rs b/emgauwa-lib/src/handlers/v1/mod.rs index 711bef9..e221ca3 100644 --- a/emgauwa-lib/src/handlers/v1/mod.rs +++ b/emgauwa-lib/src/handlers/v1/mod.rs @@ -1,3 +1,4 @@ +pub mod controllers; pub mod relays; pub mod schedules; pub mod ws; diff --git a/emgauwa-lib/src/handlers/v1/relays.rs b/emgauwa-lib/src/handlers/v1/relays.rs index 457d66b..2319ce1 100644 --- a/emgauwa-lib/src/handlers/v1/relays.rs +++ b/emgauwa-lib/src/handlers/v1/relays.rs @@ -24,138 +24,3 @@ pub async fn index(pool: web::Data>) -> Result>, -// path: web::Path<(String,)>, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let (tag,) = path.into_inner(); -// let tag_db = Tag::get_by_tag(&mut pool_conn, &tag).await?; -// -// let relays = Relay::get_by_tag(&mut pool_conn, &tag_db).await?; -// -// let return_relays: Vec = -// relays.into_iter().map(|s| ReturnRelay::from_relay(s, &mut pool_conn)).collect(); -// -// Ok(HttpResponse::Ok().json(return_relays)) -//} -// -//#[get("/api/v1/tags/{relay_id}")] -//pub async fn show( -// pool: web::Data>, -// path: web::Path<(String,)>, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let (relay_uid,) = path.into_inner(); -// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; -// -// let relay = Relay::get_by_uid(&mut pool_conn, &uid).await?; -// -// let return_relay = ReturnRelay::from_relay(relay, &mut pool_conn); -// Ok(HttpResponse::Ok().json(return_relay)) -//} -// -//#[post("/api/v1/tags")] -//pub async fn add( -// pool: web::Data>, -// data: web::Json, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let new_relay = Relay::create(&mut pool_conn, &data.name, &data.periods).await?; -// -// new_relay -// .set_tags(&mut pool_conn, data.tags.as_slice()) -// .await?; -// -// let return_relay = ReturnRelay::from_relay(new_relay, &mut pool_conn); -// Ok(HttpResponse::Created().json(return_relay)) -//} -// -//async fn add_list_single( -// conn: &mut PoolConnection, -// request_relay: &RequestRelay, -//) -> Result { -// let new_relay = -// Relay::create(conn, &request_relay.name, &request_relay.periods).await?; -// -// new_relay -// .set_tags(conn, request_relay.tags.as_slice()) -// .await?; -// -// Ok(new_relay) -//} -// -//#[post("/api/v1/tags/list")] -//pub async fn add_list( -// pool: web::Data>, -// data: web::Json>, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let result: Vec> = data -// .as_slice() -// .iter() -// .map(|request_relay| { -// futures::executor::block_on(add_list_single(&mut pool_conn, request_relay)) -// }) -// .collect(); -// -// let mut return_relays: Vec = Vec::new(); -// for relay in result { -// match relay { -// Ok(relay) => return_relays.push(ReturnRelay::from_relay(relay, &mut pool_conn)), -// Err(e) => return Ok(HttpResponse::from(e)), -// } -// } -// Ok(HttpResponse::Created().json(return_relays)) -//} -// -//#[put("/api/v1/tags/{relay_id}")] -//pub async fn update( -// pool: web::Data>, -// path: web::Path<(String,)>, -// data: web::Json, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let (relay_uid,) = path.into_inner(); -// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; -// -// let relay = Relay::get_by_uid(&mut pool_conn, &uid).await?; -// -// let relay = relay -// .update(&mut pool_conn, data.name.as_str(), &data.periods) -// .await?; -// -// relay -// .set_tags(&mut pool_conn, data.tags.as_slice()) -// .await?; -// -// let return_relay = ReturnRelay::from_relay(relay, &mut pool_conn); -// Ok(HttpResponse::Ok().json(return_relay)) -//} -// -//#[delete("/api/v1/tags/{relay_id}")] -//pub async fn delete( -// pool: web::Data>, -// path: web::Path<(String,)>, -//) -> Result { -// let mut pool_conn = pool.acquire().await?; -// -// let (relay_uid,) = path.into_inner(); -// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; -// -// match uid { -// RelayUid::Off => Err(ApiError::ProtectedRelay), -// RelayUid::On => Err(ApiError::ProtectedRelay), -// RelayUid::Any(_) => { -// Relay::delete_by_uid(&mut pool_conn, uid).await?; -// Ok(HttpResponse::Ok().json("relay got deleted")) -// } -// } -//} diff --git a/emgauwa-lib/src/handlers/v1/schedules.rs b/emgauwa-lib/src/handlers/v1/schedules.rs index b0f5afd..bf754a0 100644 --- a/emgauwa-lib/src/handlers/v1/schedules.rs +++ b/emgauwa-lib/src/handlers/v1/schedules.rs @@ -104,16 +104,10 @@ pub async fn add_list( let mut pool_conn = pool.acquire().await?; let mut db_schedules: Vec = Vec::new(); - data.iter().try_for_each(|s| { - let new_s = futures::executor::block_on(add_list_single(&mut pool_conn, s)); - match new_s { - Ok(new_s) => { - db_schedules.push(new_s); - Ok(()) - } - Err(e) => Err(e), - } - })?; + for s in data.iter() { + let new_s = futures::executor::block_on(add_list_single(&mut pool_conn, s))?; + db_schedules.push(new_s); + } let schedules: Vec = convert_db_list(&mut pool_conn, db_schedules)?; Ok(HttpResponse::Created().json(schedules)) diff --git a/emgauwa-lib/src/handlers/v1/ws/controllers.rs b/emgauwa-lib/src/handlers/v1/ws/controllers.rs index bfbceb7..bb5d7c6 100644 --- a/emgauwa-lib/src/handlers/v1/ws/controllers.rs +++ b/emgauwa-lib/src/handlers/v1/ws/controllers.rs @@ -1,14 +1,15 @@ +use crate::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}; use crate::db::errors::DatabaseError; use crate::db::{DbController, DbRelay}; -use crate::handlers::errors::ApiError; -use crate::models::Controller; -use actix::{Actor, StreamHandler}; -use actix_web::{get, web, HttpRequest, HttpResponse}; +use crate::models::{Controller, FromDbModel}; +use crate::types::{ConnectedControllersType, ControllerUid}; +use actix::{Actor, ActorContext, AsyncContext, StreamHandler}; use actix_web_actors::ws; use actix_web_actors::ws::ProtocolError; use serde_derive::{Deserialize, Serialize}; use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; +use std::time::Instant; use ws::Message; #[derive(Debug, Serialize, Deserialize)] @@ -16,48 +17,35 @@ pub enum ControllerWsAction { Register(Controller), } -struct ControllerWs { +pub struct ControllerWs { pub pool: Pool, - pub controller: Option, + pub controller_uid: Option, + pub connected_controllers: ConnectedControllersType, + pub hb: Instant, } impl Actor for ControllerWs { type Context = ws::WebsocketContext; - fn stopped(&mut self, _ctx: &mut Self::Context) { - if let Some(controller) = &self.controller { - let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); - futures::executor::block_on(controller.update_active(&mut pool_conn, false)).unwrap(); - } + fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); } -} -impl StreamHandler> for ControllerWs { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); + fn stopped(&mut self, _ctx: &mut Self::Context) { + if let Some(controller_uid) = &self.controller_uid { + let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); - match msg { - Ok(Message::Ping(msg)) => ctx.pong(&msg), - Ok(Message::Text(text)) => { - let action: ControllerWsAction = serde_json::from_str(&text).unwrap(); - let action_res = - futures::executor::block_on(self.handle_action(&mut pool_conn, action)); - if let Err(e) = action_res { - log::error!("Error handling action: {:?}", e); - ctx.text(serde_json::to_string(&e).unwrap()); - } + let mut data = self.connected_controllers.lock().unwrap(); + if let Some(controller) = data.remove(controller_uid) { + futures::executor::block_on(controller.c.update_active(&mut pool_conn, false)) + .unwrap(); } - _ => {} } - - //let schedules = futures::executor::block_on(DbSchedule::get_all(&mut pool_conn)).unwrap(); - //let schedules_json = serde_json::to_string(&schedules).unwrap(); - //ctx.text(schedules_json); } } impl ControllerWs { - pub async fn handle_action( + pub fn handle_action( &mut self, conn: &mut PoolConnection, action: ControllerWsAction, @@ -65,49 +53,94 @@ impl ControllerWs { match action { ControllerWsAction::Register(controller) => { log::info!("Registering controller: {:?}", controller); - let c = &controller.controller; - let controller_db = - DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count) - .await?; - controller_db.update_active(conn, true).await?; - - println!("Controller: {:?}", controller_db); + let c = &controller.c; + let controller_db = futures::executor::block_on( + DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count), + )?; + futures::executor::block_on(controller_db.update_active(conn, true))?; for relay in &controller.relays { - let r = &relay.relay; - let relay_db = DbRelay::get_by_controller_and_num_or_create( + let r = &relay.r; + futures::executor::block_on(DbRelay::get_by_controller_and_num_or_create( conn, &controller_db, r.number, &r.name, - ) - .await?; - println!("Controller relay: {:?}", relay_db); + ))?; } - self.controller = Some(controller_db); + let controller = Controller::from_db_model(conn, controller_db)?; + + let controller_uid = &controller.c.uid; + self.controller_uid = Some(controller_uid.clone()); + + let mut data = self.connected_controllers.lock().unwrap(); + data.insert(controller_uid.clone(), controller); Ok(()) } } } + + /// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL). + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > HEARTBEAT_TIMEOUT { + log::warn!("Websocket Controller heartbeat failed, disconnecting!"); + ctx.stop(); + // don't try to send a ping + return; + } + + ctx.ping(&[]); + }); + } } -#[get("/api/v1/ws/controllers")] -pub async fn index( - pool: web::Data>, - req: HttpRequest, - stream: web::Payload, -) -> Result { - let resp = ws::start( - ControllerWs { - pool: pool.get_ref().clone(), - controller: None, - }, - &req, - stream, - ) - .map_err(|_| ApiError::InternalError(String::from("error starting websocket"))); - println!("{:?}", resp); - resp +impl StreamHandler> for ControllerWs { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap(); + + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + + match msg { + Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg) + } + Message::Pong(_) => { + self.hb = Instant::now(); + } + Message::Text(text) => { + let action: ControllerWsAction = serde_json::from_str(&text).unwrap(); + let action_res = self.handle_action(&mut pool_conn, action); + if let Err(e) = action_res { + log::error!("Error handling action: {:?}", e); + ctx.text(serde_json::to_string(&e).unwrap()); + } + } + Message::Binary(_) => log::warn!("Received unexpected binary in controller ws"), + Message::Close(reason) => { + ctx.close(reason); + ctx.stop(); + } + Message::Continuation(_) => { + ctx.stop(); + } + Message::Nop => (), + } + + //let schedules = futures::executor::block_on(DbSchedule::get_all(&mut pool_conn)).unwrap(); + //let schedules_json = serde_json::to_string(&schedules).unwrap(); + //ctx.text(schedules_json); + } } diff --git a/emgauwa-lib/src/handlers/v1/ws/mod.rs b/emgauwa-lib/src/handlers/v1/ws/mod.rs index f916674..4022c41 100644 --- a/emgauwa-lib/src/handlers/v1/ws/mod.rs +++ b/emgauwa-lib/src/handlers/v1/ws/mod.rs @@ -1 +1,31 @@ +use crate::handlers::errors::ApiError; +use crate::handlers::v1::ws::controllers::ControllerWs; +use crate::types::ConnectedControllersType; +use actix_web::{get, web, HttpRequest, HttpResponse}; +use actix_web_actors::ws; +use sqlx::{Pool, Sqlite}; +use std::time::Instant; + pub mod controllers; + +#[get("/api/v1/ws/controllers")] +pub async fn ws_controllers( + pool: web::Data>, + connected_controllers: web::Data, + req: HttpRequest, + stream: web::Payload, +) -> Result { + let resp = ws::start( + ControllerWs { + pool: pool.get_ref().clone(), + controller_uid: None, + connected_controllers: connected_controllers.get_ref().clone(), + hb: Instant::now(), + }, + &req, + stream, + ) + .map_err(|_| ApiError::InternalError(String::from("error starting websocket"))); + println!("{:?}", resp); + resp +} diff --git a/emgauwa-lib/src/models/mod.rs b/emgauwa-lib/src/models/mod.rs index 2a930e3..5e91ca2 100644 --- a/emgauwa-lib/src/models/mod.rs +++ b/emgauwa-lib/src/models/mod.rs @@ -1,6 +1,5 @@ -use crate::db; use crate::db::errors::DatabaseError; -use crate::db::{DbRelay, DbSchedule}; +use crate::db::{DbController, DbRelay, DbSchedule}; use crate::types::ControllerUid; use futures::executor; use serde_derive::{Deserialize, Serialize}; @@ -21,15 +20,15 @@ pub trait FromDbModel { #[derive(Serialize, Deserialize, Debug)] pub struct Schedule { #[serde(flatten)] - pub schedule: DbSchedule, + pub s: DbSchedule, pub tags: Vec, } #[derive(Serialize, Deserialize, Debug)] pub struct Relay { #[serde(flatten)] - pub relay: DbRelay, - pub controller: db::DbController, + pub r: DbRelay, + pub controller: DbController, pub controller_id: ControllerUid, pub tags: Vec, } @@ -37,7 +36,7 @@ pub struct Relay { #[derive(Serialize, Deserialize, Debug)] pub struct Controller { #[serde(flatten)] - pub controller: db::DbController, + pub c: DbController, pub relays: Vec, } @@ -51,7 +50,7 @@ impl FromDbModel for Schedule { let schedule = db_model.clone(); let tags = executor::block_on(schedule.get_tags(conn))?; - Ok(Schedule { schedule, tags }) + Ok(Schedule { s: schedule, tags }) } } @@ -68,7 +67,7 @@ impl FromDbModel for Relay { let tags = executor::block_on(relay.get_tags(conn))?; Ok(Relay { - relay, + r: relay, controller, controller_id, tags, @@ -76,6 +75,22 @@ impl FromDbModel for Relay { } } +impl FromDbModel for Controller { + type DbModel = DbController; + + fn from_db_model( + conn: &mut PoolConnection, + db_model: Self::DbModel, + ) -> Result { + let relays_db = executor::block_on(db_model.get_relays(conn))?; + let relays = convert_db_list(conn, relays_db)?; + Ok(Controller { + c: db_model, + relays, + }) + } +} + pub fn convert_db_list( conn: &mut PoolConnection, db_models: Vec, diff --git a/emgauwa-lib/src/types/controller_uid.rs b/emgauwa-lib/src/types/controller_uid.rs index 06086d1..a4e709a 100644 --- a/emgauwa-lib/src/types/controller_uid.rs +++ b/emgauwa-lib/src/types/controller_uid.rs @@ -7,7 +7,7 @@ use sqlx::{Decode, Encode, Sqlite, Type}; use std::str::FromStr; use uuid::Uuid; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct ControllerUid(Uuid); impl Default for ControllerUid { diff --git a/emgauwa-lib/src/types/mod.rs b/emgauwa-lib/src/types/mod.rs index 0af949c..e163fae 100644 --- a/emgauwa-lib/src/types/mod.rs +++ b/emgauwa-lib/src/types/mod.rs @@ -1,5 +1,12 @@ mod controller_uid; mod schedule_uid; +use crate::models::Controller; + pub use controller_uid::ControllerUid; pub use schedule_uid::ScheduleUid; + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +pub type ConnectedControllersType = Arc>>;