use std::str; use crate::relay_loop::run_relay_loop; use crate::settings::Settings; use emgauwa_lib::db::{DbController, DbRelay}; use emgauwa_lib::handlers::v1::ws::controllers::ControllerWsAction; use emgauwa_lib::models::convert_db_list; use emgauwa_lib::types::ControllerUid; use emgauwa_lib::{db, models}; use futures::channel::mpsc; use futures::{future, pin_mut, SinkExt, StreamExt}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use tokio::io::AsyncReadExt; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; mod driver; mod relay_loop; mod settings; async fn create_this_controller( conn: &mut PoolConnection, settings: &Settings, ) -> DbController { DbController::create( conn, &ControllerUid::default(), &settings.name, i64::try_from(settings.relays.len()).expect("Too many relays"), true, ) .await .expect("Failed to create controller") } async fn create_this_relay( conn: &mut PoolConnection, this_controller: &DbController, settings_relay: &settings::Relay, ) -> DbRelay { DbRelay::create( conn, &settings_relay.name, settings_relay.number.unwrap(), this_controller, ) .await .expect("Failed to create relay") } #[tokio::main] async fn main() { let settings = settings::init(); let pool = db::init(&settings.database).await; let mut conn = pool.acquire().await.unwrap(); let db_controller = DbController::get_all(&mut conn) .await .expect("Failed to get controller from database") .pop() .unwrap_or_else(|| { futures::executor::block_on(create_this_controller(&mut conn, &settings)) }); let db_relays: Vec = settings .relays .iter() .map(|relay| { futures::executor::block_on(async { match DbRelay::get_by_controller_and_num( &mut conn, &db_controller, relay.number.unwrap(), ) .await .expect("Failed to get relay from database") { None => create_this_relay(&mut conn, &db_controller, relay).await, Some(relay) => relay, } }) }) .collect(); let db_controller = db_controller .update(&mut conn, &db_controller.name, db_relays.len() as i64, true) .await .unwrap(); let relays = convert_db_list(&mut conn, db_relays).expect("Failed to convert relays"); let this = models::Controller { controller: db_controller, relays, }; let url = format!( "ws://{}:{}/api/v1/ws/controllers", settings.core.host, settings.core.port ); let (stdin_tx, stdin_rx) = mpsc::unbounded(); tokio::spawn(read_stdin(stdin_tx)); let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); let (mut write, read) = ws_stream.split(); let ws_action = ControllerWsAction::Register(this); println!("Sending action: {:?}", ws_action); let ws_action_json = serde_json::to_string(&ws_action).unwrap(); println!("Sending json: {}", ws_action_json); write.send(Message::text(ws_action_json)).await.unwrap(); let ws_to_stdout = read.for_each(handle_message); let stdin_to_ws = stdin_rx.map(Ok).forward(write); tokio::spawn(run_relay_loop(settings)); pin_mut!(stdin_to_ws, ws_to_stdout); future::select(stdin_to_ws, ws_to_stdout).await; } // Our helper method which will read data from stdin and send it along the // sender provided. async fn read_stdin(tx: mpsc::UnboundedSender) { let mut stdin = tokio::io::stdin(); loop { let mut buf = vec![0; 1024]; let n = match stdin.read(&mut buf).await { Err(_) | Ok(0) => break, Ok(n) => n, }; buf.truncate(n); tx.unbounded_send(Message::text(str::from_utf8(&buf).unwrap())) .unwrap(); } } pub async fn handle_message(message_result: Result) { match message_result { Ok(message) => println!("{}", message.into_text().unwrap()), Err(err) => println!("Error: {}", err), } }