diff --git a/emgauwa-controller/src/main.rs b/emgauwa-controller/src/main.rs index c9327dd..b226b50 100644 --- a/emgauwa-controller/src/main.rs +++ b/emgauwa-controller/src/main.rs @@ -10,10 +10,12 @@ use tokio_tungstenite::tungstenite::Error; use emgauwa_lib::db; use emgauwa_lib::db::Controller; use emgauwa_lib::db::types::ControllerUid; +use crate::relay_loop::run_relay_loop; use crate::settings::Settings; mod settings; mod driver; +mod relay_loop; fn create_this_controller(conn: &mut PoolConnection, settings: &Settings) -> Controller { futures::executor::block_on(async { @@ -43,8 +45,9 @@ async fn main() { let this_json = serde_json::to_string(&this).unwrap(); + println!("{:?}", settings.relays); println!("{:?}", this); - println!("{:?}", this_json); + println!("{}", this_json); let url = format!( "ws://{}:{}/api/v1/ws/controllers", @@ -61,10 +64,12 @@ async fn main() { write.send(Message::text(this_json)).await.unwrap(); let ws_to_stdout = read.for_each(handle_message); + let stdin_to_ws = stdin_rx.map(Ok).forward(write); - ws_to_stdout.await; - //pin_mut!(stdin_to_ws, ws_to_stdout); - //future::select(stdin_to_ws, ws_to_stdout).await; + tokio::spawn(run_relay_loop(settings)); + + pin_mut!(stdin_to_ws, ws_to_stdout); + future::select(stdin_to_ws, ws_to_stdout).await; } // Our helper method which will read data from stdin and send it along the diff --git a/emgauwa-controller/src/relay_loop.rs b/emgauwa-controller/src/relay_loop.rs new file mode 100644 index 0000000..22890ff --- /dev/null +++ b/emgauwa-controller/src/relay_loop.rs @@ -0,0 +1,9 @@ +use chrono::Local; +use crate::settings::Settings; + +#[allow(unused_variables)] +pub async fn run_relay_loop(settings: Settings) { + let next_timestamp = Local::now().naive_local(); + loop { + } +} \ No newline at end of file diff --git a/emgauwa-core/src/main.rs b/emgauwa-core/src/main.rs index bfd365b..62a4681 100644 --- a/emgauwa-core/src/main.rs +++ b/emgauwa-core/src/main.rs @@ -46,6 +46,7 @@ async fn main() -> std::io::Result<()> { .wrap(middleware::NormalizePath::new(TrailingSlash::Trim)) .app_data(web::JsonConfig::default().error_handler(handlers::json_error_handler)) .app_data(web::Data::new(pool.clone())) + .service(handlers::v1::relays::index) .service(handlers::v1::schedules::index) .service(handlers::v1::schedules::tagged) .service(handlers::v1::schedules::show) diff --git a/emgauwa-lib/src/db/controllers.rs b/emgauwa-lib/src/db/controllers.rs index b427c63..c1d683c 100644 --- a/emgauwa-lib/src/db/controllers.rs +++ b/emgauwa-lib/src/db/controllers.rs @@ -1,12 +1,11 @@ -use serde_derive::{Deserialize, Serialize}; +use serde_derive::Serialize; use std::ops::DerefMut; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use crate::db::errors::DatabaseError; -use crate::db::model_utils::Period; -use crate::db::tag::Tag; +use crate::db::Tag; use crate::db::types::ControllerUid; #[derive(Debug, Serialize, Clone)] @@ -18,9 +17,6 @@ pub struct Controller { pub active: bool, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] -pub struct Periods(pub Vec); - impl Controller { pub async fn get_all( conn: &mut PoolConnection, @@ -117,6 +113,6 @@ impl Controller { .execute(conn.deref_mut()) .await?; - Controller::get_by_uid(conn, &self.uid).await + Self::get(conn, self.id).await } } diff --git a/emgauwa-lib/src/db/mod.rs b/emgauwa-lib/src/db/mod.rs index 054efa7..192822b 100644 --- a/emgauwa-lib/src/db/mod.rs +++ b/emgauwa-lib/src/db/mod.rs @@ -10,14 +10,16 @@ use crate::db::types::ScheduleUid; pub mod errors; mod model_utils; -mod models; mod schedules; -pub mod tag; +mod tag; pub mod types; mod controllers; +mod relays; pub use controllers::Controller; +pub use relays::Relay; pub use schedules::{Periods, Schedule}; +pub use tag::Tag; static MIGRATOR: Migrator = sqlx::migrate!("../migrations"); // defaults to "./migrations" diff --git a/emgauwa-lib/src/db/models.rs b/emgauwa-lib/src/db/models.rs deleted file mode 100644 index 6176d2f..0000000 --- a/emgauwa-lib/src/db/models.rs +++ /dev/null @@ -1,8 +0,0 @@ -use serde::Serialize; - -#[derive(Debug, Serialize)] -pub struct Relay { - #[serde(skip)] - pub id: i64, - // TODO -} diff --git a/emgauwa-lib/src/db/relays.rs b/emgauwa-lib/src/db/relays.rs new file mode 100644 index 0000000..8709b1c --- /dev/null +++ b/emgauwa-lib/src/db/relays.rs @@ -0,0 +1,130 @@ +use serde_derive::Serialize; +use std::ops::DerefMut; + +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use crate::db::Controller; + +use crate::db::errors::DatabaseError; +use crate::db::Tag; + +#[derive(Debug, Serialize, Clone)] +#[derive(sqlx::FromRow)] +pub struct Relay { + #[serde(skip)] + pub id: i64, + pub name: String, + pub number: i64, + #[serde(skip)] + pub controller_id: i64, +} + +impl Relay { + pub async fn get_all( + conn: &mut PoolConnection, + ) -> Result, DatabaseError> { + Ok(sqlx::query_as!(Relay, "SELECT * FROM relays") + .fetch_all(conn.deref_mut()) + .await?) + } + + pub async fn get( + conn: &mut PoolConnection, + id: i64, + ) -> Result { + sqlx::query_as!( + Relay, + "SELECT * FROM relays WHERE id = ?", + id + ) + .fetch_optional(conn.deref_mut()) + .await + .map(|s| s.ok_or(DatabaseError::NotFound))? + } + + pub async fn get_by_tag( + conn: &mut PoolConnection, + tag: &Tag, + ) -> Result, DatabaseError> { + Ok(sqlx::query_as!(Relay, "SELECT schedule.* FROM relays AS schedule INNER JOIN junction_tag ON junction_tag.schedule_id = schedule.id WHERE junction_tag.tag_id = ?", tag.id) + .fetch_all(conn.deref_mut()) + .await?) + } + + pub async fn create( + conn: &mut PoolConnection, + new_name: &str, + new_number: i64, + new_controller: &Controller, + ) -> Result { + sqlx::query_as!( + Relay, + "INSERT INTO relays (name, number, controller_id) VALUES (?, ?, ?) RETURNING *", + new_name, + new_number, + new_controller.id, + ) + .fetch_optional(conn.deref_mut()) + .await? + .ok_or(DatabaseError::InsertGetError) + } + + pub async fn delete( + &self, + conn: &mut PoolConnection, + ) -> Result<(), DatabaseError> { + sqlx::query!("DELETE FROM relays WHERE id = ?", self.id) + .execute(conn.deref_mut()) + .await + .map(|res| match res.rows_affected() { + 0 => Err(DatabaseError::DeleteError), + _ => Ok(()), + })? + } + + pub async fn update( + &self, + conn: &mut PoolConnection, + new_name: &str, + new_number: i64, + new_controller: &Controller, + ) -> Result { + sqlx::query!( + "UPDATE relays SET name = ?, number = ?, controller_id = ? WHERE id = ?", + new_name, + new_number, + new_controller.id, + self.id, + ) + .execute(conn.deref_mut()) + .await?; + + Relay::get(conn, self.id).await + } + + pub async fn get_controller(&self, conn: &mut PoolConnection) -> Result { + Controller::get(conn, self.controller_id).await + } + + pub async fn get_tags(&self, conn: &mut PoolConnection) -> Result, DatabaseError> { + Ok(sqlx::query_scalar!("SELECT tag FROM tags INNER JOIN junction_tag ON junction_tag.tag_id = tags.id WHERE junction_tag.relay_id = ?", self.id) + .fetch_all(conn.deref_mut()) + .await?) + } + + pub async fn set_tags( + &self, + conn: &mut PoolConnection, + new_tags: &[String], + ) -> Result<(), DatabaseError> { + sqlx::query!("DELETE FROM junction_tag WHERE relay_id = ?", self.id) + .execute(conn.deref_mut()) + .await?; + + for new_tag in new_tags { + let tag: Tag = Tag::get_by_tag_or_create(conn, new_tag).await?; + tag.link_relay(conn, self).await?; + } + Ok(()) + } +} diff --git a/emgauwa-lib/src/db/schedules.rs b/emgauwa-lib/src/db/schedules.rs index b353969..b1c8004 100644 --- a/emgauwa-lib/src/db/schedules.rs +++ b/emgauwa-lib/src/db/schedules.rs @@ -7,7 +7,7 @@ use sqlx::Sqlite; use crate::db::errors::DatabaseError; use crate::db::model_utils::Period; -use crate::db::tag::Tag; +use crate::db::Tag; use crate::db::types::ScheduleUid; #[derive(Debug, Serialize, Clone)] @@ -34,7 +34,7 @@ impl Schedule { pub async fn get( conn: &mut PoolConnection, - id: &i64, + id: i64, ) -> Result { sqlx::query_as!( Schedule, @@ -127,7 +127,7 @@ impl Schedule { .execute(conn.deref_mut()) .await?; - Schedule::get_by_uid(conn, &self.uid).await + Schedule::get(conn, self.id).await } pub async fn get_tags( @@ -149,16 +149,7 @@ impl Schedule { .await?; for new_tag in new_tags { - let tag: Option = - sqlx::query_as!(Tag, "SELECT * FROM tags WHERE tag = ?", new_tag) - .fetch_optional(conn.deref_mut()) - .await?; - - let tag = match tag { - Some(id) => id, - None => Tag::create(conn, new_tag).await?, - }; - + let tag: Tag = Tag::get_by_tag_or_create(conn, new_tag).await?; tag.link_schedule(conn, self).await?; } Ok(()) diff --git a/emgauwa-lib/src/db/tag.rs b/emgauwa-lib/src/db/tag.rs index 5eb192b..08b4aa3 100644 --- a/emgauwa-lib/src/db/tag.rs +++ b/emgauwa-lib/src/db/tag.rs @@ -5,8 +5,7 @@ use sqlx::pool::PoolConnection; use sqlx::Sqlite; use crate::db::errors::DatabaseError; -use crate::db::models::*; -use crate::db::Schedule; +use crate::db::{Relay, Schedule}; #[derive(Debug, Serialize, Clone)] pub struct Tag { @@ -46,6 +45,17 @@ impl Tag { .map(|t| t.ok_or(DatabaseError::NotFound))? } + pub async fn get_by_tag_or_create( + conn: &mut PoolConnection, + target_tag: &str, + ) -> Result { + match Tag::get_by_tag(conn, target_tag).await { + Ok(tag) => Ok(tag), + Err(DatabaseError::NotFound) => Tag::create(conn, target_tag).await, + Err(e) => Err(e), + } + } + pub async fn get_by_tag( conn: &mut PoolConnection, target_tag: &str, @@ -56,7 +66,6 @@ impl Tag { .map(|t| t.ok_or(DatabaseError::NotFound))? } - #[allow(dead_code)] pub async fn link_relay( &self, conn: &mut PoolConnection, diff --git a/emgauwa-lib/src/handlers/v1/mod.rs b/emgauwa-lib/src/handlers/v1/mod.rs index 54b6498..a85414c 100644 --- a/emgauwa-lib/src/handlers/v1/mod.rs +++ b/emgauwa-lib/src/handlers/v1/mod.rs @@ -1,2 +1,3 @@ pub mod schedules; pub mod ws; +pub mod relays; diff --git a/emgauwa-lib/src/handlers/v1/relays.rs b/emgauwa-lib/src/handlers/v1/relays.rs new file mode 100644 index 0000000..98824c6 --- /dev/null +++ b/emgauwa-lib/src/handlers/v1/relays.rs @@ -0,0 +1,163 @@ +use actix_web::{delete, get, post, put, web, HttpResponse}; +use serde::{Deserialize, Serialize}; +use sqlx::pool::PoolConnection; +use sqlx::{Pool, Sqlite}; + +use crate::db::errors::DatabaseError; +use crate::db::Relay; +use crate::db::Tag; +use crate::handlers::errors::ApiError; +use crate::return_models::ReturnRelay; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RequestRelay { + name: String, + tags: Vec, +} + +#[get("/api/v1/relays")] +pub async fn index(pool: web::Data>) -> Result { + let mut pool_conn = pool.acquire().await?; + + let relays = Relay::get_all(&mut pool_conn).await?; + + let return_relays: Vec = + relays.iter().map(|s| ReturnRelay::from_relay_ref(s, &mut pool_conn)).collect(); + + Ok(HttpResponse::Ok().json(return_relays)) +} + +//#[get("/api/v1/tags/tag/{tag}")] +//pub async fn tagged( +// pool: web::Data>, +// path: web::Path<(String,)>, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let (tag,) = path.into_inner(); +// let tag_db = Tag::get_by_tag(&mut pool_conn, &tag).await?; +// +// let relays = Relay::get_by_tag(&mut pool_conn, &tag_db).await?; +// +// let return_relays: Vec = +// relays.iter().map(|s| ReturnRelay::from_relay_ref(s, &mut pool_conn)).collect(); +// +// Ok(HttpResponse::Ok().json(return_relays)) +//} +// +//#[get("/api/v1/tags/{relay_id}")] +//pub async fn show( +// pool: web::Data>, +// path: web::Path<(String,)>, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let (relay_uid,) = path.into_inner(); +// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; +// +// let relay = Relay::get_by_uid(&mut pool_conn, &uid).await?; +// +// let return_relay = ReturnRelay::from_relay(relay, &mut pool_conn); +// Ok(HttpResponse::Ok().json(return_relay)) +//} +// +//#[post("/api/v1/tags")] +//pub async fn add( +// pool: web::Data>, +// data: web::Json, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let new_relay = Relay::create(&mut pool_conn, &data.name, &data.periods).await?; +// +// new_relay +// .set_tags(&mut pool_conn, data.tags.as_slice()) +// .await?; +// +// let return_relay = ReturnRelay::from_relay(new_relay, &mut pool_conn); +// Ok(HttpResponse::Created().json(return_relay)) +//} +// +//async fn add_list_single( +// conn: &mut PoolConnection, +// request_relay: &RequestRelay, +//) -> Result { +// let new_relay = +// Relay::create(conn, &request_relay.name, &request_relay.periods).await?; +// +// new_relay +// .set_tags(conn, request_relay.tags.as_slice()) +// .await?; +// +// Ok(new_relay) +//} +// +//#[post("/api/v1/tags/list")] +//pub async fn add_list( +// pool: web::Data>, +// data: web::Json>, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let result: Vec> = data +// .as_slice() +// .iter() +// .map(|request_relay| { +// futures::executor::block_on(add_list_single(&mut pool_conn, request_relay)) +// }) +// .collect(); +// +// let mut return_relays: Vec = Vec::new(); +// for relay in result { +// match relay { +// Ok(relay) => return_relays.push(ReturnRelay::from_relay(relay, &mut pool_conn)), +// Err(e) => return Ok(HttpResponse::from(e)), +// } +// } +// Ok(HttpResponse::Created().json(return_relays)) +//} +// +//#[put("/api/v1/tags/{relay_id}")] +//pub async fn update( +// pool: web::Data>, +// path: web::Path<(String,)>, +// data: web::Json, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let (relay_uid,) = path.into_inner(); +// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; +// +// let relay = Relay::get_by_uid(&mut pool_conn, &uid).await?; +// +// let relay = relay +// .update(&mut pool_conn, data.name.as_str(), &data.periods) +// .await?; +// +// relay +// .set_tags(&mut pool_conn, data.tags.as_slice()) +// .await?; +// +// let return_relay = ReturnRelay::from_relay(relay, &mut pool_conn); +// Ok(HttpResponse::Ok().json(return_relay)) +//} +// +//#[delete("/api/v1/tags/{relay_id}")] +//pub async fn delete( +// pool: web::Data>, +// path: web::Path<(String,)>, +//) -> Result { +// let mut pool_conn = pool.acquire().await?; +// +// let (relay_uid,) = path.into_inner(); +// let uid = RelayUid::try_from(relay_uid.as_str()).or(Err(ApiError::BadUid))?; +// +// match uid { +// RelayUid::Off => Err(ApiError::ProtectedRelay), +// RelayUid::On => Err(ApiError::ProtectedRelay), +// RelayUid::Any(_) => { +// Relay::delete_by_uid(&mut pool_conn, uid).await?; +// Ok(HttpResponse::Ok().json("relay got deleted")) +// } +// } +//} diff --git a/emgauwa-lib/src/handlers/v1/schedules.rs b/emgauwa-lib/src/handlers/v1/schedules.rs index 841e6e4..03bf977 100644 --- a/emgauwa-lib/src/handlers/v1/schedules.rs +++ b/emgauwa-lib/src/handlers/v1/schedules.rs @@ -5,11 +5,10 @@ use sqlx::{Pool, Sqlite}; use crate::db::errors::DatabaseError; use crate::db::{Periods, Schedule}; -use crate::db::tag::Tag; +use crate::db::Tag; use crate::db::types::ScheduleUid; use crate::handlers::errors::ApiError; use crate::return_models::ReturnSchedule; -use crate::utils::vec_has_error; #[derive(Debug, Serialize, Deserialize)] pub struct RequestSchedule { @@ -24,11 +23,8 @@ pub async fn index(pool: web::Data>) -> Result = - schedules.iter().map(ReturnSchedule::from).collect(); - for schedule in return_schedules.iter_mut() { - schedule.load_tags(&mut pool_conn); - } + let return_schedules: Vec = + schedules.iter().map(|s| ReturnSchedule::from_schedule_ref(s, &mut pool_conn)).collect(); Ok(HttpResponse::Ok().json(return_schedules)) } @@ -45,11 +41,9 @@ pub async fn tagged( let schedules = Schedule::get_by_tag(&mut pool_conn, &tag_db).await?; - let mut return_schedules: Vec = - schedules.iter().map(ReturnSchedule::from).collect(); - for schedule in return_schedules.iter_mut() { - schedule.load_tags(&mut pool_conn); - } + let return_schedules: Vec = + schedules.iter().map(|s| ReturnSchedule::from_schedule_ref(s, &mut pool_conn)).collect(); + Ok(HttpResponse::Ok().json(return_schedules)) } @@ -65,8 +59,7 @@ pub async fn show( let schedule = Schedule::get_by_uid(&mut pool_conn, &uid).await?; - let mut return_schedule = ReturnSchedule::from(schedule); - return_schedule.load_tags(&mut pool_conn); + let return_schedule = ReturnSchedule::from_schedule(schedule, &mut pool_conn); Ok(HttpResponse::Ok().json(return_schedule)) } @@ -83,8 +76,7 @@ pub async fn add( .set_tags(&mut pool_conn, data.tags.as_slice()) .await?; - let mut return_schedule = ReturnSchedule::from(new_schedule); - return_schedule.load_tags(&mut pool_conn); + let return_schedule = ReturnSchedule::from_schedule(new_schedule, &mut pool_conn); Ok(HttpResponse::Created().json(return_schedule)) } @@ -117,26 +109,14 @@ pub async fn add_list( }) .collect(); - match vec_has_error(&result) { - true => Ok(HttpResponse::from( - result - .into_iter() - .find(|r| r.is_err()) - .unwrap() - .unwrap_err(), - )), - false => { - let mut return_schedules: Vec = result - .iter() - .map(|s| ReturnSchedule::from(s.as_ref().unwrap())) - .collect(); - - for schedule in return_schedules.iter_mut() { - schedule.load_tags(&mut pool_conn); - } - Ok(HttpResponse::Created().json(return_schedules)) + let mut return_schedules: Vec = Vec::new(); + for schedule in result { + match schedule { + Ok(schedule) => return_schedules.push(ReturnSchedule::from_schedule(schedule, &mut pool_conn)), + Err(e) => return Ok(HttpResponse::from(e)), } } + Ok(HttpResponse::Created().json(return_schedules)) } #[put("/api/v1/schedules/{schedule_id}")] @@ -160,8 +140,7 @@ pub async fn update( .set_tags(&mut pool_conn, data.tags.as_slice()) .await?; - let mut return_schedule = ReturnSchedule::from(schedule); - return_schedule.load_tags(&mut pool_conn); + let return_schedule = ReturnSchedule::from_schedule(schedule, &mut pool_conn); Ok(HttpResponse::Ok().json(return_schedule)) } diff --git a/emgauwa-lib/src/return_models.rs b/emgauwa-lib/src/return_models.rs index aed0ae1..046d942 100644 --- a/emgauwa-lib/src/return_models.rs +++ b/emgauwa-lib/src/return_models.rs @@ -1,8 +1,9 @@ -use crate::db::Schedule; +use crate::db::{Controller, Relay, Schedule}; use futures::executor; use serde::Serialize; use sqlx::pool::PoolConnection; use sqlx::Sqlite; +use crate::db::types::ControllerUid; #[derive(Debug, Serialize)] pub struct ReturnSchedule { @@ -12,22 +13,46 @@ pub struct ReturnSchedule { } impl ReturnSchedule { - pub fn load_tags(&mut self, conn: &mut PoolConnection) { - self.tags = executor::block_on(self.schedule.get_tags(conn)).unwrap(); - } -} + pub fn from_schedule(schedule: Schedule, conn: &mut PoolConnection) -> Self { + let schedule = schedule.clone(); + let tags = executor::block_on(schedule.get_tags(conn)).unwrap(); -impl From for ReturnSchedule { - fn from(schedule: Schedule) -> Self { ReturnSchedule { schedule, - tags: vec![], + tags, } } + + pub fn from_schedule_ref(schedule: &Schedule, conn: &mut PoolConnection) -> Self { + Self::from_schedule(schedule.clone(), conn) + } } -impl From<&Schedule> for ReturnSchedule { - fn from(schedule: &Schedule) -> Self { - ReturnSchedule::from(schedule.clone()) +#[derive(Debug, Serialize)] +pub struct ReturnRelay { + #[serde(flatten)] + pub relay: Relay, + pub controller: Controller, + pub controller_id: ControllerUid, + pub tags: Vec, +} + +impl ReturnRelay { + pub fn from_relay(relay: Relay, conn: &mut PoolConnection) -> Self { + let relay = relay.clone(); + let controller = executor::block_on(Controller::get(conn, relay.controller_id)).unwrap(); + let controller_uid = controller.uid.clone(); + let tags = executor::block_on(relay.get_tags(conn)).unwrap(); + + ReturnRelay { + relay, + controller, + controller_id: controller_uid, + tags, + } + } + + pub fn from_relay_ref(relay: &Relay, conn: &mut PoolConnection) -> Self { + Self::from_relay(relay.clone(), conn) } } diff --git a/emgauwa-lib/src/utils.rs b/emgauwa-lib/src/utils.rs index 0a8e48d..694fe46 100644 --- a/emgauwa-lib/src/utils.rs +++ b/emgauwa-lib/src/utils.rs @@ -1,7 +1,3 @@ -pub fn vec_has_error(target: &[Result]) -> bool { - target.iter().any(|t| t.is_err()) -} - pub fn load_settings(config_name: &str, env_prefix: &str) -> T where for<'de> T: serde::Deserialize<'de>