Split project (keep common)

This commit is contained in:
Tobias Reisinger 2024-04-30 09:13:17 +02:00
parent 9bc75b9627
commit 96a64cf90d
Signed by: serguzim
GPG key ID: 13AD60C237A28DFE
68 changed files with 32 additions and 3502 deletions

View file

@ -1,3 +0,0 @@
[alias]
format = "+nightly fmt"
lint = "clippy --all-targets --all-features -- -D warnings"

View file

@ -1,3 +0,0 @@
#EMGAUWA_CONTROLLER__LOGGING__LEVEL=DEBUG
#EMGAUWA_CORE__LOGGING__LEVEL=DEBUG

View file

@ -1,7 +1,32 @@
[workspace] [package]
resolver = "2" name = "emgauwa-common"
members = [ version = "0.5.0"
"emgauwa-core", edition = "2021"
"emgauwa-controller", authors = ["Tobias Reisinger <tobias@msrg.cc>"]
"emgauwa-common",
]
[dependencies]
actix = "0.13"
actix-web = "4.4"
actix-web-actors = "4.2"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
simple_logger = "4.2"
log = "0.4"
config = "0.13"
chrono = { version = "0.4", features = ["serde"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
libsqlite3-sys = { version = "*", features = ["bundled"] }
uuid = "1.6"
futures = "0.3"
libc = "0.2"
rppal = "0.17"
rppal-pfd = "0.0.5"
rppal-mcp23s17 = "0.0.3"

View file

@ -1,5 +0,0 @@
[build]
pre-build = [
"curl -Lo /usr/bin/yq https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64",
"chmod +x /usr/bin/yq"
]

View file

@ -3,40 +3,3 @@ sqlx:
cargo sqlx database create cargo sqlx database create
cargo sqlx migrate run cargo sqlx migrate run
cargo sqlx prepare --workspace cargo sqlx prepare --workspace
build-rpi:
cross build --target arm-unknown-linux-gnueabihf
emgauwa-%.json: config/%.pkl
pkl eval -f json -o $@ $<
configs:
$(MAKE) emgauwa-core.json
$(MAKE) emgauwa-controller.json
clean:
rm -f emgauwa-controller.json
rm -f emgauwa-controller.sqlite
rm -f emgauwa-core.json
rm -f emgauwa-core.sqlite
rm -f emgauwa-dev.sqlite
emgauwa-controller_%:
$(TOOL) build --target $* --release --bin emgauwa-controller
mkdir -p out/releases
cp target/$*/release/emgauwa-controller out/releases/emgauwa-controller_$*
emgauwa-core_%:
$(TOOL) build --target $* --release --bin emgauwa-core
mkdir -p out/releases
cp target/$*/release/emgauwa-core out/releases/emgauwa-core_$*
emgauwa_%:
$(MAKE) emgauwa-controller_$*
$(MAKE) emgauwa-core_$*
releases:
$(MAKE) TOOL=cross emgauwa_arm-unknown-linux-gnueabihf
$(MAKE) TOOL=cargo emgauwa_x86_64-unknown-linux-gnu
$(MAKE) TOOL=cross emgauwa_x86_64-unknown-linux-musl

View file

@ -1,875 +0,0 @@
openapi: 3.0.0
info:
contact:
name: Tobias Reisinger
url: 'https://git.serguzim.me/emgauwa/'
title: Emgauwa API v1
version: 0.5.0
description: Server API to manage an Emgauwa system.
servers:
- url: 'http://localhost:4419'
tags:
- name: schedules
- name: relays
- name: controllers
- name: tags
- name: macros
- name: websocket
paths:
/api/v1/schedules:
get:
summary: get all schedules
description: Receive a list with all available schedules.
tags:
- schedules
responses:
'200':
description: OK
headers: { }
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/schedule'
operationId: get-api-v1-schedules
post:
summary: add new schedule
tags:
- schedules
responses:
'201':
description: Created
content:
application/json:
schema:
$ref: '#/components/schemas/schedule'
operationId: post-api-v1-schedules
description: Create a new schedule. A new unique id will be returned
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/schedule'
description: The "id" field will be set by the server.
parameters: [ ]
'/api/v1/schedules/{schedule_id}':
parameters:
- schema:
type: string
format: uuid
name: schedule_id
in: path
required: true
description: ''
get:
summary: get single schedule
tags:
- schedules
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/schedule'
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: get-schedules-schedule_id
description: Return a single schedule by id.
put:
summary: overwrite single schedule
tags:
- schedules
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/schedule'
'400':
description: Bad Request
content:
application/json:
schema:
type: object
properties: { }
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: put-schedules-schedule_id
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
periods:
type: array
items:
$ref: '#/components/schemas/period'
tags:
type: array
items:
$ref: '#/components/schemas/tag'
description: ''
parameters: [ ]
description: Overwrite the properties for a single schedule. Overwriting periods on "on" or "off" will fail.
delete:
summary: delete single schedule
tags:
- schedules
responses:
'200':
description: OK
'403':
description: Forbidden
'404':
description: Not Found
operationId: delete-schedules-schedule_id
description: Deletes a single schedule. Deleting "on" or "off" is forbidden (403).
'/api/v1/schedules/tag/{tag}':
parameters:
- schema:
type: string
name: tag
in: path
required: true
description: ''
get:
summary: get schedules by tag
tags:
- schedules
responses:
'200':
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/schedule'
operationId: get-schedules-tag-schedule_id
description: Receive a list of schedules which include the given tag.
/api/v1/relays:
get:
summary: get all relays
tags:
- relays
responses:
'200':
description: OK
headers: { }
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/relay'
operationId: get-relays
description: Return a list with all relays.
parameters: [ ]
'/api/v1/relays/tag/{tag}':
parameters:
- schema:
type: string
name: tag
in: path
required: true
get:
summary: get relays by tag
tags:
- relays
responses:
'200':
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/relay'
operationId: get-relays-tag-tag
description: Return all relays with the given tag.
/api/v1/controllers:
get:
summary: get all controllers
tags:
- controllers
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/controller'
operationId: get-controllers
description: Return all controllers.
parameters: [ ]
'/api/v1/controllers/{controller_id}':
parameters:
- schema:
type: string
name: controller_id
in: path
description: ''
required: true
get:
summary: get single controller
tags:
- controllers
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/controller'
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: get-controllers-controller_id
requestBody:
content:
application/json:
schema:
type: object
properties: { }
description: ''
description: Return a single controller by id. When no controller with the id is found 404 will be returned.
put:
summary: overwrite single controller
tags:
- controllers
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/controller'
'400':
description: Bad Request
content:
application/json:
schema:
type: object
properties: { }
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: put-controllers-controller_id
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
ip:
type: string
format: ipv4
description: Overwrite properties of a single controller.
delete:
summary: delete single controller
tags:
- controllers
responses:
'200':
description: OK
'404':
description: Not Found
operationId: delete-controllers-controller_id
description: Delete a single controller. To recover the controller you need to use the controllers/discover feature.
'/api/v1/controllers/{controller_id}/relays':
parameters:
- schema:
type: string
name: controller_id
in: path
required: true
get:
summary: get all relays for single controller
tags:
- controllers
- relays
responses:
'200':
description: OK
headers: { }
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/relay'
'404':
description: Not Found
content:
application/json:
schema:
type: array
items: { }
operationId: get-controllers-controller_id-relays
description: Returns all relays for a single controller.
'/api/v1/controllers/{controller_id}/relays/{relay_num}':
parameters:
- schema:
type: string
name: controller_id
in: path
required: true
- schema:
type: integer
name: relay_num
in: path
required: true
get:
summary: get single relay for single controller
tags:
- controllers
- relays
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/relay'
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: get-controllers-controller_id-relays-relay_num
description: 'Return a single relay by number for a controller by id. When the relay or controller is not found, 404 will be returned.'
put:
summary: overwrite single relay for single controller
tags:
- controllers
- relays
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/relay'
'400':
description: Bad Request
content:
application/json:
schema:
type: object
properties: { }
'404':
description: Not Found
content:
application/json:
schema:
type: object
properties: { }
operationId: put-controllers-controller_id-relays-relay_num
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
active_schedule:
type: object
properties:
id:
$ref: '#/components/schemas/schedule_id'
schedules:
type: array
maxItems: 7
minItems: 7
items:
type: object
properties:
id:
$ref: '#/components/schemas/schedule_id'
tags:
type: array
items:
$ref: '#/components/schemas/tag'
description: 'active schedule will overwrite schedules[weekday]'
/api/v1/tags:
get:
summary: get all tags
tags:
- tags
responses:
'200':
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/tag'
operationId: get-tags
description: Returns a list of tags.
parameters: [ ]
post:
summary: add new tag
operationId: post-api-v1-tags
responses:
'201':
description: Created
content:
application/json:
schema:
$ref: '#/components/schemas/tag_full'
'400':
description: Bad Request
requestBody:
content:
application/json:
schema:
type: object
properties:
tag:
$ref: '#/components/schemas/tag'
description: ''
tags:
- tags
description: Add a new tag. Will return 400 when the tag already exits.
'/api/v1/tags/{tag}':
parameters:
- schema:
type: string
name: tag
in: path
required: true
get:
summary: get relays and schedules for tag
tags:
- tags
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/tag_full'
'404':
description: Not Found
operationId: get-tags-tag
description: Return all models with the given tag (relays and schedules)
delete:
summary: delete tag
operationId: delete-tags-tag
responses:
'200':
description: OK
'404':
description: Not Found
description: delete tag from database and from affected relays and schedules
tags:
- tags
'/api/v1/controllers/{controller_id}/relays/{relay_num}/pulse':
parameters:
- schema:
type: string
name: controller_id
in: path
required: true
- schema:
type: string
name: relay_num
in: path
required: true
post:
summary: pulse relay on
responses:
'200':
description: OK
'404':
description: Not Found
operationId: post-controllers-controller_id-relays-relay_num-pulse
requestBody:
content:
application/json:
schema:
type: object
properties:
duration:
type: integer
description: ''
description: Turn a relay on for a short amount of time. The duration can be set in the body in seconds. When no duration is supplied the default for the relay will be used. The default is read from the controller's config.
tags:
- controllers
- relays
/api/v1/ws/relays:
get:
summary: get relay status updates
tags:
- websocket
responses:
'200':
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/relay'
operationId: get-ws-relays
description: |-
WEBSOCKET
This websocket will send all relays with the most recent status every 10 seconds.
parameters: [ ]
/api/v1/macros:
get:
summary: get all macros
tags:
- macros
responses:
'200':
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/macro'
operationId: get-api-v1-macros
description: Receive a list with all available macros.
post:
summary: add new macro
tags:
- macros
responses:
'201':
description: Created
operationId: post-api-v1-macros
description: Create a new macro. A new unique id will be returned
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
actions:
type: array
items:
type: object
properties:
weekday:
type: integer
minimum: 0
maximum: 6
relay:
type: object
properties:
number:
type: integer
controller_id:
$ref: '#/components/schemas/controller_id'
schedule:
type: object
properties:
id:
$ref: '#/components/schemas/schedule_id'
'/api/v1/macros/{macro_id}':
parameters:
- schema:
type: string
name: macro_id
in: path
required: true
get:
summary: get a single macro
tags:
- macros
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/macro'
operationId: get-api-v1-macros-macro_id
description: Return a single macro by id. When no macro with the id is found 404 will be returned.
put:
summary: overwrite a macro
tags:
- macros
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/macro'
operationId: put-api-v1-macros-macro_id
description: Overwrite properties of a single macro.
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
actions:
type: array
items:
type: object
properties:
weekday:
type: integer
minimum: 0
maximum: 6
schedule:
type: object
properties:
id:
$ref: '#/components/schemas/schedule_id'
relay:
type: object
properties:
number:
type: integer
controller_id:
$ref: '#/components/schemas/controller_id'
delete:
summary: delete a macro
tags:
- macros
responses:
'200':
description: OK
operationId: delete-api-v1-macros-macro_id
description: Delete a single macro.
'/api/v1/macros/{macro_id}/execute':
parameters:
- schema:
type: string
name: macro_id
in: path
required: true
- schema:
type: integer
name: weekday
in: query
put:
summary: execute a macro
tags:
- macros
responses:
'200':
description: OK
'404':
description: Not Found
operationId: put-api-v1-macros-macro_id-execute
description: Execute a macro
/api/v1/schedules/list:
post:
summary: add new schedule list
tags:
- schedules
responses:
'200':
description: OK
operationId: post-schedules-list
requestBody:
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/schedule'
description: Create a list of schedules
parameters: [ ]
components:
schemas:
controller:
title: controller
type: object
properties:
id:
$ref: '#/components/schemas/controller_id'
name:
type: string
example: Garden Controller
ip:
type: string
format: ipv4
example: 224.73.153.12
active:
type: boolean
port:
type: integer
example: 27480
relay_count:
type: integer
minimum: 0
example: 10
relays:
type: array
items:
$ref: '#/components/schemas/relay'
relay:
title: relay
type: object
properties:
number:
type: integer
minimum: 0
example: 3
name:
type: string
example: Sprinkling System 1
controller_id:
$ref: '#/components/schemas/controller_id'
active_schedule:
$ref: '#/components/schemas/schedule-untagged'
schedules:
type: array
maxItems: 7
minItems: 7
items:
$ref: '#/components/schemas/schedule-untagged'
tags:
type: array
items:
$ref: '#/components/schemas/tag'
is_on:
type: boolean
description: NULL when unknown
schedule-untagged:
title: schedule
type: object
description: ''
properties:
id:
$ref: '#/components/schemas/schedule_id'
name:
type: string
example: Sprinkler Sunny Day
periods:
type: array
items:
$ref: '#/components/schemas/period'
schedule:
title: schedule
type: object
description: ''
properties:
id:
$ref: '#/components/schemas/schedule_id'
name:
type: string
example: Sprinkler Sunny Day
periods:
type: array
items:
$ref: '#/components/schemas/period'
tags:
type: array
items:
$ref: '#/components/schemas/tag'
period:
title: period
type: object
properties:
start:
type: string
example: '10:15'
format: 24-hour
end:
type: string
format: 24-hour
example: '14:45'
required:
- start
- end
controller_id:
type: string
title: controller_id
format: uuid
example: 589c0eab-a4b4-4f3a-be97-cf03b1dc8edc
tag:
type: string
title: tag
example: sprinkler
tag_full:
title: tag (full)
type: object
properties:
tag:
$ref: '#/components/schemas/tag'
relays:
type: array
items:
$ref: '#/components/schemas/relay'
schedules:
type: array
items:
$ref: '#/components/schemas/schedule'
schedule_id:
type: string
title: schedule_id
format: uuid
example: 6bceb29b-7d2e-4af3-a26e-11f514dc5cc1
macro:
title: macro
type: object
properties:
id:
type: string
format: uuid
example: a9a4eab4-6c54-4fe4-b755-bdb2a90b3242
name:
type: string
actions:
type: array
items:
$ref: '#/components/schemas/macro_action'
macro_action:
title: macro_action
type: object
description: ''
properties:
weekday:
type: integer
minimum: 0
maximum: 6
schedule:
$ref: '#/components/schemas/schedule'
relay:
$ref: '#/components/schemas/relay'
required:
- weekday
- schedule
- relay

View file

@ -1,56 +0,0 @@
amends "package://emgauwa.app/pkl/emgauwa@0.1.0#/controller.pkl"
relays {
new {
driver = "null"
pin = 0
inverted = true
}
new {
driver = "null"
pin = 1
inverted = true
}
new {
driver = "null"
pin = 2
inverted = true
}
new {
driver = "null"
pin = 3
inverted = true
}
new {
driver = "null"
pin = 4
inverted = true
}
new {
driver = "null"
pin = 5
inverted = true
}
new {
driver = "null"
pin = 10
inverted = true
pulse = 10
}
new {
driver = "null"
pin = 11
inverted = true
pulse = 10
}
new {
driver = "null"
pin = 20
inverted = false
}
new {
driver = "null"
pin = 21
inverted = false
}
}

View file

@ -1 +0,0 @@
amends "package://emgauwa.app/pkl/emgauwa@0.1.0#/core.pkl"

View file

@ -1,32 +0,0 @@
[package]
name = "emgauwa-common"
version = "0.5.0"
edition = "2021"
authors = ["Tobias Reisinger <tobias@msrg.cc>"]
[dependencies]
actix = "0.13"
actix-web = "4.4"
actix-web-actors = "4.2"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
simple_logger = "4.2"
log = "0.4"
config = "0.13"
chrono = { version = "0.4", features = ["serde"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
libsqlite3-sys = { version = "*", features = ["bundled"] }
uuid = "1.6"
futures = "0.3"
libc = "0.2"
rppal = "0.17"
rppal-pfd = "0.0.5"
rppal-mcp23s17 = "0.0.3"

View file

@ -1,32 +0,0 @@
[package]
name = "emgauwa-controller"
version = "0.5.0"
edition = "2021"
authors = ["Tobias Reisinger <tobias@msrg.cc>"]
[dependencies]
emgauwa-common = { path = "../emgauwa-common" }
actix = "0.13"
tokio = { version = "1.34", features = ["io-std", "macros", "rt-multi-thread"] }
tokio-tungstenite = "0.21"
simple_logger = "4.3"
log = "0.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.5", features = ["serde", "v4"] }
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
futures = "0.3"
futures-channel = "0.3"
rppal = "0.17"
rppal-pfd = "0.0.5"
rppal-mcp23s17 = "0.0.3"

View file

@ -1,173 +0,0 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{Actor, Context, Handler, Message};
use emgauwa_common::constants;
use emgauwa_common::drivers::RelayDriver;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::RelayStates;
use futures::executor::block_on;
use sqlx::{Pool, Sqlite};
use tokio::sync::Notify;
use crate::settings::Settings;
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct Reload {}
#[derive(Message)]
#[rtype(result = "()")]
pub struct UpdateRelayStates {
pub relay_states: RelayStates,
}
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct RelayPulse {
pub relay_number: i64,
pub duration: Option<u32>,
}
#[derive(Message)]
#[rtype(result = "Controller")]
pub struct GetThis {}
#[derive(Message)]
#[rtype(result = "Arc<Notify>")]
pub struct GetControllerNotifier {}
#[derive(Message)]
#[rtype(result = "Arc<Notify>")]
pub struct GetRelayNotifier {}
pub struct AppState {
pub pool: Pool<Sqlite>,
pub this: Controller,
pub settings: Settings,
pub drivers: Vec<Box<dyn RelayDriver>>,
pub controller_notifier: Arc<Notify>,
pub relay_notifier: Arc<Notify>,
}
impl AppState {
pub fn new(
pool: Pool<Sqlite>,
this: Controller,
settings: Settings,
drivers: Vec<Box<dyn RelayDriver>>,
) -> AppState {
AppState {
pool,
this,
settings,
drivers,
controller_notifier: Arc::new(Notify::new()),
relay_notifier: Arc::new(Notify::new()),
}
}
pub fn notify_controller_change(&self) {
self.controller_notifier.notify_one();
}
pub fn notify_relay_change(&self) {
self.relay_notifier.notify_one();
}
}
impl Actor for AppState {
type Context = Context<Self>;
}
impl Handler<Reload> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, _msg: Reload, _ctx: &mut Self::Context) -> Self::Result {
log::debug!("Reloading controller");
let mut pool_conn = block_on(self.pool.acquire())?;
self.this.reload(&mut pool_conn)?;
self.notify_controller_change();
Ok(())
}
}
impl Handler<UpdateRelayStates> for AppState {
type Result = ();
fn handle(&mut self, msg: UpdateRelayStates, _ctx: &mut Self::Context) -> Self::Result {
self.this.apply_relay_states(&msg.relay_states);
self.drivers
.iter_mut()
.zip(msg.relay_states.iter())
.for_each(|(driver, state)| {
if let Err(e) = driver.set(state.unwrap_or(false)) {
log::error!("Error setting relay: {}", e);
}
});
self.notify_relay_change();
}
}
impl Handler<RelayPulse> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, msg: RelayPulse, _ctx: &mut Self::Context) -> Self::Result {
let relay_num = msg.relay_number;
let duration = Duration::from_secs(
match msg.duration {
None => {
self.settings
.get_relay(relay_num)
.ok_or(EmgauwaError::Other(String::from(
"Relay not found in settings",
)))?
.pulse
}
Some(dur) => Some(dur as u64),
}
.unwrap_or(constants::RELAY_PULSE_DURATION),
);
let now = Instant::now();
let until = now + duration;
self.this.relay_pulse(relay_num, until)?;
log::debug!(
"Pulsing relay {} for {} seconds until {:?}",
relay_num,
duration.as_secs(),
until
);
Ok(())
}
}
impl Handler<GetThis> for AppState {
type Result = Controller;
fn handle(&mut self, _msg: GetThis, _ctx: &mut Self::Context) -> Self::Result {
self.this.clone()
}
}
impl Handler<GetControllerNotifier> for AppState {
type Result = Arc<Notify>;
fn handle(&mut self, _msg: GetControllerNotifier, _ctx: &mut Self::Context) -> Self::Result {
Arc::clone(&self.controller_notifier)
}
}
impl Handler<GetRelayNotifier> for AppState {
type Result = Arc<Notify>;
fn handle(&mut self, _msg: GetRelayNotifier, _ctx: &mut Self::Context) -> Self::Result {
Arc::clone(&self.relay_notifier)
}
}

