add: status for mqtt

fix: refactor connection handlers
This commit is contained in:
Tobias Reisinger 2020-06-26 01:01:46 +02:00
parent 2bc11ee829
commit 6c6e5023da
19 changed files with 534 additions and 183 deletions

View file

@ -1,6 +1,6 @@
cmake_minimum_required (VERSION 3.7) cmake_minimum_required (VERSION 3.7)
project(core project(core
VERSION 0.1.3 VERSION 0.2.0
LANGUAGES C) LANGUAGES C)
add_executable(core src/main.c) add_executable(core src/main.c)
@ -9,7 +9,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99 -Wpedantic -Werror -Wall -Wextra
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g -fprofile-arcs -ftest-coverage") set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g -fprofile-arcs -ftest-coverage")
add_definitions("-DMG_ENABLE_EXTRA_ERRORS_DESC") add_definitions("-DMG_ENABLE_EXTRA_ERRORS_DESC -DMG_ENABLE_MQTT_BROKER")
aux_source_directory(src/ SRC_DIR) aux_source_directory(src/ SRC_DIR)
aux_source_directory(src/models MODELS_SRC) aux_source_directory(src/models MODELS_SRC)

View file

@ -6,6 +6,9 @@ not-found-file-mime = text/html
not-found-content = 404 - NOT FOUND not-found-content = 404 - NOT FOUND
not-found-content-type = text/plain not-found-content-type = text/plain
: 4421 for dev-env; 4420 for testing-env; 4419 for prod-env; 4422 for testing : 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env
discovery-port = 4421 discovery-port = 4421
: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env
mqtt-port = 1885
log-level = debug log-level = debug

View file

@ -30,8 +30,9 @@ typedef struct
char group[256]; char group[256];
log_level_t log_level; log_level_t log_level;
run_type_t run_type; run_type_t run_type;
char server_port[6]; uint16_t server_port;
uint16_t discovery_port; uint16_t discovery_port;
uint16_t mqtt_port;
char not_found_file[256]; char not_found_file[256];
char not_found_file_type[256]; char not_found_file_type[256];
char not_found_content[256]; char not_found_content[256];

View file

@ -0,0 +1,10 @@
#ifndef CORE_ENDPOINTS_API_V1_WS_H
#define CORE_ENDPOINTS_API_V1_WS_H
#include <router.h>
void
api_v1_ws_relays(struct mg_connection *nc, struct http_message *hm, endpoint_args_t *args, endpoint_response_t *response);
#endif /* CORE_ENDPOINTS_API_V1_WS_H */

View file

@ -2,6 +2,9 @@
#define CORE_HANDLERS_H #define CORE_HANDLERS_H
void void
handler_connection(struct mg_connection *c, int ev, void *p); handler_http(struct mg_connection *nc, int ev, void *p);
void
handler_mqtt(struct mg_connection *nc, int ev, void *p);
#endif /* CORE_HANDLERS_H */ #endif /* CORE_HANDLERS_H */

View file

@ -17,6 +17,7 @@ typedef struct
int number; int number;
int controller_id; int controller_id;
int active_schedule_id; int active_schedule_id;
int is_on;
schedule_t *active_schedule; schedule_t *active_schedule;
schedule_t *schedules[7]; schedule_t *schedules[7];
} relay_t; } relay_t;
@ -24,14 +25,14 @@ typedef struct
int int
relay_save(); relay_save();
int
relay_remove();
void void
relay_reload_active_schedule(relay_t *relay); relay_reload_active_schedule(relay_t *relay);
cJSON* cJSON*
relay_to_json(); relay_to_json(relay_t *relay);
cJSON*
relay_list_to_json(relay_t **relays);
void void
relay_free(relay_t *relay); relay_free(relay_t *relay);
@ -39,9 +40,6 @@ relay_free(relay_t *relay);
void void
relay_free_list(relay_t **relays_list); relay_free_list(relay_t **relays_list);
relay_t**
relay_get_by_simple(const char *key, const void *value, intptr_t bind_func, int bind_func_param);
relay_t* relay_t*
relay_get_by_id(int id); relay_get_by_id(int id);

