Refactor more stuff
This commit is contained in:
		
							parent
							
								
									5a7b2de0ea
								
							
						
					
					
						commit
						9394a1ae52
					
				
					 15 changed files with 167 additions and 86 deletions
				
			
		| 
						 | 
				
			
			@ -1,36 +1,42 @@
 | 
			
		|||
use std::collections::HashMap;
 | 
			
		||||
use std::sync::{Arc, Mutex};
 | 
			
		||||
 | 
			
		||||
use actix::{Actor, Context, Handler, Message, Recipient};
 | 
			
		||||
use emgauwa_lib::errors::DatabaseError;
 | 
			
		||||
use emgauwa_lib::errors::EmgauwaError;
 | 
			
		||||
use emgauwa_lib::models::Controller;
 | 
			
		||||
use emgauwa_lib::types::{ControllerUid, ControllerWsAction};
 | 
			
		||||
use futures::executor::block_on;
 | 
			
		||||
use sqlx::{Pool, Sqlite};
 | 
			
		||||
 | 
			
		||||
#[derive(Message)]
 | 
			
		||||
#[rtype(result = "Result<(), DatabaseError>")]
 | 
			
		||||
#[rtype(result = "Result<(), EmgauwaError>")]
 | 
			
		||||
pub struct DisconnectController {
 | 
			
		||||
	pub controller_uid: ControllerUid,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Message)]
 | 
			
		||||
#[rtype(result = "Result<(), DatabaseError>")]
 | 
			
		||||
#[rtype(result = "Result<(), EmgauwaError>")]
 | 
			
		||||
pub struct ConnectController {
 | 
			
		||||
	pub address: Recipient<ControllerWsAction>,
 | 
			
		||||
	pub controller: Controller,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Message)]
 | 
			
		||||
#[rtype(result = "Result<(), EmgauwaError>")]
 | 
			
		||||
pub struct Action {
 | 
			
		||||
	pub controller_uid: ControllerUid,
 | 
			
