use std::time::Instant; use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler}; use actix_web_actors::ws; use actix_web_actors::ws::ProtocolError; use emgauwa_common::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}; use emgauwa_common::errors::EmgauwaError; use futures::executor::block_on; use crate::app_state::{AppState, ConnectRelayClient}; pub struct RelaysWs { pub app_state: Addr<AppState>, pub hb: Instant, } #[derive(Message)] #[rtype(result = "()")] pub struct SendRelays { pub relays_json: String, } impl Actor for RelaysWs { type Context = ws::WebsocketContext<Self>; fn started(&mut self, ctx: &mut Self::Context) { // get unique id for ctx match self.get_relays_json() { Ok(relays_json) => { ctx.text(relays_json); self.hb(ctx); block_on(self.app_state.send(ConnectRelayClient { addr: ctx.address(), })).map_err( |err| log::error!("Error connecting relay-client: {:?}", err) ).ok(); } Err(err) => { log::error!("Error getting relays: {:?}", err); ctx.stop(); } } } } impl RelaysWs { fn get_relays_json(&self) -> Result<String, EmgauwaError> { let relays = block_on(self.app_state.send(crate::app_state::GetRelays {}))??; serde_json::to_string(&relays).map_err(EmgauwaError::from) } // helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL). fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > HEARTBEAT_TIMEOUT { log::debug!("Websocket Relay heartbeat failed, disconnecting!"); ctx.stop(); // don't try to send a ping return; } ctx.ping(&[]); }); } } impl StreamHandler<Result<ws::Message, ProtocolError>> for RelaysWs { fn handle(&mut self, msg: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) { let msg = match msg { Err(_) => { ctx.stop(); return; } Ok(msg) => msg, }; match msg { ws::Message::Ping(msg) => { log::trace!("Received ping from relay-client: {:?}", msg); self.hb = Instant::now(); ctx.pong(&msg) } ws::Message::Pong(_) => { log::trace!("Received pong from relay-client"); self.hb = Instant::now(); } ws::Message::Text(_) => log::debug!("Received unexpected text in relays ws"), ws::Message::Binary(_) => log::debug!("Received unexpected binary in relays ws"), ws::Message::Close(reason) => { ctx.close(reason); ctx.stop(); } ws::Message::Continuation(_) => { ctx.stop(); } ws::Message::Nop => (), } } } impl Handler<SendRelays> for RelaysWs { type Result = (); fn handle(&mut self, msg: SendRelays, ctx: &mut Self::Context) -> Self::Result { ctx.text(msg.relays_json); } }