View file

@ -12,7 +12,9 @@ typedef enum
HTTP_METHOD_POST = (1 << 1), HTTP_METHOD_POST = (1 << 1),
HTTP_METHOD_PUT = (1 << 2), HTTP_METHOD_PUT = (1 << 2),
HTTP_METHOD_DELETE = (1 << 3), HTTP_METHOD_DELETE = (1 << 3),
HTTP_METHOD_OPTIONS = (1 << 4) HTTP_METHOD_OPTIONS = (1 << 4),
HTTP_METHOD_WEBSOCKET = (1 << 5)
} http_method_e; } http_method_e;
endpoint_t* endpoint_t*

26
include/status.h Normal file
View file

@ -0,0 +1,26 @@
#ifndef CORE_STATUS_H
#define CORE_STATUS_H
#include <models/controller.h>
extern relay_t **global_relay_status_list;
void
status_init();
void
status_reload_entry(int relay_id);
void
status_update_entry(int relay_id, int is_on);
void
status_broadcast(struct mg_mgr *mgr);
void
status_send(struct mg_connection *c);
void
status_free();
#endif /* CORE_STATUS_H */

View file

@ -54,11 +54,6 @@ config_load(IniDispatch *disp, void *config_void)
if(disp->type == INI_KEY) if(disp->type == INI_KEY)
{ {
if(CONFINI_IS_KEY("core", "server-port"))
{
strcpy(config->server_port, disp->value);
return 0;
}
if(CONFINI_IS_KEY("core", "database")) if(CONFINI_IS_KEY("core", "database"))
{ {
strcpy(config->database, disp->value); strcpy(config->database, disp->value);
@ -98,11 +93,21 @@ config_load(IniDispatch *disp, void *config_void)
{ {
return config_load_log_level(disp, config); return config_load_log_level(disp, config);
} }
if(CONFINI_IS_KEY("core", "server-port"))
{
config->server_port = atoi(disp->value);
return 0;
}
if(CONFINI_IS_KEY("core", "discovery-port")) if(CONFINI_IS_KEY("core", "discovery-port"))
{ {
config->discovery_port = atoi(disp->value); config->discovery_port = atoi(disp->value);
return 0; return 0;
} }
if(CONFINI_IS_KEY("core", "mqtt-port"))
{
config->mqtt_port = atoi(disp->value);
return 0;
}
} }
return 0; return 0;
} }

View file

@ -0,0 +1,11 @@
#include <endpoints/api_v1_ws.h>
#include <status.h>
void
api_v1_ws_relays(struct mg_connection *nc, struct http_message *hm, endpoint_args_t *args, endpoint_response_t *response)
{
(void)nc;
(void)hm;
(void)args;
(void)response;
}

View file