		||||
	pub action: ControllerWsAction,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct AppServer {
 | 
			
		||||
	pub pool: Pool<Sqlite>,
 | 
			
		||||
	pub connected_controllers: Arc<Mutex<HashMap<ControllerUid, Controller>>>,
 | 
			
		||||
	pub connected_controllers: HashMap<ControllerUid, (Controller, Recipient<ControllerWsAction>)>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl AppServer {
 | 
			
		||||
	pub fn new(pool: Pool<Sqlite>) -> AppServer {
 | 
			
		||||
		AppServer {
 | 
			
		||||
			pool,
 | 
			
		||||
			connected_controllers: Arc::new(Mutex::new(HashMap::new())),
 | 
			
		||||
			connected_controllers: HashMap::new(),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -40,13 +46,12 @@ impl Actor for AppServer {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl Handler<DisconnectController> for AppServer {
 | 
			
		||||
	type Result = Result<(), DatabaseError>;
 | 
			
		||||
	type Result = Result<(), EmgauwaError>;
 | 
			
		||||
 | 
			
		||||
	fn handle(&mut self, msg: DisconnectController, _ctx: &mut Self::Context) -> Self::Result {
 | 
			
		||||
		let mut pool_conn = block_on(self.pool.acquire()).unwrap();
 | 
			
		||||
		let mut data = self.connected_controllers.lock().unwrap();
 | 
			
		||||
		let mut pool_conn = block_on(self.pool.acquire())?;
 | 
			
		||||
 | 
			
		||||
		if let Some(controller) = data.remove(&msg.controller_uid) {
 | 
			
		||||
		if let Some((controller, _)) = self.connected_controllers.remove(&msg.controller_uid) {
 | 
			
		||||
			if let Err(err) = block_on(controller.c.update_active(&mut pool_conn, false)) {
 | 
			
		||||
				log::error!(
 | 
			
		||||
					"Failed to mark controller {} as inactive: {:?}",
 | 
			
		||||
| 
						 | 
				
			
			@ -60,12 +65,24 @@ impl Handler<DisconnectController> for AppServer {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl Handler<ConnectController> for AppServer {
 | 
			
		||||
	type Result = Result<(), DatabaseError>;
 | 
			
		||||
	type Result = Result<(), EmgauwaError>;
 | 
			
		||||
 | 
			
		||||
	fn handle(&mut self, msg: ConnectController, _ctx: &mut Self::Context) -> Self::Result {
 | 
			
		||||
		let mut data = self.connected_controllers.lock().unwrap();
 | 
			
		||||
		data.insert(msg.controller.c.uid.clone(), msg.controller);
 | 
			
		||||
		self.connected_controllers
 | 
			
		||||
			.insert(msg.controller.c.uid.clone(), (msg.controller, msg.address));
 | 
			
		||||
 | 
			
		||||
		Ok(())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Handler<Action> for AppServer {
 | 
			
		||||
	type Result = Result<(), EmgauwaError>;
 | 
			
		||||
 | 
			
		||||
	fn handle(&mut self, msg: Action, _ctx: &mut Self::Context) -> Self::Result {
 | 
			
		||||
		if let Some((_, address)) = self.connected_controllers.get(&msg.controller_uid) {
 | 
			
		||||
			block_on(address.send(msg.action))?
 | 
			
		||||
		} else {
 | 
			
		||||
			Err(EmgauwaError::Connection(msg.controller_uid))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,11 +1,15 @@
 | 
			
		|||
use actix::Addr;
 | 
			
		||||
use actix_web::{get, put, web, HttpResponse};
 | 
			
		||||
use emgauwa_lib::db::{DbController, DbRelay, DbTag};
 | 
			
		||||
use emgauwa_lib::errors::{DatabaseError, EmgauwaError};
 | 
			
		||||
use emgauwa_lib::models::{convert_db_list, FromDbModel, Relay};
 | 
			
		||||
use emgauwa_lib::types::ControllerUid;
 | 
			
		||||
use emgauwa_lib::types::{ControllerUid, ControllerWsAction};
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
use sqlx::{Pool, Sqlite};
 | 
			
		||||
 | 
			
		||||
use crate::app_state;
 | 
			
		||||
use crate::app_state::AppServer;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Serialize, Deserialize)]
 | 
			
		||||
pub struct RequestRelay {
 | 
			
		||||
	name: String,
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +68,7 @@ pub async fn index_for_controller(
 | 
			
		|||
#[get("/api/v1/controllers/{controller_id}/relays/{relay_num}")]
 | 
			
		||||
pub async fn show_for_controller(
 | 
			
		||||
	pool: web::Data<Pool<Sqlite>>,
 | 
			
		||||
	app_server: web::Data<Addr<AppServer>>,
 | 
			
		||||
	path: web::Path<(String, i64)>,
 | 
			
		||||
) -> Result<HttpResponse, EmgauwaError> {
 | 
			
		||||
	let mut pool_conn = pool.acquire().await?;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
use actix::{Actor, AsyncContext};
 | 
			
		||||
use emgauwa_lib::db::{DbController, DbJunctionRelaySchedule, DbRelay, DbSchedule};
 | 
			
		||||
use emgauwa_lib::errors::DatabaseError;
 | 
			
		||||
use emgauwa_lib::errors::{DatabaseError, EmgauwaError};
 | 
			
		||||
use emgauwa_lib::models::{Controller, FromDbModel};
 | 
			
		||||
use futures::executor::block_on;
 | 
			
		||||
use sqlx::pool::PoolConnection;
 | 
			
		||||
| 
						 | 
				
			
			@ -15,7 +15,7 @@ impl ControllerWs {
 | 
			
		|||
		conn: &mut PoolConnection<Sqlite>,
 | 
			
		||||
		ctx: &mut <ControllerWs as Actor>::Context,
 | 
			
		||||
		controller: Controller,
 | 
			
		||||
	) -> Result<(), DatabaseError> {
 | 
			
		||||
	) -> Result<(), EmgauwaError> {
 | 
			
		||||
		log::info!("Registering controller: {:?}", controller);
 | 
			
		||||
		let c = &controller.c;
 | 
			
		||||
		let controller_db = block_on(DbController::get_by_uid_or_create(
 | 
			
		||||
| 
						 | 
				
			
			@ -60,10 +60,10 @@ impl ControllerWs {
 | 
			
		|||
 | 
			
		||||
		let addr = ctx.address();
 | 
			
		||||
		self.controller_uid = Some(controller_uid.clone());
 | 
			
		||||
		self.app_server.do_send(ConnectController {
 | 
			
		||||
		block_on(self.app_server.send(ConnectController {
 | 
			
		||||
			address: addr.recipient(),
 | 
			
		||||
			controller,
 | 
			
		||||
		});
 | 
			
		||||
		}))??;
 | 
			
		||||
 | 
			
		||||
		Ok(())
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,7 +6,7 @@ use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
 | 
			
		|||
use actix_web_actors::ws;
 | 
			
		||||
use actix_web_actors::ws::ProtocolError;
 | 
			
		||||
use emgauwa_lib::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
 | 
			
		||||
use emgauwa_lib::errors::{DatabaseError, EmgauwaError};
 | 
			
		||||
use emgauwa_lib::errors::EmgauwaError;
 | 
			
		||||
use emgauwa_lib::types::{ControllerUid, ControllerWsAction};
 | 
			
		||||
use futures::executor::block_on;
 | 
			
		||||
use sqlx::pool::PoolConnection;
 | 
			
		||||
| 
						 | 
				
			
			@ -14,6 +14,7 @@ use sqlx::{Pool, Sqlite};
 | 
			
		|||
use ws::Message;
 | 
			
		||||
 | 
			
		||||
use crate::app_state::{AppServer, DisconnectController};
 | 
			
		||||
use crate::utils::flatten_result;
 | 
			
		||||
 | 
			
		||||
pub struct ControllerWs {
 | 
			
		||||
	pub pool: Pool<Sqlite>,
 | 
			
		||||
| 
						 | 
				
			
			@ -31,9 +32,15 @@ impl Actor for ControllerWs {
 | 
			
		|||
 | 
			
		||||
	fn stopped(&mut self, _ctx: &mut Self::Context) {
 | 
			
		||||
		if let Some(controller_uid) = &self.controller_uid {
 | 
			
		||||
			self.app_server.do_send(DisconnectController {
 | 
			
		||||
				controller_uid: controller_uid.clone(),
 | 
			
		||||
			})
 | 
			
		||||
			let flat_res = flatten_result(
 | 
			
		||||
				block_on(self.app_server.send(DisconnectController {
 | 
			
		||||
					controller_uid: controller_uid.clone(),
 | 
			
		||||
				}))
 | 
			
		||||
				.map_err(EmgauwaError::from),
 | 
			
		||||
			);
 | 
			
		||||
			if let Err(err) = flat_res {
 | 
			
		||||
				log::error!("Error disconnecting controller: {:?}", err);
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -44,9 +51,10 @@ impl ControllerWs {
 | 
			
		|||
		conn: &mut PoolConnection<Sqlite>,
 | 
			
		||||
		ctx: &mut <ControllerWs as Actor>::Context,
 | 
			
		||||
		action: ControllerWsAction,
 | 
			
		||||
	) -> Result<(), DatabaseError> {
 | 
			
		||||
	) -> Result<(), EmgauwaError> {
 | 
			
		||||
		match action {
 | 
			
		||||
			ControllerWsAction::Register(controller) => self.handle_register(conn, ctx, controller),
 | 
			
		||||
			_ => Ok(()),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -78,7 +86,14 @@ impl Handler<ControllerWsAction> for ControllerWs {
 | 
			
		|||
 | 
			
		||||
impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
 | 
			
		||||
	fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
 | 
			
		||||
		let mut pool_conn = block_on(self.pool.acquire()).unwrap();
 | 
			
		||||
		let mut pool_conn = match block_on(self.pool.acquire()) {
 | 
			
		||||
			Ok(conn) => conn,
 | 
			
		||||
			Err(err) => {
 | 
			
		||||
				log::error!("Failed to acquire database connection: {:?}", err);
 | 
			
		||||
				ctx.stop();
 | 
			
		||||
				return;
 | 
			
		||||
			}
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		let msg = match msg {
 | 
			
		||||
			Err(_) => {
 | 
			
		||||
| 
						 | 
				
			
			@ -96,14 +111,22 @@ impl StreamHandler<Result<Message, ProtocolError>> for ControllerWs {
 | 
			
		|||
			Message::Pong(_) => {
 | 
			
		||||
				self.hb = Instant::now();
 | 
			
		||||
			}
 | 
			
		||||
			Message::Text(text) => {
 | 
			
		||||
				let action: ControllerWsAction = serde_json::from_str(&text).unwrap();
 | 
			
		||||
				let action_res = self.handle_action(&mut pool_conn, ctx, action);
 | 
			
		||||
				if let Err(e) = action_res {
 | 
			
		||||
					log::error!("Error handling action: {:?}", e);
 | 
			
		||||
					ctx.text(serde_json::to_string(&e).unwrap());
 | 
			
		||||
			Message::Text(text) => match serde_json::from_str(&text) {
 | 
			
		||||
				Ok(action) => {
 | 
			
		||||
					let action_res = self.handle_action(&mut pool_conn, ctx, action);
 | 
			
		||||
					if let Err(e) = action_res {
 | 
			
		||||
						log::error!("Error handling action: {:?}", e);
 | 
			
		||||
						ctx.text(serde_json::to_string(&e).expect("Failed to serialize error"));
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
				Err(e) => {
 | 
			
		||||
					log::error!("Error deserializing action: {:?}", e);
 | 
			
		||||
					ctx.text(
 | 
			
		||||
						serde_json::to_string(&EmgauwaError::Serialization(e))
 | 
			
		||||
							.expect("Failed to serialize error"),
 | 
			
		||||
					);
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
			Message::Binary(_) => log::warn!("Received unexpected binary in controller ws"),
 | 
			
		||||
			Message::Close(reason) => {
 | 
			
		||||
				ctx.close(reason);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@ use std::time::Instant;
 | 
			
		|||
use actix::Addr;
 | 
			
		||||
use actix_web::{get, web, HttpRequest, HttpResponse};
 | 
			
		||||
use actix_web_actors::ws;
 | 
			
		||||
use emgauwa_lib::errors::{ApiError, EmgauwaError};
 | 
			
		||||
use emgauwa_lib::errors::EmgauwaError;
 | 
			
		||||
use sqlx::{Pool, Sqlite};
 | 
			
		||||
 | 
			
		||||
use crate::app_state::AppServer;
 | 
			
		||||
| 
						 | 
				
			
			@ -28,10 +28,6 @@ pub async fn ws_controllers(
 | 
			
		|||
		&req,
 | 
			
		||||
		stream,
 | 
			
		||||
	)
 | 
			
		||||
	.map_err(|_| {
 | 
			
		||||
		EmgauwaError::from(ApiError::InternalError(String::from(
 | 
			
		||||
			"error starting websocket",
 | 
			
		||||
		)))
 | 
			
		||||
	});
 | 
			
		||||
	.map_err(|_| EmgauwaError::Internal(String::from("error starting websocket")));
 | 
			
		||||
	resp
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,6 +3,14 @@ use std::io::{Error, ErrorKind};
 | 
			
		|||
 | 
			
		||||
use crate::settings::Settings;
 | 
			
		||||
 | 
			
		||||
pub fn flatten_result<T, E>(res: Result<Result<T, E>, E>) -> Result<T, E> {
 | 
			
		||||
	match res {
 | 
			
		||||
		Ok(Ok(t)) => Ok(t),
 | 
			
		||||
		Ok(Err(e)) => Err(e),
 | 
			
		||||
		Err(e) => Err(e),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// https://blog.lxsang.me/post/id/28.0
 | 
			
		||||
pub fn drop_privileges(settings: &Settings) -> Result<(), Error> {
 | 
			
		||||
	log::info!(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue