core/src/handlers/v1/ws/relays/mod.rs

108 lines
2.7 KiB
Rust

use std::time::Instant;
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler};
use actix_web_actors::ws;
use actix_web_actors::ws::ProtocolError;
use emgauwa_common::constants::{HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT};
use emgauwa_common::errors::EmgauwaError;
use futures::executor::block_on;
use crate::app_state::{AppState, ConnectRelayClient};
/// WebSocket actor that streams relay data to a single connected client.
///
/// The actor runs inside a `ws::WebsocketContext`. When it starts it sends the
/// current relays as JSON text, begins heartbeat checks and registers its own
/// address with the `AppState` actor.
pub struct RelaysWs {
/// Address of the `AppState` actor; used to request the relays and to
/// register this connection via `ConnectRelayClient`.
pub app_state: Addr<AppState>,
/// Moment the last ping or pong was received from the client. The heartbeat
/// check stops the actor once this is older than `HEARTBEAT_TIMEOUT`.
pub hb: Instant,
}
/// Actor message asking a `RelaysWs` actor to push a payload to its client.
///
/// The `RelaysWs` handler writes `relays_json` to the websocket as a text
/// frame without modification; the message has no reply value (`()`).
#[derive(Message)]
#[rtype(result = "()")]
pub struct SendRelays {
/// Text to send to the client.
/// NOTE(review): presumably the JSON-serialized relays, as the name
/// suggests — confirm against the sender.
pub relays_json: String,
}
impl Actor for RelaysWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// get unique id for ctx
match self.get_relays_json() {
Ok(relays_json) => {
ctx.text(relays_json);
self.hb(ctx);
block_on(self.app_state.send(ConnectRelayClient {
addr: ctx.address(),
}))
.unwrap();
}
Err(err) => {
log::error!("Error getting relays: {:?}", err);
ctx.stop();
}
}
}
}
impl RelaysWs {
fn get_relays_json(&self) -> Result<String, EmgauwaError> {
let relays = block_on(self.app_state.send(crate::app_state::GetRelays {}))??;
serde_json::to_string(&relays).map_err(EmgauwaError::from)
}
// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > HEARTBEAT_TIMEOUT {
log::debug!("Websocket Relay heartbeat failed, disconnecting!");
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(&[]);
});
}
}
impl StreamHandler<Result<ws::Message, ProtocolError>> for RelaysWs {
    /// Handles one incoming websocket frame from the client.
    ///
    /// * Ping / Pong refresh the heartbeat timestamp (a ping is answered with a
    ///   pong carrying the same payload).
    /// * Text and Binary frames are not expected here and are only logged.
    /// * Close echoes the close reason and stops the actor.
    /// * Continuation frames and protocol errors stop the actor.
    /// * Nop is ignored.
    fn handle(&mut self, msg: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(payload)) => {
                log::trace!("Received ping from relay-client: {:?}", payload);
                self.hb = Instant::now();
                ctx.pong(&payload);
            }
            Ok(ws::Message::Pong(_)) => {
                log::trace!("Received pong from relay-client");
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(_)) => {
                log::debug!("Received unexpected text in relays ws");
            }
            Ok(ws::Message::Binary(_)) => {
                log::debug!("Received unexpected binary in relays ws");
            }
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            // Neither fragmented messages nor protocol errors are handled:
            // the connection is simply terminated.
            Ok(ws::Message::Continuation(_)) | Err(_) => ctx.stop(),
            Ok(ws::Message::Nop) => (),
        }
    }
}
impl Handler<SendRelays> for RelaysWs {
type Result = ();
fn handle(&mut self, msg: SendRelays, ctx: &mut Self::Context) -> Self::Result {
ctx.text(msg.relays_json);
}
}