@ -1,134 +0,0 @@
#include <string.h>
#include <macros.h>
#include <constants.h>
#include <mongoose.h>
#include <logger.h>
#include <router.h>
#include <handlers.h>
#define HEADERS_FMT "Content-Type: %s"
// -2 for "%s" -1 for \0
#define HEADERS_FMT_LEN (sizeof(HEADERS_FMT) - 3)
static char*
add_extra_headers(char *extra_headers)
{
char *result;
size_t std_headers_len = strlen(global_config.http_server_opts.extra_headers);
if(extra_headers == NULL)
{
result = malloc(sizeof(char) * (std_headers_len + 1));
strcpy(result, global_config.http_server_opts.extra_headers);
return result;
}
result = malloc(sizeof(char) * (std_headers_len + strlen(extra_headers) + 3));
sprintf(result, "%s\r\n%s", global_config.http_server_opts.extra_headers, extra_headers);
return result;
}
static void
send_response(struct mg_connection *nc, endpoint_response_t *response)
{
if(response->status_code)
{
char *response_headers = malloc(sizeof(char) * (HEADERS_FMT_LEN + strlen(response->content_type) + 1));
sprintf(response_headers, HEADERS_FMT, response->content_type);
char *extra_headers = add_extra_headers(response_headers);
mg_send_head(nc, response->status_code, response->content_length, extra_headers);
mg_printf(nc, "%s", response->content);
free(response_headers);
free(extra_headers);
if(response->alloced_content)
{
free((char*)response->content);
}
}
}
void
handler_connection(struct mg_connection *nc, int ev, void *p)
{
if (ev == MG_EV_HTTP_REQUEST)
{
struct http_message *hm = (struct http_message *) p;
LOG_TRACE("new http %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p);
endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &hm->method);
endpoint_response_t response;
static const char content[] = "the server did not create a response";
endpoint_response_text(&response, 500, content, STRLEN(content));
if(!endpoint)
{
/* Normalize path - resolve "." and ".." (in-place). */
if (!mg_normalize_uri_path(&hm->uri, &hm->uri)) {
mg_http_send_error(nc, 400, global_config.http_server_opts.extra_headers);
return;
}
char *request_file_org = malloc(sizeof(char) * hm->uri.len);
strncpy(request_file_org, hm->uri.p + 1, hm->uri.len);
request_file_org[hm->uri.len - 1] = '\0';
char *request_file = request_file_org;
while(request_file[0] == '/')
{
++request_file;
}
LOG_DEBUG("%s\n", request_file);
int access_result = access(request_file, R_OK);
free(request_file_org);
if(access_result != -1)
{
response.status_code = 0;
mg_serve_http(nc, hm, global_config.http_server_opts);
return;
}
else
{
endpoint = router_get_not_found_endpoint();
}
}
if(!endpoint->func)
{
if(endpoint->method == HTTP_METHOD_OPTIONS)
{
char options_header[256]; // TODO make more generic
sprintf(options_header, "Allow: OPTIONS%s%s%s%s",
endpoint->options & HTTP_METHOD_GET ? ", GET" : "",
endpoint->options & HTTP_METHOD_POST ? ", POST" : "",
endpoint->options & HTTP_METHOD_PUT ? ", PUT" : "",
endpoint->options & HTTP_METHOD_DELETE ? ", DELETE" : ""
);
char *extra_headers = add_extra_headers(options_header);
mg_send_head(nc, 204, 0, extra_headers);
free(extra_headers);
}
else
{
mg_send_head(nc, 501, 0, "Content-Type: text/plain");
}
}
else
{
endpoint->func(nc, hm, endpoint->args, &response);
send_response(nc, &response);
}
for(int i = 0; i < endpoint->args_count; ++i)
{
if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR)
{
free((char*)endpoint->args[i].value.v_str);
}
}
}
}

176
src/handlers/http.c Normal file
View file