View file

@ -1,22 +0,0 @@
use serde::{Deserialize, Deserializer};
#[derive(Debug, Clone, Copy)]
pub enum Driver {
Null,
Gpio,
PiFace,
}
impl<'de> Deserialize<'de> for Driver {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
match String::deserialize(deserializer)?.as_str() {
"null" => Ok(Driver::Null),
"gpio" => Ok(Driver::Gpio),
"piface" => Ok(Driver::PiFace),
_ => Err(serde::de::Error::custom("invalid driver")),
}
}
}

View file

@ -1,124 +0,0 @@
use actix::Actor;
use emgauwa_common::db;
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::{Controller, FromDbModel};
use emgauwa_common::types::EmgauwaUid;
use emgauwa_common::utils::{drop_privileges, init_logging};
use rppal_pfd::PiFaceDigital;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use crate::relay_loop::run_relays_loop;
use crate::settings::Settings;
use crate::ws::run_ws_loop;
mod app_state;
mod driver;
mod relay_loop;
mod settings;
mod utils;
mod ws;
async fn create_this_controller(
conn: &mut PoolConnection<Sqlite>,
settings: &Settings,
) -> Result<DbController, EmgauwaError> {
DbController::create(
conn,
&EmgauwaUid::default(),
&settings.name,
settings.relays.len() as i64,
)
.await
.map_err(EmgauwaError::from)
}
async fn create_this_relay(
conn: &mut PoolConnection<Sqlite>,
this_controller: &DbController,
settings_relay: &settings::Relay,
) -> Result<DbRelay, EmgauwaError> {
let relay = DbRelay::create(
conn,
&settings_relay.name,
settings_relay.number.ok_or(EmgauwaError::Internal(
"Relay number is missing".to_string(),
))?,
this_controller,
)
.await?;
let off = DbSchedule::get_off(conn).await?;
for weekday in 0..7 {
DbJunctionRelaySchedule::set_schedule(conn, &relay, &off, weekday).await?;
}
Ok(relay)
}
#[actix::main]
async fn main() -> Result<(), std::io::Error> {
let settings = settings::init()?;
drop_privileges(&settings.permissions)?;
init_logging(&settings.logging.level)?;
let mut pfd: Option<PiFaceDigital> = None;
let drivers = settings.relays_make_drivers(&mut pfd)?;
let pool = db::init(&settings.database)
.await
.map_err(EmgauwaError::from)?;
let mut conn = pool.acquire().await.map_err(EmgauwaError::from)?;
let db_controller = match DbController::get_all(&mut conn)
.await
.map_err(EmgauwaError::from)?
.pop()
{
None => futures::executor::block_on(create_this_controller(&mut conn, &settings))?,
Some(c) => c,
};
for relay in &settings.relays {
if DbRelay::get_by_controller_and_num(
&mut conn,
&db_controller,
relay.number.ok_or(EmgauwaError::Internal(
"Relay number is missing".to_string(),
))?,
)
.await
.map_err(EmgauwaError::from)?
.is_none()
{
create_this_relay(&mut conn, &db_controller, relay)
.await
.map_err(EmgauwaError::from)?;
}
}
let db_controller = db_controller
.update(&mut conn, &db_controller.name, settings.relays.len() as i64)
.await
.map_err(EmgauwaError::from)?;
let this = Controller::from_db_model(&mut conn, db_controller).map_err(EmgauwaError::from)?;
let url = format!(
"ws://{}:{}/api/v1/ws/controllers",
settings.server.host, settings.server.port
);
let app_state = app_state::AppState::new(pool.clone(), this, settings, drivers).start();
let _ = tokio::join!(
tokio::spawn(run_relays_loop(app_state.clone())),
tokio::spawn(run_ws_loop(pool.clone(), app_state.clone(), url)),
);
Ok(())
}

