Add WIP websocket to controller and refactor settings

This commit is contained in:
Tobias Reisinger 2023-11-23 16:00:24 +01:00
parent 452454f9e8
commit 32c75ad73a
Signed by: serguzim
GPG key ID: 13AD60C237A28DFE
11 changed files with 101 additions and 33 deletions

BIN
Cargo.lock generated

Binary file not shown.

11
Makefile Normal file
View file

@ -0,0 +1,11 @@
build:
cargo build
sqlx: build
cargo sqlx database create
cargo sqlx migrate run
cargo sqlx prepare
build-rpi:
cross build --target arm-unknown-linux-gnueabihf

9
emgauwa-controller.toml Normal file
View file

@ -0,0 +1,9 @@
database = "sqlite://emgauwa-controller.sqlite"
[core]
port = 4419
host = "127.0.0.1"
[logging]
level = "DEBUG"
file = "stdout"

View file

@ -7,8 +7,7 @@ authors = ["Tobias Reisinger <tobias@msrg.cc>"]
[dependencies] [dependencies]
emgauwa-lib = { path = "../emgauwa-lib" } emgauwa-lib = { path = "../emgauwa-lib" }
config = "0.13" tokio = { version = "1.34", features = ["io-std", "macros", "rt-multi-thread"] }
tokio-tungstenite = "0.20" tokio-tungstenite = "0.20"
simple_logger = "4.2" simple_logger = "4.2"
@ -21,4 +20,4 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
futures = "0.3.29" futures = "0.3"

View file

@ -1,5 +1,53 @@
use std::str;
use futures::{future, pin_mut, StreamExt};
use futures::channel::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
mod settings; mod settings;
fn main() { #[tokio::main]
let _settings = settings::init(); async fn main() {
let settings = settings::init();
let url = format!(
"ws://{}:{}/api/v1/ws/controllers",
settings.core.host,
settings.core.port
);
let (stdin_tx, stdin_rx) = mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");
let (write, read) = ws_stream.split();
let stdin_to_ws = stdin_rx.map(Ok).forward(write);
let ws_to_stdout = {
read.for_each(|message| async {
let data = message.unwrap().into_text().unwrap();
println!("{}", data);
})
};
pin_mut!(stdin_to_ws, ws_to_stdout);
future::select(stdin_to_ws, ws_to_stdout).await;
} }
// Our helper method which will read data from stdin and send it along the
// sender provided.
async fn read_stdin(tx: mpsc::UnboundedSender<Message>) {
let mut stdin = tokio::io::stdin();
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf).await {
Err(_) | Ok(0) => break,
Ok(n) => n,
};
buf.truncate(n);
tx.unbounded_send(Message::text(str::from_utf8(&buf).unwrap())).unwrap();
}
}

View file

@ -1,5 +1,4 @@
use config::Config; use emgauwa_lib::{constants, utils};
use emgauwa_lib::constants;
use serde_derive::Deserialize; use serde_derive::Deserialize;
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
@ -22,6 +21,7 @@ pub struct Logging {
#[serde(default)] #[serde(default)]
#[allow(unused)] #[allow(unused)]
pub struct Settings { pub struct Settings {
pub core: Core,
pub database: String, pub database: String,
pub logging: Logging, pub logging: Logging,
} }
@ -29,6 +29,7 @@ pub struct Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
core: Core::default(),
database: String::from("sqlite://emgauwa-controller.sqlite"), database: String::from("sqlite://emgauwa-controller.sqlite"),
logging: Logging::default(), logging: Logging::default(),
} }
@ -54,15 +55,5 @@ impl Default for Logging {
} }
pub fn init() -> Settings { pub fn init() -> Settings {
Config::builder() utils::load_settings("controller", "CONTROLLER")
.add_source(config::File::with_name("emgauwa-controller"))
.add_source(
config::Environment::with_prefix("EMGAUWA_CONTROLLER")
.prefix_separator("_")
.separator("__"),
)
.build()
.unwrap()
.try_deserialize::<Settings>()
.unwrap_or_else(|_| panic!("Error reading settings."))
} }

View file

@ -11,8 +11,6 @@ actix = "0.13"
actix-web = "4.4" actix-web = "4.4"
actix-web-actors = "4.2" actix-web-actors = "4.2"
config = "0.13"
simple_logger = "4.2" simple_logger = "4.2"
log = "0.4" log = "0.4"

View file

@ -1,5 +1,4 @@
use config::Config; use emgauwa_lib::{constants, utils};
use emgauwa_lib::constants;
use serde_derive::Deserialize; use serde_derive::Deserialize;
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
@ -41,15 +40,5 @@ impl Default for Logging {
} }
pub fn init() -> Settings { pub fn init() -> Settings {
Config::builder() utils::load_settings("core", "CORE")
.add_source(config::File::with_name("emgauwa-core"))
.add_source(
config::Environment::with_prefix("EMGAUWA_CORE")
.prefix_separator("__")
.separator("__"),
)
.build()
.unwrap()
.try_deserialize::<Settings>()
.expect("Error reading settings.")
} }

View file

@ -14,6 +14,8 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
config = "0.13"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-async-std", "macros", "chrono"] } sqlx = { version = "0.7", features = ["sqlite", "runtime-async-std", "macros", "chrono"] }

View file

@ -25,6 +25,7 @@ impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
let schedules = futures::executor::block_on(get_schedules(&mut self.pool)).unwrap(); let schedules = futures::executor::block_on(get_schedules(&mut self.pool)).unwrap();
let schedules_json = serde_json::to_string(&schedules).unwrap(); let schedules_json = serde_json::to_string(&schedules).unwrap();
println!("{:?}", msg);
match msg { match msg {
Ok(Message::Ping(msg)) => ctx.pong(&msg), Ok(Message::Ping(msg)) => ctx.pong(&msg),
Ok(Message::Text(_text)) => ctx.text(schedules_json), Ok(Message::Text(_text)) => ctx.text(schedules_json),

View file

@ -1,3 +1,23 @@
pub fn vec_has_error<T, E>(target: &[Result<T, E>]) -> bool { pub fn vec_has_error<T, E>(target: &[Result<T, E>]) -> bool {
target.iter().any(|t| t.is_err()) target.iter().any(|t| t.is_err())
} }
pub fn load_settings<T>(config_name: &str, env_prefix: &str) -> T
where
for<'de> T: serde::Deserialize<'de>
{
let default_file = config::File::with_name(&format!("emgauwa-{}", config_name))
.required(false);
config::Config::builder()
.add_source(default_file)
.add_source(
config::Environment::with_prefix(&format!("EMGAUWA_{}", env_prefix))
.prefix_separator("__")
.separator("__"),
)
.build()
.expect("Error building settings")
.try_deserialize::<T>()
.expect("Error reading settings")
}