@ -0,0 +1,176 @@
#include <string.h>
#include <status.h>
#include <macros.h>
#include <constants.h>
#include <mongoose.h>
#include <logger.h>
#include <router.h>
#include <handlers.h>
#define HEADERS_FMT "Content-Type: %s"
// -2 for "%s" -1 for \0
#define HEADERS_FMT_LEN (sizeof(HEADERS_FMT) - 3)
static char*
add_extra_headers(char *extra_headers)
{
char *result;
size_t std_headers_len = strlen(global_config.http_server_opts.extra_headers);
if(extra_headers == NULL)
{
result = malloc(sizeof(char) * (std_headers_len + 1));
strcpy(result, global_config.http_server_opts.extra_headers);
return result;
}
result = malloc(sizeof(char) * (std_headers_len + strlen(extra_headers) + 3));
sprintf(result, "%s\r\n%s", global_config.http_server_opts.extra_headers, extra_headers);
return result;
}
static void
send_response(struct mg_connection *nc, endpoint_response_t *response)
{
if(response->status_code)
{
char *response_headers = malloc(sizeof(char) * (HEADERS_FMT_LEN + strlen(response->content_type) + 1));
sprintf(response_headers, HEADERS_FMT, response->content_type);
char *extra_headers = add_extra_headers(response_headers);
mg_send_head(nc, response->status_code, response->content_length, extra_headers);
mg_printf(nc, "%s", response->content);
free(response_headers);
free(extra_headers);
if(response->alloced_content)
{
free((char*)response->content);
}
}
}
static void
handle_websocket_request(struct mg_connection *nc, struct http_message *hm)
{
LOG_TRACE("new websocket %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p);
struct mg_str method_websocket_str = mg_mk_str("WEBSOCKET");
endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &method_websocket_str);
if(!endpoint || !endpoint->func)
{
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
else
{
endpoint->func(nc, hm, endpoint->args, NULL);
for(int i = 0; i < endpoint->args_count; ++i)
{
if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR)
{
free((char*)endpoint->args[i].value.v_str);
}
}
}
}
static void
handle_http_request(struct mg_connection *nc, struct http_message *hm)
{
LOG_TRACE("new http %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p);
endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &hm->method);
endpoint_response_t response;
static const char content[] = "the server did not create a response";
endpoint_response_text(&response, 500, content, STRLEN(content));
if(!endpoint)
{
/* Normalize path - resolve "." and ".." (in-place). */
if (!mg_normalize_uri_path(&hm->uri, &hm->uri)) {
mg_http_send_error(nc, 400, global_config.http_server_opts.extra_headers);
return;
}
char *request_file_org = malloc(sizeof(char) * hm->uri.len);
strncpy(request_file_org, hm->uri.p + 1, hm->uri.len);
request_file_org[hm->uri.len - 1] = '\0';
char *request_file = request_file_org;
while(request_file[0] == '/')
{
++request_file;
}
LOG_DEBUG("%s\n", request_file);
int access_result = access(request_file, R_OK);
free(request_file_org);
if(access_result != -1)
{
response.status_code = 0;
mg_serve_http(nc, hm, global_config.http_server_opts);
return;
}
else
{
endpoint = router_get_not_found_endpoint();
}
}
if(!endpoint->func)
{
if(endpoint->method == HTTP_METHOD_OPTIONS)
{
char options_header[256]; // TODO make more generic
sprintf(options_header, "Allow: OPTIONS%s%s%s%s",
endpoint->options & HTTP_METHOD_GET ? ", GET" : "",
endpoint->options & HTTP_METHOD_POST ? ", POST" : "",
endpoint->options & HTTP_METHOD_PUT ? ", PUT" : "",
endpoint->options & HTTP_METHOD_DELETE ? ", DELETE" : ""
);
char *extra_headers = add_extra_headers(options_header);
mg_send_head(nc, 204, 0, extra_headers);
free(extra_headers);
}
else
{
mg_send_head(nc, 501, 0, "Content-Type: text/plain");
}
}
else
{
endpoint->func(nc, hm, endpoint->args, &response);
send_response(nc, &response);
}
for(int i = 0; i < endpoint->args_count; ++i)
{
if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR)
{
free((char*)endpoint->args[i].value.v_str);
}
}
}
void
handler_http(struct mg_connection *nc, int ev, void *p)
{
if(ev == MG_EV_WEBSOCKET_HANDSHAKE_REQUEST)
{
struct http_message *hm = (struct http_message*)p;
handle_websocket_request(nc, hm);
}
if(ev == MG_EV_WEBSOCKET_HANDSHAKE_DONE)
{
status_send(nc);
}
if(ev == MG_EV_HTTP_REQUEST)
{
struct http_message *hm = (struct http_message*)p;
handle_http_request(nc, hm);
}
}

95
src/handlers/mqtt.c Normal file
View file