View file

@ -1,150 +0,0 @@
use std::time::{Duration, Instant};
use actix::Addr;
use chrono::{Local, Timelike};
use emgauwa_common::constants::RELAYS_RETRY_TIMEOUT;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::{RelayStates, Weekday};
use emgauwa_common::utils::printable_relay_states;
use futures::pin_mut;
use tokio::time;
use tokio::time::timeout;
use utils::app_state_get_controller_notifier;
use crate::app_state::AppState;
use crate::utils;
pub async fn run_relays_loop(app_state: Addr<AppState>) {
log::debug!("Spawned relays loop");
loop {
let run_result = run_relays(&app_state).await;
if let Err(err) = run_result {
log::error!("Error running relays: {}", err);
}
time::sleep(RELAYS_RETRY_TIMEOUT).await;
}
}
async fn run_relays(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
let notifier = &*app_state_get_controller_notifier(app_state).await?;
let mut last_weekday = emgauwa_common::utils::get_weekday();
let mut this = utils::app_state_get_this(app_state).await?;
let mut relay_states: RelayStates = Vec::new();
init_relay_states(&mut relay_states, &this);
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
let mut duration_override = None;
loop {
log::debug!(
"Relay loop at {}: {}",
Local::now().naive_local().time(),
printable_relay_states(&this.get_relay_states())
);
let notifier_future = notifier.notified();
pin_mut!(notifier_future);
let mut changed = timeout(
get_next_duration(&this, &mut duration_override),
&mut notifier_future,
)
.await
.is_ok();
check_weekday(app_state, &mut last_weekday, &mut changed).await?;
if changed {
log::debug!("Reloading controller in relay loop");
this = utils::app_state_get_this(app_state).await?;
}
let now_pulse = Instant::now();
duration_override = this
.relays
.iter_mut()
.filter_map(|relay| match relay.check_pulsing(&now_pulse) {
None => None,
Some(pulse) => {
let dur = pulse - now_pulse;
log::debug!(
"Pulsing relay {} for {}s until {:?} ",
relay.r.number,
dur.as_secs(),
pulse
);
Some(dur)
}
})
.min();
calc_relay_states(&mut relay_states, &mut this, app_state).await?;
}
}
fn init_relay_states(relay_states: &mut RelayStates, this: &Controller) {
relay_states.clear();
for _ in 0..this.c.relay_count {
relay_states.push(None);
}
}
async fn calc_relay_states(
relay_states: &mut RelayStates,
this: &mut Controller,
app_state: &Addr<AppState>,
) -> Result<(), EmgauwaError> {
let now = Local::now().time();
let now_pulse = Instant::now();
this.relays
.iter_mut()
.zip(relay_states.iter_mut())
.for_each(|(relay, state)| {
relay.is_on = Some(
relay.active_schedule.is_on(&now) || relay.check_pulsing(&now_pulse).is_some(),
);
*state = relay.is_on;
});
utils::app_state_update_relays_on(app_state, relay_states.clone()).await
}
fn get_next_duration(this: &Controller, duration_override: &mut Option<Duration>) -> Duration {
if let Some(duration) = duration_override {
log::debug!("Duration override. Waiting for {}s", duration.as_secs());
return *duration;
}
let now = Local::now().time();
let now_in_s = now.num_seconds_from_midnight();
let next_timestamp = this
.get_next_time(&now)
.map_or(86400, |t| t.num_seconds_from_midnight());
let duration_to_next = Duration::from_secs((next_timestamp - now_in_s) as u64);
log::debug!(
"Next timestamp: {}; Waiting for {}s",
next_timestamp,
duration_to_next.as_secs()
);
duration_to_next
}
async fn check_weekday(
app_state: &Addr<AppState>,
last_weekday: &mut Weekday,
changed: &mut bool,
) -> Result<(), EmgauwaError> {
let current_weekday = emgauwa_common::utils::get_weekday();
if current_weekday.ne(last_weekday) {
log::debug!("Weekday changed");
*last_weekday = current_weekday;
utils::app_state_reload(app_state).await?;
*changed = true;
}
Ok(())
}

View file

