Add ControllerWsAction

This commit is contained in:
Tobias Reisinger 2023-11-27 17:21:40 +01:00
parent cb47dcda5c
commit 3b596de06f
Signed by: serguzim
GPG key ID: 13AD60C237A28DFE
10 changed files with 134 additions and 34 deletions

View file

@ -2,13 +2,12 @@ openapi: 3.0.0
info:
contact:
name: Tobias Reisinger
url: 'https://serguzim.me'
url: 'https://git.serguzim.me/emgauwa/'
title: Emgauwa API v1
version: 0.0.1
version: 0.5.0
description: Server API to manage an Emgauwa system.
servers:
- url: 'http://emgauwa-test-raspi.fritz.box'
- url: 'http://localhost:5000'
- url: 'http://localhost:4419'
tags:
- name: schedules
- name: relays
@ -319,7 +318,7 @@ paths:
'404':
description: Not Found
operationId: delete-controllers-controller_id
description: Delete a single controller. To recover the controller you need to use the conbtrollers/discover feature.
description: Delete a single controller. To recover the controller you need to use the controllers/discover feature.
'/api/v1/controllers/{controller_id}/relays':
parameters:
- schema:

View file

@ -3,6 +3,7 @@ use std::str;
use crate::relay_loop::run_relay_loop;
use crate::settings::Settings;
use emgauwa_lib::db::{DbController, DbRelay};
use emgauwa_lib::handlers::v1::ws::controllers::ControllerWsAction;
use emgauwa_lib::models::convert_db_list;
use emgauwa_lib::types::ControllerUid;
use emgauwa_lib::{db, models};
@ -96,10 +97,6 @@ async fn main() {
relays,
};
let this_json = serde_json::to_string(&this).unwrap();
println!("{}", this_json);
let url = format!(
"ws://{}:{}/api/v1/ws/controllers",
settings.core.host, settings.core.port
@ -112,7 +109,11 @@ async fn main() {
let (mut write, read) = ws_stream.split();
write.send(Message::text(this_json)).await.unwrap();
let ws_action = ControllerWsAction::Register(this);
println!("Sending action: {:?}", ws_action);
let ws_action_json = serde_json::to_string(&ws_action).unwrap();
println!("Sending json: {}", ws_action_json);
write.send(Message::text(ws_action_json)).await.unwrap();
let ws_to_stdout = read.for_each(handle_message);
let stdin_to_ws = stdin_rx.map(Ok).forward(write);

View file

@ -7,7 +7,6 @@ use tokio::time;
pub async fn run_relay_loop(settings: Settings) {
let default_duration = Duration::from_millis(1000);
loop {
// naivetime timestamp for now
let next_timestamp = Local::now().naive_local().time() + default_duration;
time::sleep(default_duration).await;
println!("Relay loop: {}", next_timestamp)

View file

@ -1,4 +1,4 @@
use serde_derive::Serialize;
use serde_derive::{Deserialize, Serialize};
use std::ops::DerefMut;
use sqlx::pool::PoolConnection;
@ -8,9 +8,11 @@ use crate::db::errors::DatabaseError;
use crate::db::DbTag;
use crate::types::ControllerUid;
#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbController {
#[serde(skip)]
pub id: i64,
#[serde(rename = "id")]
pub uid: ControllerUid,
pub name: String,
pub relay_count: i64,
@ -51,6 +53,19 @@ impl DbController {
.map_err(DatabaseError::from)
}
pub async fn get_by_uid_or_create(
conn: &mut PoolConnection<Sqlite>,
uid: &ControllerUid,
new_name: &str,
new_relay_count: i64,
new_active: bool,
) -> Result<DbController, DatabaseError> {
match DbController::get_by_uid(conn, uid).await? {
Some(tag) => Ok(tag),
None => DbController::create(conn, uid, new_name, new_relay_count, new_active).await,
}
}
pub async fn get_by_tag(
conn: &mut PoolConnection<Sqlite>,
tag: &DbTag,

View file

@ -1,4 +1,4 @@
use serde_derive::Serialize;
use serde_derive::{Deserialize, Serialize};
use std::ops::DerefMut;
use crate::db::DbController;
@ -8,7 +8,7 @@ use sqlx::Sqlite;
use crate::db::errors::DatabaseError;
use crate::db::DbTag;
#[derive(Debug, Serialize, Clone, sqlx::FromRow)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbRelay {
#[serde(skip)]
pub id: i64,
@ -51,6 +51,18 @@ impl DbRelay {
.await?)
}
pub async fn get_by_controller_and_num_or_create(
conn: &mut PoolConnection<Sqlite>,
controller: &DbController,
number: i64,
new_name: &str,
) -> Result<DbRelay, DatabaseError> {
match DbRelay::get_by_controller_and_num(conn, controller, number).await? {
Some(relay) => Ok(relay),
None => DbRelay::create(conn, new_name, number, controller).await,
}
}
pub async fn get_by_tag(
conn: &mut PoolConnection<Sqlite>,
tag: &DbTag,

View file

@ -10,11 +10,11 @@ use crate::db::model_utils::Period;
use crate::db::DbTag;
use crate::types::ScheduleUid;
#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbSchedule {
#[serde(skip)]
pub id: i64,
#[serde(rename(serialize = "id"))]
#[serde(rename = "id")]
pub uid: ScheduleUid,
pub name: String,
pub periods: DbPeriods,

View file

@ -1,12 +1,22 @@
use crate::db::DbSchedule;
use crate::db::errors::DatabaseError;
use crate::db::{DbController, DbRelay};
use crate::handlers::errors::ApiError;
use crate::models::Controller;
use actix::{Actor, StreamHandler};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use actix_web_actors::ws::ProtocolError;
use futures::FutureExt;
use serde_derive::{Deserialize, Serialize};
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use ws::Message;
#[derive(Debug, Serialize, Deserialize)]
pub enum ControllerWsAction {
Register(Controller),
}
struct ControllerWs {
pub pool: Pool<Sqlite>,
}
@ -15,24 +25,57 @@ impl Actor for ControllerWs {
type Context = ws::WebsocketContext<Self>;
}
async fn get_schedules(pool: &mut Pool<Sqlite>) -> Result<Vec<DbSchedule>, ApiError> {
let mut pool_conn = pool.acquire().await?;
Ok(DbSchedule::get_all(&mut pool_conn).await?)
}
impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
let schedules = futures::executor::block_on(get_schedules(&mut self.pool)).unwrap();
let schedules_json = serde_json::to_string(&schedules).unwrap();
let mut pool_conn = futures::executor::block_on(self.pool.acquire()).unwrap();
match msg {
Ok(Message::Ping(msg)) => ctx.pong(&msg),
Ok(Message::Text(text)) => {
println!("Got text: {}", text);
ctx.text(schedules_json)
let action: ControllerWsAction = serde_json::from_str(&text).unwrap();
let action_res = futures::executor::block_on(handle_action(&mut pool_conn, action));
if let Err(e) = action_res {
log::error!("Error handling action: {:?}", e);
ctx.text(serde_json::to_string(&e).unwrap());
}
}
_ => {}
}
//let schedules = futures::executor::block_on(DbSchedule::get_all(&mut pool_conn)).unwrap();
//let schedules_json = serde_json::to_string(&schedules).unwrap();
//ctx.text(schedules_json);
}
}
pub async fn handle_action(
conn: &mut PoolConnection<Sqlite>,
action: ControllerWsAction,
) -> Result<(), DatabaseError> {
match action {
ControllerWsAction::Register(controller) => {
log::info!("Registering controller: {:?}", controller);
let c = &controller.controller;
let controller_db =
DbController::get_by_uid_or_create(conn, &c.uid, &c.name, c.relay_count, c.active)
.await?;
println!("Controller: {:?}", controller_db);
for relay in &controller.relays {
let r = &relay.relay;
let relay_db = DbRelay::get_by_controller_and_num_or_create(
conn,
&controller_db,
r.number,
&r.name,
)
.await?;
println!("Controller relay: {:?}", relay_db);
}
Ok(())
}
}
}

View file

@ -1,8 +1,9 @@
use crate::db;
use crate::db::errors::DatabaseError;
use crate::db::{DbRelay, DbSchedule};
use crate::types::ControllerUid;
use futures::executor;
use serde_derive::Serialize;
use serde_derive::{Deserialize, Serialize};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
@ -17,22 +18,23 @@ pub trait FromDbModel {
Self: Sized;
}
#[derive(Serialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Schedule {
#[serde(flatten)]
pub schedule: DbSchedule,
pub tags: Vec<String>,
}
#[derive(Serialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Relay {
#[serde(flatten)]
pub relay: DbRelay,
pub controller: db::DbController,
pub controller_id: ControllerUid,
pub tags: Vec<String>,
}
#[derive(Serialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Controller {
#[serde(flatten)]
pub controller: db::DbController,
@ -62,11 +64,13 @@ impl FromDbModel for Relay {
) -> Result<Self, DatabaseError> {
let relay = db_model.clone();
let controller = executor::block_on(relay.get_controller(conn))?;
let controller_id = controller.uid.clone();
let tags = executor::block_on(relay.get_tags(conn))?;
Ok(Relay {
relay,
controller,
controller_id,
tags,
})
}

View file

@ -1,9 +1,10 @@
use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sqlx::database::HasArguments;
use sqlx::encode::IsNull;
use sqlx::error::BoxDynError;
use sqlx::sqlite::{SqliteTypeInfo, SqliteValueRef};
use sqlx::{Decode, Encode, Sqlite, Type};
use std::str::FromStr;
use uuid::Uuid;
#[derive(Clone, Debug)]
@ -24,6 +25,16 @@ impl Serialize for ControllerUid {
}
}
impl<'de> Deserialize<'de> for ControllerUid {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Self::try_from(String::deserialize(deserializer)?.as_str())
.map_err(|_| serde::de::Error::custom("invalid controller uid"))
}
}
impl From<&ControllerUid> for String {
fn from(uid: &ControllerUid) -> String {
uid.0.as_hyphenated().to_string()
@ -55,6 +66,12 @@ impl<'r> Decode<'r, Sqlite> for ControllerUid {
}
}
impl From<&str> for ControllerUid {
fn from(value: &str) -> Self {
Self(Uuid::from_str(value).unwrap())
}
}
impl From<&[u8]> for ControllerUid {
fn from(value: &[u8]) -> Self {
Self(Uuid::from_slice(value).unwrap())

View file

@ -2,7 +2,7 @@ use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sqlx::database::HasArguments;
use sqlx::encode::IsNull;
use sqlx::error::BoxDynError;
@ -75,6 +75,16 @@ impl Serialize for ScheduleUid {
}
}
impl<'de> Deserialize<'de> for ScheduleUid {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Self::try_from(String::deserialize(deserializer)?.as_str())
.map_err(|_| serde::de::Error::custom("invalid schedule uid"))
}
}
impl From<Uuid> for ScheduleUid {
fn from(uid: Uuid) -> Self {
match uid.as_u128() {