@ -0,0 +1,95 @@
#include <string.h>
#include <errno.h>
#include <mongoose.h>
#include <logger.h>
#include <handlers.h>
#include <status.h>
#include <models/controller.h>
#include <models/relay.h>
static void
handle_mqtt_publish_controller(char **topic_save, int controller_id, char *payload)
{
(void)controller_id;
(void)payload;
char *topic_token = strtok_r(NULL, "/", topic_save);
if(!topic_token)
{
return;
}
if(strcmp(topic_token, "relay") == 0)
{
char *relay_num_str = strtok_r(NULL, "/", topic_save);
if(!relay_num_str)
{
return;
}
errno = 0;
int relay_num = strtol(relay_num_str, NULL, 10);
if(errno)
{
return;
}
relay_t *relay = relay_get_for_controller(controller_id, relay_num);
if(!relay)
{
return;
}
status_update_entry(relay->id, payload[0] == '1');
free(relay);
}
}
static void
handle_mqtt_publish(struct mg_mqtt_message *msg)
{
char *topic = malloc(sizeof(char) * (msg->topic.len + 1));
strncpy(topic, msg->topic.p, msg->topic.len);
topic[msg->topic.len] = '\0';
LOG_DEBUG("received mqtt publish for topic %s\n", topic);
char *payload = malloc(sizeof(char) * (msg->payload.len + 1));
strncpy(payload, msg->payload.p, msg->payload.len);
payload[msg->payload.len] = '\0';
char *topic_save_null = NULL;
char **topic_save = &topic_save_null;
char *topic_token = strtok_r(topic, "/", topic_save);
if(topic_token)
{
if(strcmp(topic_token, "controller") == 0)
{
char *controller_uid_str = strtok_r(NULL, "/", topic_save);
uuid_t controller_uid;
if(uuid_parse(controller_uid_str, controller_uid) == 0)
{
controller_t *controller = controller_get_by_uid(controller_uid);
if(controller)
{
handle_mqtt_publish_controller(topic_save, controller->id, payload);
controller_free(controller);
}
}
}
}
free(payload);
free(topic);
}
void
handler_mqtt(struct mg_connection *nc, int ev, void *p)
{
if(ev == MG_EV_POLL)
{
return;
}
mg_mqtt_broker(nc, ev, p);
if(ev == MG_EV_MQTT_PUBLISH)
{
struct mg_mqtt_message *msg = (struct mg_mqtt_message*)p;
handle_mqtt_publish(msg);
}
}

View file