@ -1,106 +0,0 @@
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::{drivers, settings};
use rppal_pfd::PiFaceDigital;
use serde_derive::Deserialize;
use crate::driver::Driver;
#[derive(Clone, Debug, Deserialize)]
#[serde(default)]
#[allow(unused)]
pub struct Relay {
pub driver: Driver,
pub name: String,
pub number: Option<i64>,
pub pin: u8,
pub inverted: bool,
pub pulse: Option<u64>,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(default)]
#[allow(unused)]
pub struct Settings {
pub server: settings::Server,
pub database: String,
pub permissions: settings::Permissions,
pub logging: settings::Logging,
pub name: String,
pub relays: Vec<Relay>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
server: settings::Server::default(),
database: String::from("sqlite://emgauwa-controller.sqlite"),
permissions: settings::Permissions::default(),
logging: settings::Logging::default(),
name: String::from("Emgauwa Controller"),
relays: Vec::new(),
}
}
}
impl Default for Relay {
fn default() -> Self {
Relay {
driver: Driver::Gpio,
number: None,
name: String::from("Relay"),
pin: 0,
inverted: false,
pulse: None,
}
}
}
pub fn init() -> Result<Settings, EmgauwaError> {
let mut settings: Settings = settings::load("controller", "CONTROLLER")?;
for (num, relay) in settings.relays.iter_mut().enumerate() {
if relay.number.is_none() {
relay.number = Some(num as i64);
}
}
Ok(settings)
}
impl Settings {
pub fn get_relay(&self, number: i64) -> Option<&Relay> {
self.relays.iter().find(|r| r.number == Some(number))
}
pub fn relays_make_drivers(
&self,
pfd: &mut Option<PiFaceDigital>,
) -> Result<Vec<Box<dyn drivers::RelayDriver>>, EmgauwaError> {
let mut drivers = Vec::new();
for relay in &self.relays {
drivers.push(relay.make_driver(pfd)?);
}
Ok(drivers)
}
}
impl Relay {
pub fn make_driver(
&self,
pfd: &mut Option<PiFaceDigital>,
) -> Result<Box<dyn drivers::RelayDriver>, EmgauwaError> {
let driver: Box<dyn drivers::RelayDriver> = match self.driver {
Driver::Null => Box::new(drivers::NullDriver::new(self.pin)),
Driver::Gpio => Box::new(drivers::GpioDriver::new(self.pin, self.inverted)?),
Driver::PiFace => {
if pfd.is_none() {
*pfd = Some(drivers::PiFaceDriver::init_piface()?);
}
Box::new(drivers::PiFaceDriver::new(self.pin, pfd)?)
}
};
Ok(driver)
}
}

View file

@ -1,66 +0,0 @@
use std::sync::Arc;
use actix::Addr;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::Controller;
use emgauwa_common::types::RelayStates;
use tokio::sync::Notify;
use crate::app_state;
use crate::app_state::AppState;
pub async fn app_state_get_this(app_state: &Addr<AppState>) -> Result<Controller, EmgauwaError> {
app_state
.send(app_state::GetThis {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_get_relay_notifier(
app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> {
app_state
.send(app_state::GetRelayNotifier {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_get_controller_notifier(
app_state: &Addr<AppState>,
) -> Result<Arc<Notify>, EmgauwaError> {
app_state
.send(app_state::GetControllerNotifier {})
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_reload(app_state: &Addr<AppState>) -> Result<(), EmgauwaError> {
app_state
.send(app_state::Reload {})
.await
.map_err(EmgauwaError::from)?
}
pub async fn app_state_update_relays_on(
app_state: &Addr<AppState>,
relay_states: RelayStates,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::UpdateRelayStates { relay_states })
.await
.map_err(EmgauwaError::from)
}
pub async fn app_state_relay_pulse(
app_state: &Addr<AppState>,
relay_number: i64,
duration: Option<u32>,
) -> Result<(), EmgauwaError> {
app_state
.send(app_state::RelayPulse {
relay_number,
duration,
})
.await
.map_err(EmgauwaError::from)?
}

View file

@ -1,247 +0,0 @@
use actix::Addr;
use emgauwa_common::constants::WEBSOCKET_RETRY_TIMEOUT;
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{Controller, Relay};
use emgauwa_common::types::{ControllerWsAction, ScheduleUid};
use futures::{future, pin_mut, SinkExt, StreamExt};
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use tokio::time;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, tungstenite};
use crate::app_state::AppState;
use crate::utils;
use crate::utils::{app_state_get_relay_notifier, app_state_get_this};
pub async fn run_ws_loop(pool: Pool<Sqlite>, app_state: Addr<AppState>, url: String) {
log::debug!("Spawned ws loop");
loop {
let run_result = run_websocket(pool.clone(), &app_state, &url).await;
if let Err(err) = run_result {
log::error!("Error running websocket: {}", err);
}
log::info!(
"Retrying to connect in {} seconds...",
WEBSOCKET_RETRY_TIMEOUT.as_secs()
);
time::sleep(WEBSOCKET_RETRY_TIMEOUT).await;
}
}
async fn run_websocket(
pool: Pool<Sqlite>,
app_state: &Addr<AppState>,
url: &str,
) -> Result<(), EmgauwaError> {
log::debug!("Trying to connect to {}", url);
match connect_async(url).await {
Ok(connection) => {
log::info!("Websocket connected");
let (ws_stream, _) = connection;
let (mut write, read) = ws_stream.split();
let ws_action = ControllerWsAction::Register(app_state_get_this(app_state).await?);
let ws_action_json = serde_json::to_string(&ws_action)?;
if let Err(err) = write.send(Message::text(ws_action_json)).await {
log::error!("Failed to register at websocket: {}", err);
return Ok(());
}
let (app_state_tx, app_state_rx) = futures_channel::mpsc::unbounded::<Message>();
tokio::spawn(read_app_state(app_state.clone(), app_state_tx));
let app_state_to_ws = app_state_rx.map(Ok).forward(write);
let read_handler = read.for_each(|msg| handle_message(pool.clone(), app_state, msg));
pin_mut!(app_state_to_ws, read_handler);
future::select(app_state_to_ws, read_handler).await;
log::warn!("Lost connection to websocket");
}
Err(err) => {
log::warn!("Failed to connect to websocket: {}", err,);
}
}
Ok(())
}
async fn read_app_state(
app_state: Addr<AppState>,
tx: futures_channel::mpsc::UnboundedSender<Message>,
) -> Result<(), EmgauwaError> {
let notifier = &*app_state_get_relay_notifier(&app_state).await?;
loop {
notifier.notified().await;
log::debug!("Relay change detected");
let this = app_state_get_this(&app_state).await?;
let relay_states = this.get_relay_states();
let ws_action = ControllerWsAction::RelayStates((this.c.uid, relay_states));
let ws_action_json = serde_json::to_string(&ws_action)?;
tx.unbounded_send(Message::text(ws_action_json))
.map_err(|_| {
EmgauwaError::Other(String::from(
"Failed to forward message from app state to websocket",
))
})?;
}
}
async fn handle_message(
pool: Pool<Sqlite>,
app_state: &Addr<AppState>,
message_result: Result<Message, tungstenite::Error>,
) {
let msg = match message_result {
Ok(msg) => msg,
Err(err) => {
log::error!("Error reading message: {}", err);
return;
}
};
if let Message::Text(text) = msg {
match serde_json::from_str(&text) {
Ok(action) => {
log::debug!("Received action: {:?}", action);
let mut pool_conn = match pool.acquire().await {
Ok(conn) => conn,
Err(err) => {
log::error!("Failed to acquire database connection: {:?}", err);
return;
}
};
let action_res = handle_action(&mut pool_conn, app_state, action).await;
if let Err(e) = action_res {
log::error!("Error handling action: {:?}", e);
}
}
Err(e) => {
log::error!("Error deserializing action: {:?}", e);
}
}
}
}
pub async fn handle_action(
conn: &mut PoolConnection<Sqlite>,
app_state: &Addr<AppState>,
action: ControllerWsAction,
) -> Result<(), EmgauwaError> {
let this = app_state_get_this(app_state).await?;
match action {
ControllerWsAction::Controller(controller) => {
handle_controller(conn, &this, controller).await?
}
ControllerWsAction::Relays(relays) => handle_relays(conn, &this, relays).await?,
ControllerWsAction::Schedules(schedules) => handle_schedules(conn, schedules).await?,
ControllerWsAction::RelayPulse((relay_num, duration)) => {
handle_relay_pulse(app_state, relay_num, duration).await?
}
_ => return Ok(()),
};
utils::app_state_reload(app_state).await
}
async fn handle_controller(
conn: &mut PoolConnection<Sqlite>,
this: &Controller,
controller: Controller,
) -> Result<(), EmgauwaError> {
if controller.c.uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during update",
)));
}
DbController::get_by_uid(conn, &controller.c.uid)
.await?
.ok_or(DatabaseError::NotFound)?
.update(conn, controller.c.name.as_str(), this.c.relay_count)
.await?;
Ok(())
}
async fn handle_schedules(
conn: &mut PoolConnection<Sqlite>,
schedules: Vec<DbSchedule>,
) -> Result<(), EmgauwaError> {
let mut handled_uids = vec![
// on and off schedules are always present and should not be updated
ScheduleUid::On,
ScheduleUid::Off,
];
for schedule in schedules {
if handled_uids.contains(&schedule.uid) {
continue;
}
handled_uids.push(schedule.uid.clone());
log::debug!("Handling schedule: {:?}", schedule);
let schedule_db = DbSchedule::get_by_uid(conn, &schedule.uid).await?;
if let Some(schedule_db) = schedule_db {
schedule_db
.update(conn, schedule.name.as_str(), &schedule.periods)
.await?;
} else {
DbSchedule::create(
conn,
schedule.uid.clone(),
schedule.name.as_str(),
&schedule.periods,
)
.await?;
}
}
Ok(())
}
async fn handle_relays(
conn: &mut PoolConnection<Sqlite>,
this: &Controller,
relays: Vec<Relay>,
) -> Result<(), EmgauwaError> {
for relay in relays {
if relay.controller.uid != this.c.uid {
return Err(EmgauwaError::Other(String::from(
"Controller UID mismatch during relay update",
)));
}
let db_relay = DbRelay::get_by_controller_and_num(conn, &this.c, relay.r.number)
.await?
.ok_or(DatabaseError::NotFound)?;
db_relay.update(conn, relay.r.name.as_str()).await?;
handle_schedules(conn, relay.schedules.clone()).await?;
let mut schedules = Vec::new(); // We need to get the schedules from the database to have the right IDs
for schedule in relay.schedules {
schedules.push(
DbSchedule::get_by_uid(conn, &schedule.uid)
.await?
.ok_or(DatabaseError::NotFound)?,
);
}
DbJunctionRelaySchedule::set_schedules(conn, &db_relay, schedules.iter().collect()).await?;
}
Ok(())
}
async fn handle_relay_pulse(
app_state: &Addr<AppState>,
relay_num: i64,
duration: Option<u32>,
) -> Result<(), EmgauwaError> {
utils::app_state_relay_pulse(app_state, relay_num, duration).await
}

View file

@ -1,31 +0,0 @@
[package]
name = "emgauwa-core"
version = "0.5.0"
edition = "2021"
authors = ["Tobias Reisinger <tobias@msrg.cc>"]
[dependencies]
emgauwa-common = { path = "../emgauwa-common" }
actix = "0.13"
actix-web = "4.4"
actix-web-actors = "4.2"
actix-cors = "0.7"
utoipa = "4.2"
utoipa-swagger-ui = { version = "6.0", features = ["actix-web", "debug-embed"] }
log = "0.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.5", features = ["serde", "v4"] }
itertools = "0.12"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
futures = "0.3"
tokio = { version = "1.36", features = ["rt", "rt-multi-thread"] }

View file

@ -1,15 +0,0 @@
use std::process::{exit, Command};
fn main() {
println!("cargo:rerun-if-changed=../api.v1.yaml");
let output = Command::new("sh")
.arg("-c")
.arg("yq . < ../api.v1.yaml > $OUT_DIR/api.v1.json")
.output()
.expect("Failed to convert api documentation to json");
if !output.status.success() {
eprintln!("Error: {}", String::from_utf8_lossy(&output.stderr));
exit(1);
}
}

View file

@ -1,188 +0,0 @@
use std::collections::HashMap;
use actix::{Actor, Addr, Context, Handler, Message, Recipient};
use emgauwa_common::db::DbController;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::models::{convert_db_list, Controller, Relay};
use emgauwa_common::types::{ControllerWsAction, EmgauwaUid, RelayStates};
use futures::executor::block_on;
use sqlx::{Pool, Sqlite};
use crate::handlers::v1::ws::relays::{RelaysWs, SendRelays};
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct DisconnectController {
pub controller_uid: EmgauwaUid,
}
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct ConnectController {
pub address: Recipient<ControllerWsAction>,
pub controller: Controller,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct UpdateRelayStates {
pub controller_uid: EmgauwaUid,
pub relay_states: RelayStates,
}
#[derive(Message)]
#[rtype(result = "Result<Vec<Relay>, EmgauwaError>")]
pub struct GetRelays {}
#[derive(Message)]
#[rtype(result = "Result<(), EmgauwaError>")]
pub struct Action {
pub controller_uid: EmgauwaUid,
pub action: ControllerWsAction,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ConnectRelayClient {
pub addr: Addr<RelaysWs>,
}
pub struct AppState {
pub pool: Pool<Sqlite>,
pub connected_controllers: HashMap<EmgauwaUid, (Controller, Recipient<ControllerWsAction>)>,
pub connected_relay_clients: Vec<Addr<RelaysWs>>,
}
impl AppState {
pub fn new(pool: Pool<Sqlite>) -> AppState {
AppState {
pool,
connected_controllers: HashMap::new(),
connected_relay_clients: Vec::new(),
}
}
fn get_relays(&self) -> Result<Vec<Relay>, EmgauwaError> {
let mut pool_conn = block_on(self.pool.acquire())?;
let db_controllers = block_on(DbController::get_all(&mut pool_conn))?;
let mut controllers: Vec<Controller> = convert_db_list(&mut pool_conn, db_controllers)?;
self.connected_controllers
.iter()
.for_each(|(uid, (connected_controller, _))| {
if let Some(c) = controllers.iter_mut().find(|c| c.c.uid == *uid) {
c.apply_relay_states(&connected_controller.get_relay_states());
}
});
let mut relays: Vec<Relay> = Vec::new();
controllers.iter().for_each(|c| {
relays.extend(c.relays.clone());
});
Ok(relays)
}
fn notify_relay_clients(&mut self) {
self.connected_relay_clients.retain(|addr| addr.connected());
match self.get_relays() {
Ok(relays) => match serde_json::to_string(&relays) {
Ok(json) => {
self.connected_relay_clients.iter_mut().for_each(|addr| {
let relays_json = json.clone();
addr.do_send(SendRelays { relays_json });
});
}
Err(err) => {
log::error!("Failed to serialize relays: {:?}", err);
}
},
Err(err) => {
log::error!("Failed to get relays: {:?}", err);
}
};
}
}
impl Actor for AppState {
type Context = Context<Self>;
}
impl Handler<DisconnectController> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, msg: DisconnectController, _ctx: &mut Self::Context) -> Self::Result {
let mut pool_conn = block_on(self.pool.acquire())?;
if let Some((controller, address)) = self.connected_controllers.remove(&msg.controller_uid)
{
if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) {
log::error!(
"Failed to mark controller {} as inactive: {:?}",
controller.c.uid,
err
);
}
// TODO: why does the block_on(send()) version not return? The AppState will be stuck.
//block_on(address.send(ControllerWsAction::Disconnect))??;
address.do_send(ControllerWsAction::Disconnect);
}
Ok(())
}
}
impl Handler<ConnectController> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, msg: ConnectController, _ctx: &mut Self::Context) -> Self::Result {
log::debug!("Connecting controller: {}", msg.controller.c.uid);
self.connected_controllers
.insert(msg.controller.c.uid.clone(), (msg.controller, msg.address));
Ok(())
}
}
impl Handler<UpdateRelayStates> for AppState {
type Result = ();
fn handle(&mut self, msg: UpdateRelayStates, _ctx: &mut Self::Context) -> Self::Result {
if let Some((controller, _)) = self.connected_controllers.get_mut(&msg.controller_uid) {
controller.apply_relay_states(&msg.relay_states);
}
self.notify_relay_clients();
}
}
impl Handler<GetRelays> for AppState {
type Result = Result<Vec<Relay>, EmgauwaError>;
fn handle(&mut self, _msg: GetRelays, _ctx: &mut Self::Context) -> Self::Result {
self.get_relays()
}
}
impl Handler<Action> for AppState {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, msg: Action, _ctx: &mut Self::Context) -> Self::Result {
log::debug!("Forwarding action: {:?}", msg.action);
if let Some((_, address)) = self.connected_controllers.get(&msg.controller_uid) {
// TODO: why does the block_on(send()) version not return? The AppState will be stuck.
//block_on(address.send(msg.action))?
address.do_send(msg.action);
Ok(())
} else {
Err(EmgauwaError::Connection(msg.controller_uid))
}
}
}
impl Handler<ConnectRelayClient> for AppState {
type Result = ();
fn handle(&mut self, msg: ConnectRelayClient, _ctx: &mut Self::Context) -> Self::Result {
self.connected_relay_clients.push(msg.addr);
}
}