@ -11,6 +11,7 @@
#include <handlers.h> #include <handlers.h>
#include <enums.h> #include <enums.h>
#include <helpers.h> #include <helpers.h>
#include <status.h>
#include <confini.h> #include <confini.h>
#include <models/controller.h> #include <models/controller.h>
@ -26,6 +27,7 @@ terminate(int signum)
sqlite3_close(global_database); sqlite3_close(global_database);
router_free(); router_free();
status_free();
exit(signum); exit(signum);
} }
@ -49,6 +51,11 @@ main(int argc, const char** argv)
/******************** LOAD CONFIG ********************/ /******************** LOAD CONFIG ********************/
global_config.file = "core.ini"; global_config.file = "core.ini";
global_config.log_level = LOG_LEVEL_INFO;
global_config.discovery_port = 4421;
global_config.mqtt_port = 1885;
global_config.server_port = 5000;
strcpy(global_config.user, ""); strcpy(global_config.user, "");
strcpy(global_config.group, ""); strcpy(global_config.group, "");
@ -56,7 +63,6 @@ main(int argc, const char** argv)
strcpy(global_config.not_found_file_type, "text/html"); strcpy(global_config.not_found_file_type, "text/html");
strcpy(global_config.not_found_content, "404 - NOT FOUND"); strcpy(global_config.not_found_content, "404 - NOT FOUND");
strcpy(global_config.not_found_content_type, "text/plain"); strcpy(global_config.not_found_content_type, "text/plain");
global_config.log_level = LOG_LEVEL_INFO;
helper_parse_cli(argc, argv, &global_config); helper_parse_cli(argc, argv, &global_config);
@ -82,18 +88,30 @@ main(int argc, const char** argv)
/******************** SETUP CONNECTION ********************/ /******************** SETUP CONNECTION ********************/
struct mg_connection *c; struct mg_mqtt_broker brk;
mg_mgr_init(&mgr, NULL); mg_mgr_init(&mgr, NULL);
c = mg_bind(&mgr, global_config.server_port, handler_connection);
if(c == NULL) char address[100];
sprintf(address, "tcp://0.0.0.0:%u", global_config.server_port);
struct mg_connection *c_http = mg_bind(&mgr, address, handler_http);
if(c_http == NULL)
{ {
LOG_FATAL("failed to bind to port %s\n", global_config.server_port); LOG_FATAL("failed to bind http server to port %u\n", global_config.server_port);
exit(1); exit(1);
} }
mg_set_protocol_http_websocket(c_http);
mg_set_protocol_http_websocket(c); sprintf(address, "tcp://0.0.0.0:%u", global_config.mqtt_port);
struct mg_connection *c_mqtt = mg_bind(&mgr, address, handler_mqtt);
if(c_mqtt == NULL)
{
LOG_FATAL("failed to bind mqtt server to port %u\n", global_config.mqtt_port);
exit(1);
}
mg_mqtt_broker_init(&brk, NULL);
c_mqtt->priv_2 = &brk;
mg_set_protocol_mqtt(c_mqtt);
helper_drop_privileges(); helper_drop_privileges();
@ -116,16 +134,24 @@ main(int argc, const char** argv)
sqlite3_exec(global_database, "PRAGMA foreign_keys = ON", 0, 0, 0); sqlite3_exec(global_database, "PRAGMA foreign_keys = ON", 0, 0, 0);
/******************** INIT ROUTER ********************/ /******************** INIT COMPONENTS ********************/
router_init(); router_init();
status_init();
/******************** START MAIN LOOP ********************/ /******************** START MAIN LOOP ********************/
time_t timer = time(NULL);
for (;;) for (;;)
{ {
mg_mgr_poll(&mgr, 1000); mg_mgr_poll(&mgr, 1000);
if(time(NULL) - timer >= 10)
{
status_broadcast(&mgr);
timer = time(NULL);
}
} }
terminate(0); terminate(0);

View file

@ -5,6 +5,7 @@
#include <cJSON.h> #include <cJSON.h>
#include <logger.h> #include <logger.h>
#include <database.h> #include <database.h>
#include <status.h>
#include <models/relay.h> #include <models/relay.h>
#include <models/controller.h> #include <models/controller.h>
#include <models/schedule.h> #include <models/schedule.h>
@ -32,6 +33,8 @@ static relay_t*
relay_db_select_mapper(sqlite3_stmt *stmt) relay_db_select_mapper(sqlite3_stmt *stmt)
{ {
relay_t *new_relay = malloc(sizeof(relay_t)); relay_t *new_relay = malloc(sizeof(relay_t));
new_relay->is_on = 0;
for(int i = 0; i < sqlite3_column_count(stmt); i++) for(int i = 0; i < sqlite3_column_count(stmt); i++)
{ {
const char *name = sqlite3_column_name(stmt, i); const char *name = sqlite3_column_name(stmt, i);
@ -158,28 +161,10 @@ relay_save(relay_t *relay)
junction_relay_schedule_insert(i, relay->id, relay->schedules[i]->id); junction_relay_schedule_insert(i, relay->id, relay->schedules[i]->id);
} }
status_reload_entry(relay->id);
return result; return result;
} }
int
relay_remove(relay_t *relay)
{
sqlite3_stmt *stmt;
if(!relay->id)
{
return 0;
}
sqlite3_prepare_v2(global_database, "DELETE FROM relays WHERE id=?1;", -1, &stmt, NULL);
sqlite3_bind_int(stmt, 1, relay->id);
int rc = sqlite3_step(stmt);
sqlite3_finalize(stmt);
return rc != SQLITE_DONE;
}
void void
relay_reload_active_schedule(relay_t *relay) relay_reload_active_schedule(relay_t *relay)
{ {
@ -254,6 +239,21 @@ relay_to_json(relay_t *relay)
controller_free(controller); controller_free(controller);
cJSON *json_is_on;
switch(relay->is_on)
{
case 0:
json_is_on = cJSON_CreateFalse();
break;
case 1:
json_is_on = cJSON_CreateTrue();
break;
default:
json_is_on = cJSON_CreateNull();
break;
}
cJSON_AddItemToObject(json, "is_on", json_is_on);
cJSON_AddItemToObject(json, "active_schedule", schedule_to_json(relay->active_schedule)); cJSON_AddItemToObject(json, "active_schedule", schedule_to_json(relay->active_schedule));
cJSON *json_schedules = cJSON_CreateArray(); cJSON *json_schedules = cJSON_CreateArray();
@ -292,6 +292,19 @@ relay_to_json(relay_t *relay)
return json; return json;
} }
cJSON*
relay_list_to_json(relay_t **relays)
{
cJSON *json = cJSON_CreateArray();
for(int i = 0; relays[i] != NULL; ++i)
{
cJSON *json_relay = relay_to_json(relays[i]);
cJSON_AddItemToArray(json, json_relay);
}
return json;
}
relay_t* relay_t*
relay_get_by_id(int id) relay_get_by_id(int id)
{ {

View file

@ -4,11 +4,13 @@
#include <router.h> #include <router.h>
#include <macros.h> #include <macros.h>
#include <endpoint.h> #include <endpoint.h>
#include <status.h>
#include <endpoints/api_v1_schedules.h> #include <endpoints/api_v1_schedules.h>
#include <endpoints/api_v1_controllers.h> #include <endpoints/api_v1_controllers.h>
#include <endpoints/api_v1_relays.h> #include <endpoints/api_v1_relays.h>
#include <endpoints/api_v1_tags.h> #include <endpoints/api_v1_tags.h>
#include <endpoints/api_v1_ws.h>
static endpoint_t endpoints[ROUTER_ENDPOINTS_MAX_COUNT]; static endpoint_t endpoints[ROUTER_ENDPOINTS_MAX_COUNT];
static endpoint_t endpoint_not_found; static endpoint_t endpoint_not_found;
@ -38,6 +40,10 @@ get_method_str_for_int(int method_int)
{ {
return mg_mk_str("OPTIONS"); return mg_mk_str("OPTIONS");
} }
if(method_int == HTTP_METHOD_WEBSOCKET)
{
return mg_mk_str("WEBSOCKET");
}
return mg_mk_str("GET"); return mg_mk_str("GET");
} }
@ -81,6 +87,9 @@ router_init()
router_register_endpoint("/api/v1/tags/", HTTP_METHOD_GET, api_v1_tags_GET); router_register_endpoint("/api/v1/tags/", HTTP_METHOD_GET, api_v1_tags_GET);
router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_GET, api_v1_tags_STR_GET); router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_GET, api_v1_tags_STR_GET);
router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_DELETE, api_v1_tags_STR_DELETE); router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_DELETE, api_v1_tags_STR_DELETE);
router_register_endpoint("/api/v1/ws/relays", HTTP_METHOD_WEBSOCKET, api_v1_ws_relays);
} }
endpoint_t* endpoint_t*
@ -182,26 +191,31 @@ router_register_endpoint(const char *route, int method, endpoint_func_f func)
static int static int
get_method_int_for_str(struct mg_str *method_str) get_method_int_for_str(struct mg_str *method_str)
{ {
if(strncmp(method_str->p, "GET", method_str->len) == 0) if(mg_vcmp(method_str, "GET") == 0)
{ {
return HTTP_METHOD_GET; return HTTP_METHOD_GET;
} }
if(strncmp(method_str->p, "POST", method_str->len) == 0) if(mg_vcmp(method_str, "POST") == 0)
{ {
return HTTP_METHOD_POST; return HTTP_METHOD_POST;
} }
if(strncmp(method_str->p, "PUT", method_str->len) == 0) if(mg_vcmp(method_str, "PUT") == 0)
{ {
return HTTP_METHOD_PUT; return HTTP_METHOD_PUT;
} }
if(strncmp(method_str->p, "DELETE", method_str->len) == 0) if(mg_vcmp(method_str, "DELETE") == 0)
{ {
return HTTP_METHOD_DELETE; return HTTP_METHOD_DELETE;
} }
if(strncmp(method_str->p, "OPTIONS", method_str->len) == 0) if(mg_vcmp(method_str, "OPTIONS") == 0)
{ {
return HTTP_METHOD_OPTIONS; return HTTP_METHOD_OPTIONS;
} }
if(mg_vcmp(method_str, "WEBSOCKET") == 0)
{
return HTTP_METHOD_WEBSOCKET;
}
return HTTP_METHOD_GET; return HTTP_METHOD_GET;
} }