View file

@ -1,37 +0,0 @@
use actix_web::{error, Error, HttpRequest, HttpResponse};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
pub mod v1;
enum EmgauwaJsonPayLoadError {
Error(error::JsonPayloadError),
}
impl Serialize for EmgauwaJsonPayLoadError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut s = serializer.serialize_struct("error", 3)?;
s.serialize_field("type", "json-payload-error")?;
s.serialize_field("code", &400)?;
s.serialize_field(
"description",
&match self {
EmgauwaJsonPayLoadError::Error(err) => format!("{}", err),
},
)?;
s.end()
}
}
pub fn json_error_handler(err: error::JsonPayloadError, _: &HttpRequest) -> Error {
error::InternalError::from_response(
"",
HttpResponse::BadRequest()
.content_type("application/json")
.json(EmgauwaJsonPayLoadError::Error(err)),
)
.into()
}

View file

@ -1,92 +0,0 @@
use actix::Addr;
use actix_web::{delete, get, put, web, HttpResponse};
use emgauwa_common::db::DbController;
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{convert_db_list, Controller, FromDbModel};
use emgauwa_common::types::{ControllerWsAction, EmgauwaUid, RequestControllerUpdate};
use sqlx::{Pool, Sqlite};
use crate::app_state;
use crate::app_state::AppState;
#[get("/controllers")]
pub async fn index(pool: web::Data<Pool<Sqlite>>) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let db_controllers = DbController::get_all(&mut pool_conn).await?;
let controllers: Vec<Controller> = convert_db_list(&mut pool_conn, db_controllers)?;
Ok(HttpResponse::Ok().json(controllers))
}
#[get("/controllers/{controller_id}")]
pub async fn show(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let return_controller = Controller::from_db_model(&mut pool_conn, controller)?;
Ok(HttpResponse::Ok().json(return_controller))
}
#[put("/controllers/{controller_id}")]
pub async fn update(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String,)>,
data: web::Json<RequestControllerUpdate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let controller = controller
.update(&mut pool_conn, data.name.as_str(), controller.relay_count)
.await?;
let return_controller = Controller::from_db_model(&mut pool_conn, controller)?;
app_state
.send(app_state::Action {
controller_uid: uid.clone(),
action: ControllerWsAction::Controller(return_controller.clone()),
})
.await??;
Ok(HttpResponse::Ok().json(return_controller))
}
#[delete("/controllers/{controller_id}")]
pub async fn delete(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
app_state
.send(app_state::DisconnectController {
controller_uid: uid.clone(),
})
.await??;
DbController::delete_by_uid(&mut pool_conn, uid).await?;
Ok(HttpResponse::Ok().json("controller got deleted"))
}

View file