94
src/status.c Normal file
View file

@ -0,0 +1,94 @@
#include <status.h>
#include <logger.h>
relay_t **global_relay_status_list;
void
status_init()
{
global_relay_status_list = relay_get_all();
}
void
status_reload_entry(int relay_id)
{
relay_t **relays = global_relay_status_list;
for(int i = 0; relays[i] != NULL; ++i)
{
if(relays[i]->id != relay_id)
{
continue;
}
int is_on_backup = relays[i]->is_on;
relay_t *updated_relay = relay_get_by_id(relay_id);
relay_free(relays[i]);
relays[i] = updated_relay;
relays[i]->is_on = is_on_backup;
}
}
void
status_update_entry(int relay_id, int is_on)
{
relay_t **relays = global_relay_status_list;
for(int i = 0; relays[i] != NULL; ++i)
{
if(relays[i]->id != relay_id)
{
continue;
}
relays[i]->is_on = is_on;
}
}
static int is_websocket(const struct mg_connection *nc)
{
return nc->flags & MG_F_IS_WEBSOCKET;
}
void
status_broadcast(struct mg_mgr *mgr)
{
struct mg_connection *c;
relay_t **relays = global_relay_status_list;
cJSON *json = relay_list_to_json(relays);
char *json_str = cJSON_Print(json);
size_t json_str_len = strlen(json_str);
for (c = mg_next(mgr, NULL); c != NULL; c = mg_next(mgr, c)) {
if(is_websocket(c))
{
mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json_str, json_str_len);
}
}
free(json_str);
cJSON_Delete(json);
}
void
status_send(struct mg_connection *c)
{
if(!is_websocket(c))
{
return;
}
relay_t **relays = global_relay_status_list;
cJSON *json = relay_list_to_json(relays);
char *json_str = cJSON_Print(json);
size_t json_str_len = strlen(json_str);
mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json_str, json_str_len);
free(json_str);
cJSON_Delete(json);
}
void
status_free()
{
relay_free_list(global_relay_status_list);
}

View file

@ -1,6 +1,11 @@
[controller] [controller]
name = new emgauwa device name = new emgauwa device
: 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env
discovery-port = 4422 discovery-port = 4422
: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env
mqtt-port = 1886
relay-count = 10 relay-count = 10
database = controller_db.lmdb database = controller_db.lmdb
log-level = debug log-level = debug

View file

@ -8,4 +8,7 @@ not-found-content-type = text/plain
: 4421 for dev-env; 4420 for testing-env; 4419 for prod-env; 4422 for testing : 4421 for dev-env; 4420 for testing-env; 4419 for prod-env; 4422 for testing
discovery-port = 4422 discovery-port = 4422
: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env
mqtt-port = 1886
log-level = debug log-level = debug