@ -1,161 +0,0 @@
use actix::Addr;
use actix_web::{delete, get, post, put, web, HttpResponse};
use emgauwa_common::db::DbMacro;
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{convert_db_list, FromDbModel, Macro, MacroAction, Relay};
use emgauwa_common::types::{
ControllerWsAction, EmgauwaUid, RequestMacroCreate, RequestMacroExecute, RequestMacroUpdate,
};
use itertools::Itertools;
use sqlx::{Pool, Sqlite};
use crate::app_state;
use crate::app_state::AppState;
#[get("/macros")]
pub async fn index(pool: web::Data<Pool<Sqlite>>) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let db_macros = DbMacro::get_all(&mut pool_conn).await?;
let macros: Vec<Macro> = convert_db_list(&mut pool_conn, db_macros)?;
Ok(HttpResponse::Ok().json(macros))
}
#[get("/macros/{macro_id}")]
pub async fn show(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (macro_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(macro_uid.as_str())?;
let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let return_macro = Macro::from_db_model(&mut pool_conn, db_macro)?;
Ok(HttpResponse::Ok().json(return_macro))
}
#[post("/macros")]
pub async fn add(
pool: web::Data<Pool<Sqlite>>,
data: web::Json<RequestMacroCreate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let new_macro = DbMacro::create(&mut pool_conn, EmgauwaUid::default(), &data.name).await?;
new_macro
.set_actions(&mut pool_conn, data.actions.as_slice())
.await?;
let return_macro = Macro::from_db_model(&mut pool_conn, new_macro)?;
Ok(HttpResponse::Created().json(return_macro))
}
#[put("/macros/{macro_id}")]
pub async fn update(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
data: web::Json<RequestMacroUpdate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (macro_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(macro_uid.as_str())?;
let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
if let Some(name) = &data.name {
db_macro.update(&mut pool_conn, name).await?;
}
if let Some(actions) = &data.actions {
db_macro
.set_actions(&mut pool_conn, actions.as_slice())
.await?;
}
let return_macro = Macro::from_db_model(&mut pool_conn, db_macro)?;
Ok(HttpResponse::Ok().json(return_macro))
}
#[delete("/macros/{macro_id}")]
pub async fn delete(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (macro_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(macro_uid.as_str())?;
DbMacro::delete_by_uid(&mut pool_conn, uid).await?;
Ok(HttpResponse::Ok().json("macro got deleted"))
}
#[put("/macros/{macro_id}/execute")]
pub async fn execute(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String,)>,
query: web::Query<RequestMacroExecute>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (macro_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(macro_uid.as_str())?;
let db_macro = DbMacro::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let actions_db = match query.weekday {
None => db_macro.get_actions(&mut pool_conn).await?,
Some(weekday) => {
db_macro
.get_actions_weekday(&mut pool_conn, weekday)
.await?
}
};
let mut actions: Vec<MacroAction> = convert_db_list(&mut pool_conn, actions_db)?;
for action in &actions {
action.execute(&mut pool_conn).await?;
}
let affected_controller_uids: Vec<EmgauwaUid> = actions
.iter()
.map(|action| action.relay.controller_id.clone())
.unique()
.collect();
for controller_uid in affected_controller_uids {
let mut affected_relays: Vec<Relay> = Vec::new();
let mut affected_relay_ids: Vec<i64> = Vec::new();
for action in actions.iter_mut() {
if affected_relay_ids.contains(&action.relay.r.id) {
continue;
}
action.relay.reload(&mut pool_conn)?;
affected_relays.push(action.relay.clone());
affected_relay_ids.push(action.relay.r.id);
}
app_state
.send(app_state::Action {
controller_uid,
action: ControllerWsAction::Relays(affected_relays.clone()),
})
.await??;
}
Ok(HttpResponse::Ok().finish()) // TODO add a message?
}

View file

@ -1,6 +0,0 @@
pub mod controllers;
pub mod macros;
pub mod relays;
pub mod schedules;
pub mod tags;
pub mod ws;

View file

@ -1,185 +0,0 @@
use actix::Addr;
use actix_web::{get, post, put, web, HttpResponse};
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbTag};
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{convert_db_list, FromDbModel, Relay};
use emgauwa_common::types::{
ControllerWsAction, EmgauwaUid, RequestRelayPulse, RequestRelayUpdate,
};
use emgauwa_common::utils;
use sqlx::{Pool, Sqlite};
use crate::app_state;
use crate::app_state::AppState;
#[get("/relays")]
pub async fn index(pool: web::Data<Pool<Sqlite>>) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let db_relays = DbRelay::get_all(&mut pool_conn).await?;
let relays: Vec<Relay> = convert_db_list(&mut pool_conn, db_relays)?;
Ok(HttpResponse::Ok().json(relays))
}
#[get("/relays/tag/{tag}")]
pub async fn tagged(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (tag,) = path.into_inner();
let tag_db = DbTag::get_by_tag(&mut pool_conn, &tag)
.await?
.ok_or(DatabaseError::NotFound)?;
let db_relays = DbRelay::get_by_tag(&mut pool_conn, &tag_db).await?;
let relays: Vec<Relay> = convert_db_list(&mut pool_conn, db_relays)?;
Ok(HttpResponse::Ok().json(relays))
}
#[get("/controllers/{controller_id}/relays")]
pub async fn index_for_controller(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid,) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let db_relays = controller.get_relays(&mut pool_conn).await?;
let relays: Vec<Relay> = convert_db_list(&mut pool_conn, db_relays)?;
Ok(HttpResponse::Ok().json(relays))
}
#[get("/controllers/{controller_id}/relays/{relay_num}")]
pub async fn show_for_controller(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String, i64)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid, relay_num) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num)
.await?
.ok_or(DatabaseError::NotFound)?;
let return_relay = Relay::from_db_model(&mut pool_conn, relay)?;
Ok(HttpResponse::Ok().json(return_relay))
}
#[put("/controllers/{controller_id}/relays/{relay_num}")]
pub async fn update_for_controller(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String, i64)>,
data: web::Json<RequestRelayUpdate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid, relay_num) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let mut relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num)
.await?
.ok_or(DatabaseError::NotFound)?;
if let Some(name) = &data.name {
relay = relay.update(&mut pool_conn, name.as_str()).await?;
}
if let Some(schedule_uids) = &data.schedules {
if schedule_uids.len() == 7 {
let mut schedules = Vec::new();
for s_uid in schedule_uids {
schedules.push(s_uid.get_schedule(&mut pool_conn).await?);
}
DbJunctionRelaySchedule::set_schedules(
&mut pool_conn,
&relay,
schedules.iter().collect(),
)
.await?;
}
}
if let Some(s_uid) = &data.active_schedule {
let schedule = s_uid.get_schedule(&mut pool_conn).await?;
DbJunctionRelaySchedule::set_schedule(
&mut pool_conn,
&relay,
&schedule,
utils::get_weekday(),
)
.await?;
}
if let Some(tags) = &data.tags {
relay.set_tags(&mut pool_conn, tags.as_slice()).await?;
}
let relay = relay.reload(&mut pool_conn).await?;
let return_relay = Relay::from_db_model(&mut pool_conn, relay)?;
app_state
.send(app_state::Action {
controller_uid: uid,
action: ControllerWsAction::Relays(vec![return_relay.clone()]),
})
.await??;
Ok(HttpResponse::Ok().json(return_relay))
}
#[post("/controllers/{controller_id}/relays/{relay_num}/pulse")]
pub async fn pulse(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String, i64)>,
data: web::Json<RequestRelayPulse>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (controller_uid, relay_num) = path.into_inner();
let uid = EmgauwaUid::try_from(controller_uid.as_str())?;
let controller = DbController::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let relay = DbRelay::get_by_controller_and_num(&mut pool_conn, &controller, relay_num)
.await?
.ok_or(DatabaseError::NotFound)?;
let duration = data.duration.filter(|&d| d > 0);
app_state
.send(app_state::Action {
controller_uid: uid,
action: ControllerWsAction::RelayPulse((relay.number, duration)),
})
.await??;
Ok(HttpResponse::Ok().finish()) // TODO add a message?
}

View file

@ -1,197 +0,0 @@
use actix::Addr;
use actix_web::{delete, get, post, put, web, HttpResponse};
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbSchedule, DbTag};
use emgauwa_common::errors::{ApiError, DatabaseError, EmgauwaError};
use emgauwa_common::models::{convert_db_list, FromDbModel, Schedule};
use emgauwa_common::types::{
ControllerWsAction, RequestScheduleCreate, RequestScheduleUpdate, ScheduleUid,
};
use itertools::Itertools;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use crate::app_state;
use crate::app_state::AppState;
#[get("/schedules")]
pub async fn index(pool: web::Data<Pool<Sqlite>>) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let db_schedules = DbSchedule::get_all(&mut pool_conn).await?;
let schedules: Vec<Schedule> = convert_db_list(&mut pool_conn, db_schedules)?;
Ok(HttpResponse::Ok().json(schedules))
}
#[get("/schedules/tag/{tag}")]
pub async fn tagged(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (tag,) = path.into_inner();
let tag_db = DbTag::get_by_tag(&mut pool_conn, &tag)
.await?
.ok_or(DatabaseError::NotFound)?;
let db_schedules = DbSchedule::get_by_tag(&mut pool_conn, &tag_db).await?;
let schedules: Vec<Schedule> = convert_db_list(&mut pool_conn, db_schedules)?;
Ok(HttpResponse::Ok().json(schedules))
}
#[get("/schedules/{schedule_id}")]
pub async fn show(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (schedule_uid,) = path.into_inner();
let uid = ScheduleUid::try_from(schedule_uid.as_str())?;
let schedule = DbSchedule::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let return_schedule = Schedule::from_db_model(&mut pool_conn, schedule)?;
Ok(HttpResponse::Ok().json(return_schedule))
}
#[post("/schedules")]
pub async fn add(
pool: web::Data<Pool<Sqlite>>,
data: web::Json<RequestScheduleCreate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let new_schedule = DbSchedule::create(
&mut pool_conn,
ScheduleUid::default(),
&data.name,
&data.periods,
)
.await?;
if let Some(tags) = &data.tags {
new_schedule
.set_tags(&mut pool_conn, tags.as_slice())
.await?;
}
let return_schedule = Schedule::from_db_model(&mut pool_conn, new_schedule)?;
Ok(HttpResponse::Created().json(return_schedule))
}
async fn add_list_single(
conn: &mut PoolConnection<Sqlite>,
request_schedule: &RequestScheduleCreate,
) -> Result<DbSchedule, DatabaseError> {
let new_schedule = DbSchedule::create(
conn,
ScheduleUid::default(),
&request_schedule.name,
&request_schedule.periods,
)
.await?;
if let Some(tags) = &request_schedule.tags {
new_schedule.set_tags(conn, tags.as_slice()).await?;
}
Ok(new_schedule)
}
#[post("/schedules/list")]
pub async fn add_list(
pool: web::Data<Pool<Sqlite>>,
data: web::Json<Vec<RequestScheduleCreate>>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let mut db_schedules: Vec<DbSchedule> = Vec::new();
for s in data.iter() {
let new_s = futures::executor::block_on(add_list_single(&mut pool_conn, s))?;
db_schedules.push(new_s);
}
let schedules: Vec<Schedule> = convert_db_list(&mut pool_conn, db_schedules)?;
Ok(HttpResponse::Created().json(schedules))
}
#[put("/schedules/{schedule_id}")]
pub async fn update(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
path: web::Path<(String,)>,
data: web::Json<RequestScheduleUpdate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (schedule_uid,) = path.into_inner();
let uid = ScheduleUid::try_from(schedule_uid.as_str())?;
let schedule = DbSchedule::get_by_uid(&mut pool_conn, &uid)
.await?
.ok_or(DatabaseError::NotFound)?;
let name = match &data.name {
None => schedule.name.as_str(),
Some(name) => name.as_str(),
};
let periods = match &data.periods {
None => &schedule.periods,
Some(period) => period,
};
let schedule = schedule.update(&mut pool_conn, name, periods).await?;
if let Some(tags) = &data.tags {
schedule.set_tags(&mut pool_conn, tags.as_slice()).await?;
}
let controller_ids: Vec<i64> = DbJunctionRelaySchedule::get_relays(&mut pool_conn, &schedule)
.await?
.into_iter()
.map(|r| r.controller_id)
.unique()
.collect();
for controller_id in controller_ids {
let controller = DbController::get(&mut pool_conn, controller_id)
.await?
.ok_or(DatabaseError::NotFound)?;
app_state
.send(app_state::Action {
controller_uid: controller.uid,
action: ControllerWsAction::Schedules(vec![schedule.clone()]),
})
.await??;
}
let return_schedule = Schedule::from_db_model(&mut pool_conn, schedule)?;
Ok(HttpResponse::Ok().json(return_schedule))
}
#[delete("/schedules/{schedule_id}")]
pub async fn delete(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (schedule_uid,) = path.into_inner();
let uid = ScheduleUid::try_from(schedule_uid.as_str())?;
match uid {
ScheduleUid::Off => Err(EmgauwaError::from(ApiError::ProtectedSchedule)),
ScheduleUid::On => Err(EmgauwaError::from(ApiError::ProtectedSchedule)),
ScheduleUid::Any(_) => {
DbSchedule::delete_by_uid(&mut pool_conn, uid).await?;
Ok(HttpResponse::Ok().json("schedule got deleted"))
}
}
}

View file

@ -1,61 +0,0 @@
use actix_web::{delete, get, post, web, HttpResponse};
use emgauwa_common::db::DbTag;
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{FromDbModel, Tag};
use emgauwa_common::types::RequestTagCreate;
use sqlx::{Pool, Sqlite};
#[get("/tags")]
pub async fn index(pool: web::Data<Pool<Sqlite>>) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let db_tags = DbTag::get_all(&mut pool_conn).await?;
let tags: Vec<String> = db_tags.iter().map(|t| t.tag.clone()).collect();
Ok(HttpResponse::Ok().json(tags))
}
#[get("/tags/{tag_name}")]
pub async fn show(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (tag_name,) = path.into_inner();
let tag = DbTag::get_by_tag(&mut pool_conn, &tag_name)
.await?
.ok_or(DatabaseError::NotFound)?;
let return_tag = Tag::from_db_model(&mut pool_conn, tag)?;
Ok(HttpResponse::Ok().json(return_tag))
}
#[delete("/tags/{tag_name}")]
pub async fn delete(
pool: web::Data<Pool<Sqlite>>,
path: web::Path<(String,)>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let (tag_name,) = path.into_inner();
DbTag::delete_by_tag(&mut pool_conn, &tag_name).await?;
Ok(HttpResponse::Ok().json("tag got deleted"))
}
#[post("/tags")]
pub async fn add(
pool: web::Data<Pool<Sqlite>>,
data: web::Json<RequestTagCreate>,
) -> Result<HttpResponse, EmgauwaError> {
let mut pool_conn = pool.acquire().await?;
let new_tag = DbTag::create(&mut pool_conn, &data.tag).await?;
let cache = (Vec::new(), Vec::new()); // a new tag can't have any relays or schedules
let return_tag = Tag::from_db_model_cache(&mut pool_conn, new_tag, cache)?;
Ok(HttpResponse::Created().json(return_tag))
}

View file

@ -1,114 +0,0 @@
use actix::{Actor, AsyncContext};
use emgauwa_common::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
use emgauwa_common::errors::{DatabaseError, EmgauwaError};
use emgauwa_common::models::{Controller, FromDbModel};
use emgauwa_common::types::{ControllerWsAction, EmgauwaUid, RelayStates};
use emgauwa_common::utils;
use futures::executor::block_on;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use crate::app_state::{Action, ConnectController, UpdateRelayStates};
use crate::handlers::v1::ws::controllers::ControllersWs;
impl ControllersWs {
pub fn handle_register(
&mut self,
conn: &mut PoolConnection<Sqlite>,
ctx: &mut <ControllersWs as Actor>::Context,
controller: Controller,
) -> Result<(), EmgauwaError> {
log::info!(
"Registering controller: {} ({})",
controller.c.name,
controller.c.uid
);
let c = &controller.c;
let controller_db = block_on(DbController::get_by_uid_or_create(
conn,
&c.uid,
&c.name,
c.relay_count,
))?;
block_on(controller_db.update_active(conn, true))?;
// update only the relay count
block_on(controller_db.update(conn, &controller_db.name, c.relay_count))?;
for relay in &controller.relays {
log::debug!(
"Registering relay: {} ({})",
relay.r.name,
match relay.is_on {
Some(true) => "+",
Some(false) => "-",
None => "?",
}
);
let (new_relay, created) = block_on(DbRelay::get_by_controller_and_num_or_create(
conn,
&controller_db,
relay.r.number,
&relay.r.name,
))?;
if created {
let mut relay_schedules = Vec::new();
for schedule in &relay.schedules {
let (new_schedule, _) = block_on(DbSchedule::get_by_uid_or_create(
conn,
schedule.uid.clone(),
&schedule.name,
&schedule.periods,
))?;
relay_schedules.push(new_schedule);
}
block_on(DbJunctionRelaySchedule::set_schedules(
conn,
&new_relay,
relay_schedules.iter().collect(),
))?;
}
}
let controller_uid = &controller.c.uid;
let controller_db = block_on(DbController::get_by_uid(conn, controller_uid))?
.ok_or(DatabaseError::InsertGetError)?;
let controller = Controller::from_db_model(conn, controller_db)?;
let addr = ctx.address();
self.controller_uid = Some(controller_uid.clone());
block_on(self.app_state.send(ConnectController {
address: addr.recipient(),
controller: controller.clone(),
}))??;
block_on(self.app_state.send(Action {
controller_uid: controller_uid.clone(),
action: ControllerWsAction::Controller(controller.clone()),
}))??;
block_on(self.app_state.send(Action {
controller_uid: controller_uid.clone(),
action: ControllerWsAction::Relays(controller.relays),
}))??;
log::debug!("Done registering controller");
Ok(())
}
pub fn handle_relay_states(
&mut self,
controller_uid: EmgauwaUid,
relay_states: RelayStates,
) -> Result<(), EmgauwaError> {
log::debug!(
"Received relay states: {} for {}",
utils::printable_relay_states(&relay_states),
controller_uid
);
block_on(self.app_state.send(UpdateRelayStates {
controller_uid,
relay_states,
}))?;
Ok(())
}
}

View file

@ -1,154 +0,0 @@
mod handlers;
use std::time::Instant;
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use actix_web_actors::ws;
use actix_web_actors::ws::ProtocolError;
use emgauwa_common::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::types::{ControllerWsAction, EmgauwaUid};
use futures::executor::block_on;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use ws::Message;
use crate::app_state::{AppState, DisconnectController};
use crate::utils::flatten_result;
pub struct ControllersWs {
pub pool: Pool<Sqlite>,
pub controller_uid: Option<EmgauwaUid>,
pub app_state: Addr<AppState>,
pub hb: Instant,
}
impl Actor for ControllersWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
if let Some(controller_uid) = &self.controller_uid {
let flat_res = flatten_result(
block_on(self.app_state.send(DisconnectController {
controller_uid: controller_uid.clone(),
}))
.map_err(EmgauwaError::from),
);
if let Err(err) = flat_res {
log::error!("Error disconnecting controller: {:?}", err);
}
}
}
}
impl ControllersWs {
pub fn handle_action(
&mut self,
conn: &mut PoolConnection<Sqlite>,
ctx: &mut <ControllersWs as Actor>::Context,
action: ControllerWsAction,
) {
let action_res = match action {
ControllerWsAction::Register(controller) => self.handle_register(conn, ctx, controller),
ControllerWsAction::RelayStates((controller_uid, relay_states)) => {
self.handle_relay_states(controller_uid, relay_states)
}
_ => Ok(()),
};
if let Err(e) = action_res {
log::error!("Error handling action: {:?}", e);
ctx.text(
serde_json::to_string(&e).unwrap_or(format!("Error in handling action: {:?}", e)),
);
}
}
// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > HEARTBEAT_TIMEOUT {
log::warn!("Websocket Controller heartbeat failed, disconnecting!");
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(&[]);
});
}
}
impl Handler<ControllerWsAction> for ControllersWs {
type Result = Result<(), EmgauwaError>;
fn handle(&mut self, action: ControllerWsAction, ctx: &mut Self::Context) -> Self::Result {
match action {
ControllerWsAction::Disconnect => {
ctx.close(None);
ctx.stop();
}
_ => {
let action_json = serde_json::to_string(&action)?;
ctx.text(action_json);
}
}
Ok(())
}
}
impl StreamHandler<Result<Message, ProtocolError>> for ControllersWs {
fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
let mut pool_conn = match block_on(self.pool.acquire()) {
Ok(conn) => conn,
Err(err) => {
log::error!("Failed to acquire database connection: {:?}", err);
ctx.stop();
return;
}
};
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
match msg {
Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg)
}
Message::Pong(_) => {
self.hb = Instant::now();
}
Message::Text(text) => match serde_json::from_str(&text) {
Ok(action) => {
self.handle_action(&mut pool_conn, ctx, action);
}
Err(e) => {
log::error!("Error deserializing action: {:?}", e);
ctx.text(
serde_json::to_string(&EmgauwaError::Serialization(e))
.unwrap_or(String::from("Error in deserializing action")),
);
}
},
Message::Binary(_) => log::warn!("Received unexpected binary in controller ws"),
Message::Close(reason) => {
ctx.close(reason);
ctx.stop();
}
Message::Continuation(_) => {
ctx.stop();
}
Message::Nop => (),
}
}
}

View file

@ -1,53 +0,0 @@
use std::time::Instant;
use actix::Addr;
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use emgauwa_common::errors::EmgauwaError;
use sqlx::{Pool, Sqlite};
use crate::app_state::AppState;
use crate::handlers::v1::ws::controllers::ControllersWs;
use crate::handlers::v1::ws::relays::RelaysWs;
pub mod controllers;
pub mod relays;
#[get("/ws/controllers")]
pub async fn ws_controllers(
pool: web::Data<Pool<Sqlite>>,
app_state: web::Data<Addr<AppState>>,
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, EmgauwaError> {
let resp = ws::start(
ControllersWs {
pool: pool.get_ref().clone(),
controller_uid: None,
app_state: app_state.get_ref().clone(),
hb: Instant::now(),
},
&req,
stream,
)
.map_err(|_| EmgauwaError::Internal(String::from("error starting websocket")));
resp
}
#[get("/ws/relays")]
pub async fn ws_relays(
app_state: web::Data<Addr<AppState>>,
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, EmgauwaError> {
let resp = ws::start(
RelaysWs {
app_state: app_state.get_ref().clone(),
hb: Instant::now(),
},
&req,
stream,
)
.map_err(|_| EmgauwaError::Internal(String::from("error starting websocket")));
resp
}

View file

@ -1,106 +0,0 @@
use std::time::Instant;
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler};
use actix_web_actors::ws;
use actix_web_actors::ws::ProtocolError;
use emgauwa_common::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
use emgauwa_common::errors::EmgauwaError;
use futures::executor::block_on;
use crate::app_state::{AppState, ConnectRelayClient};
pub struct RelaysWs {
pub app_state: Addr<AppState>,
pub hb: Instant,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct SendRelays {
pub relays_json: String,
}
impl Actor for RelaysWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// get unique id for ctx
match self.get_relays_json() {
Ok(relays_json) => {
ctx.text(relays_json);
self.hb(ctx);
block_on(self.app_state.send(ConnectRelayClient {
addr: ctx.address(),
}))
.unwrap();
}
Err(err) => {
log::error!("Error getting relays: {:?}", err);
ctx.stop();
}
}
}
}
impl RelaysWs {
fn get_relays_json(&self) -> Result<String, EmgauwaError> {
let relays = block_on(self.app_state.send(crate::app_state::GetRelays {}))??;
serde_json::to_string(&relays).map_err(EmgauwaError::from)
}
// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > HEARTBEAT_TIMEOUT {
log::debug!("Websocket Relay heartbeat failed, disconnecting!");
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(&[]);
});
}
}
impl StreamHandler<Result<ws::Message, ProtocolError>> for RelaysWs {
fn handle(&mut self, msg: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg)
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Text(_) => log::debug!("Received unexpected text in relays ws"),
ws::Message::Binary(_) => log::debug!("Received unexpected binary in relays ws"),
ws::Message::Close(reason) => {
ctx.close(reason);
ctx.stop();
}
ws::Message::Continuation(_) => {
ctx.stop();
}
ws::Message::Nop => (),
}
}
}
impl Handler<SendRelays> for RelaysWs {
type Result = ();
fn handle(&mut self, msg: SendRelays, ctx: &mut Self::Context) -> Self::Result {
ctx.text(msg.relays_json);
}
}

View file

@ -1,124 +0,0 @@
use std::net::TcpListener;
use actix::{Actor, Arbiter};
use actix_cors::Cors;
use actix_web::middleware::TrailingSlash;
use actix_web::{middleware, web, App, HttpServer};
use emgauwa_common::db::DbController;
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::utils::{drop_privileges, init_logging};
use serde_json::json;
use utoipa_swagger_ui::SwaggerUi;
use crate::app_state::AppState;
mod app_state;
mod handlers;
mod settings;
mod utils;
#[actix_web::main]
async fn main() -> Result<(), std::io::Error> {
let settings = settings::init()?;
let listener = TcpListener::bind(format!("{}:{}", settings.server.host, settings.server.port))?;
drop_privileges(&settings.permissions)?;
init_logging(&settings.logging.level)?;
let pool = emgauwa_common::db::init(&settings.database).await?;
let mut conn = pool.acquire().await.map_err(EmgauwaError::from)?;
DbController::all_inactive(&mut conn)
.await
.map_err(EmgauwaError::from)?;
conn.close().await.map_err(EmgauwaError::from)?;
let app_state_arbiter = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap()
});
let app_state_pool = pool.clone();
let app_state = Actor::start_in_arbiter(&app_state_arbiter.handle(), move |_| {
AppState::new(app_state_pool)
});
log::info!(
"Starting server on {}:{}",
settings.server.host,
settings.server.port
);
HttpServer::new(move || {
let cors = Cors::default().allow_any_method().allow_any_header();
let origins = settings.origins.clone();
let cors = match settings.origins.is_empty() {
true => cors.allow_any_origin(),
false => cors.allowed_origin_fn(move |origin, _req_head| {
origins.contains(&origin.to_str().unwrap_or_default().to_string())
}),
};
let api_default = json!({
"openapi": "3.0.0",
"info": {
"version": "0.0.0",
"title": "Failed to load API documentation",
}
});
let api_v1_json =
serde_json::from_str(include_str!(concat!(env!("OUT_DIR"), "/api.v1.json")))
.unwrap_or(api_default.clone());
App::new()
.wrap(cors)
.wrap(middleware::Logger::default())
.app_data(web::JsonConfig::default().error_handler(handlers::json_error_handler))
.app_data(web::Data::new(pool.clone()))
.app_data(web::Data::new(app_state.clone()))
.service(
SwaggerUi::new("/api/docs/{_:.*}")
.external_urls_from_iter_unchecked([("/api/v1.json", api_v1_json)]),
)
.service(
web::scope("/api/v1")
.wrap(middleware::NormalizePath::new(TrailingSlash::Trim))
.service(handlers::v1::controllers::index)
.service(handlers::v1::controllers::show)
.service(handlers::v1::controllers::update)
.service(handlers::v1::controllers::delete)
.service(handlers::v1::relays::index)
.service(handlers::v1::relays::tagged)
.service(handlers::v1::relays::index_for_controller)
.service(handlers::v1::relays::show_for_controller)
.service(handlers::v1::relays::update_for_controller)
.service(handlers::v1::relays::pulse)
.service(handlers::v1::schedules::index)
.service(handlers::v1::schedules::tagged)
.service(handlers::v1::schedules::show)
.service(handlers::v1::schedules::add)
.service(handlers::v1::schedules::add_list)
.service(handlers::v1::schedules::update)
.service(handlers::v1::schedules::delete)
.service(handlers::v1::tags::index)
.service(handlers::v1::tags::show)
.service(handlers::v1::tags::delete)
.service(handlers::v1::tags::add)
.service(handlers::v1::macros::index)
.service(handlers::v1::macros::show)
.service(handlers::v1::macros::add)
.service(handlers::v1::macros::update)
.service(handlers::v1::macros::delete)
.service(handlers::v1::macros::execute)
.service(handlers::v1::ws::ws_controllers)
.service(handlers::v1::ws::ws_relays),
)
})
.listen(listener)?
.run()
.await
}

View file

@ -1,32 +0,0 @@
use emgauwa_common::errors::EmgauwaError;
use emgauwa_common::settings;
use serde_derive::Deserialize;
#[derive(Clone, Debug, Deserialize)]
#[serde(default)]
#[allow(unused)]
pub struct Settings {
pub server: settings::Server,
pub database: String,
pub permissions: settings::Permissions,
pub logging: settings::Logging,
pub origins: Vec<String>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
server: settings::Server::default(),
database: String::from("sqlite://emgauwa-core.sqlite"),
permissions: settings::Permissions::default(),
logging: settings::Logging::default(),
origins: Vec::new(),
}
}
}
pub fn init() -> Result<Settings, EmgauwaError> {
settings::load("core", "CORE")
}

View file

@ -1,7 +0,0 @@
pub fn flatten_result<T, E>(res: Result<Result<T, E>, E>) -> Result<T, E> {
match res {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(e),
Err(e) => Err(e),
}
}