From 6c6e5023dace21c07ee13aa4a5897e9849bbeaf6 Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Fri, 26 Jun 2020 01:01:46 +0200 Subject: [PATCH] add: status for mqtt fix: refactor connection handlers --- CMakeLists.txt | 4 +- core.ini | 5 +- include/config.h | 3 +- include/endpoints/api_v1_ws.h | 10 ++ include/handlers.h | 5 +- include/models/relay.h | 12 +-- include/router.h | 4 +- include/status.h | 26 +++++ src/config.c | 15 ++- src/endpoints/api_v1_ws_relays.c | 11 ++ src/handlers/connection.c | 134 ----------------------- src/handlers/http.c | 176 +++++++++++++++++++++++++++++++ src/handlers/mqtt.c | 95 +++++++++++++++++ src/main.c | 40 +++++-- src/models/relay.c | 51 +++++---- src/router.c | 24 ++++- src/status.c | 94 +++++++++++++++++ tests/controller.testing.ini | 5 + tests/core.testing.ini | 3 + 19 files changed, 534 insertions(+), 183 deletions(-) create mode 100644 include/endpoints/api_v1_ws.h create mode 100644 include/status.h create mode 100644 src/endpoints/api_v1_ws_relays.c delete mode 100644 src/handlers/connection.c create mode 100644 src/handlers/http.c create mode 100644 src/handlers/mqtt.c create mode 100644 src/status.c diff --git a/CMakeLists.txt b/CMakeLists.txt index d4ca5d3..eef2619 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required (VERSION 3.7) project(core - VERSION 0.1.3 + VERSION 0.2.0 LANGUAGES C) add_executable(core src/main.c) @@ -9,7 +9,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99 -Wpedantic -Werror -Wall -Wextra set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g -fprofile-arcs -ftest-coverage") -add_definitions("-DMG_ENABLE_EXTRA_ERRORS_DESC") +add_definitions("-DMG_ENABLE_EXTRA_ERRORS_DESC -DMG_ENABLE_MQTT_BROKER") aux_source_directory(src/ SRC_DIR) aux_source_directory(src/models MODELS_SRC) diff --git a/core.ini b/core.ini index 4d2e6f8..1f55048 100644 --- a/core.ini +++ b/core.ini @@ -6,6 +6,9 @@ not-found-file-mime = text/html not-found-content = 404 - NOT FOUND not-found-content-type = text/plain -: 4421 for dev-env; 4420 for testing-env; 4419 for prod-env; 4422 for testing +: 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env discovery-port = 4421 +: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env +mqtt-port = 1885 + log-level = debug diff --git a/include/config.h b/include/config.h index 904c9b7..a00f2af 100644 --- a/include/config.h +++ b/include/config.h @@ -30,8 +30,9 @@ typedef struct char group[256]; log_level_t log_level; run_type_t run_type; - char server_port[6]; + uint16_t server_port; uint16_t discovery_port; + uint16_t mqtt_port; char not_found_file[256]; char not_found_file_type[256]; char not_found_content[256]; diff --git a/include/endpoints/api_v1_ws.h b/include/endpoints/api_v1_ws.h new file mode 100644 index 0000000..cb9f818 --- /dev/null +++ b/include/endpoints/api_v1_ws.h @@ -0,0 +1,10 @@ +#ifndef CORE_ENDPOINTS_API_V1_WS_H +#define CORE_ENDPOINTS_API_V1_WS_H + +#include + +void +api_v1_ws_relays(struct mg_connection *nc, struct http_message *hm, endpoint_args_t *args, endpoint_response_t *response); + + +#endif /* CORE_ENDPOINTS_API_V1_WS_H */ diff --git a/include/handlers.h b/include/handlers.h index ed4de0a..712cc9e 100644 --- a/include/handlers.h +++ b/include/handlers.h @@ -2,6 +2,9 @@ #define CORE_HANDLERS_H void -handler_connection(struct mg_connection *c, int ev, void *p); +handler_http(struct mg_connection *nc, int ev, void *p); + +void +handler_mqtt(struct mg_connection *nc, int ev, void *p); #endif /* CORE_HANDLERS_H */ diff --git a/include/models/relay.h b/include/models/relay.h index ee8b9a4..d5a4785 100644 --- a/include/models/relay.h +++ b/include/models/relay.h @@ -17,6 +17,7 @@ typedef struct int number; int controller_id; int active_schedule_id; + int is_on; schedule_t *active_schedule; schedule_t *schedules[7]; } relay_t; @@ -24,14 +25,14 @@ typedef struct int relay_save(); -int -relay_remove(); - void relay_reload_active_schedule(relay_t *relay); cJSON* -relay_to_json(); +relay_to_json(relay_t *relay); + +cJSON* +relay_list_to_json(relay_t **relays); void relay_free(relay_t *relay); @@ -39,9 +40,6 @@ relay_free(relay_t *relay); void relay_free_list(relay_t **relays_list); -relay_t** -relay_get_by_simple(const char *key, const void *value, intptr_t bind_func, int bind_func_param); - relay_t* relay_get_by_id(int id); diff --git a/include/router.h b/include/router.h index 93bfec9..9c82fa5 100644 --- a/include/router.h +++ b/include/router.h @@ -12,7 +12,9 @@ typedef enum HTTP_METHOD_POST = (1 << 1), HTTP_METHOD_PUT = (1 << 2), HTTP_METHOD_DELETE = (1 << 3), - HTTP_METHOD_OPTIONS = (1 << 4) + HTTP_METHOD_OPTIONS = (1 << 4), + + HTTP_METHOD_WEBSOCKET = (1 << 5) } http_method_e; endpoint_t* diff --git a/include/status.h b/include/status.h new file mode 100644 index 0000000..272858c --- /dev/null +++ b/include/status.h @@ -0,0 +1,26 @@ +#ifndef CORE_STATUS_H +#define CORE_STATUS_H + +#include + +extern relay_t **global_relay_status_list; + +void +status_init(); + +void +status_reload_entry(int relay_id); + +void +status_update_entry(int relay_id, int is_on); + +void +status_broadcast(struct mg_mgr *mgr); + +void +status_send(struct mg_connection *c); + +void +status_free(); + +#endif /* CORE_STATUS_H */ diff --git a/src/config.c b/src/config.c index e96c2b7..90627f4 100644 --- a/src/config.c +++ b/src/config.c @@ -54,11 +54,6 @@ config_load(IniDispatch *disp, void *config_void) if(disp->type == INI_KEY) { - if(CONFINI_IS_KEY("core", "server-port")) - { - strcpy(config->server_port, disp->value); - return 0; - } if(CONFINI_IS_KEY("core", "database")) { strcpy(config->database, disp->value); @@ -98,11 +93,21 @@ config_load(IniDispatch *disp, void *config_void) { return config_load_log_level(disp, config); } + if(CONFINI_IS_KEY("core", "server-port")) + { + config->server_port = atoi(disp->value); + return 0; + } if(CONFINI_IS_KEY("core", "discovery-port")) { config->discovery_port = atoi(disp->value); return 0; } + if(CONFINI_IS_KEY("core", "mqtt-port")) + { + config->mqtt_port = atoi(disp->value); + return 0; + } } return 0; } diff --git a/src/endpoints/api_v1_ws_relays.c b/src/endpoints/api_v1_ws_relays.c new file mode 100644 index 0000000..de5828e --- /dev/null +++ b/src/endpoints/api_v1_ws_relays.c @@ -0,0 +1,11 @@ +#include +#include + +void +api_v1_ws_relays(struct mg_connection *nc, struct http_message *hm, endpoint_args_t *args, endpoint_response_t *response) +{ + (void)nc; + (void)hm; + (void)args; + (void)response; +} diff --git a/src/handlers/connection.c b/src/handlers/connection.c deleted file mode 100644 index c726f39..0000000 --- a/src/handlers/connection.c +++ /dev/null @@ -1,134 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -#define HEADERS_FMT "Content-Type: %s" - -// -2 for "%s" -1 for \0 -#define HEADERS_FMT_LEN (sizeof(HEADERS_FMT) - 3) - -static char* -add_extra_headers(char *extra_headers) -{ - char *result; - size_t std_headers_len = strlen(global_config.http_server_opts.extra_headers); - if(extra_headers == NULL) - { - result = malloc(sizeof(char) * (std_headers_len + 1)); - strcpy(result, global_config.http_server_opts.extra_headers); - return result; - } - - result = malloc(sizeof(char) * (std_headers_len + strlen(extra_headers) + 3)); - sprintf(result, "%s\r\n%s", global_config.http_server_opts.extra_headers, extra_headers); - return result; -} - -static void -send_response(struct mg_connection *nc, endpoint_response_t *response) -{ - if(response->status_code) - { - char *response_headers = malloc(sizeof(char) * (HEADERS_FMT_LEN + strlen(response->content_type) + 1)); - sprintf(response_headers, HEADERS_FMT, response->content_type); - - char *extra_headers = add_extra_headers(response_headers); - mg_send_head(nc, response->status_code, response->content_length, extra_headers); - mg_printf(nc, "%s", response->content); - - free(response_headers); - free(extra_headers); - - if(response->alloced_content) - { - free((char*)response->content); - } - } -} - -void -handler_connection(struct mg_connection *nc, int ev, void *p) -{ - if (ev == MG_EV_HTTP_REQUEST) - { - struct http_message *hm = (struct http_message *) p; - LOG_TRACE("new http %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p); - - endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &hm->method); - - endpoint_response_t response; - static const char content[] = "the server did not create a response"; - endpoint_response_text(&response, 500, content, STRLEN(content)); - - if(!endpoint) - { - /* Normalize path - resolve "." and ".." (in-place). */ - if (!mg_normalize_uri_path(&hm->uri, &hm->uri)) { - mg_http_send_error(nc, 400, global_config.http_server_opts.extra_headers); - return; - } - char *request_file_org = malloc(sizeof(char) * hm->uri.len); - strncpy(request_file_org, hm->uri.p + 1, hm->uri.len); - request_file_org[hm->uri.len - 1] = '\0'; - - char *request_file = request_file_org; - while(request_file[0] == '/') - { - ++request_file; - } - - LOG_DEBUG("%s\n", request_file); - int access_result = access(request_file, R_OK); - free(request_file_org); - if(access_result != -1) - { - response.status_code = 0; - mg_serve_http(nc, hm, global_config.http_server_opts); - return; - } - else - { - endpoint = router_get_not_found_endpoint(); - } - } - - if(!endpoint->func) - { - if(endpoint->method == HTTP_METHOD_OPTIONS) - { - char options_header[256]; // TODO make more generic - sprintf(options_header, "Allow: OPTIONS%s%s%s%s", - endpoint->options & HTTP_METHOD_GET ? ", GET" : "", - endpoint->options & HTTP_METHOD_POST ? ", POST" : "", - endpoint->options & HTTP_METHOD_PUT ? ", PUT" : "", - endpoint->options & HTTP_METHOD_DELETE ? ", DELETE" : "" - ); - char *extra_headers = add_extra_headers(options_header); - mg_send_head(nc, 204, 0, extra_headers); - free(extra_headers); - } - else - { - mg_send_head(nc, 501, 0, "Content-Type: text/plain"); - } - } - else - { - endpoint->func(nc, hm, endpoint->args, &response); - send_response(nc, &response); - } - - for(int i = 0; i < endpoint->args_count; ++i) - { - if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR) - { - free((char*)endpoint->args[i].value.v_str); - } - } - } -} diff --git a/src/handlers/http.c b/src/handlers/http.c new file mode 100644 index 0000000..dc7e36a --- /dev/null +++ b/src/handlers/http.c @@ -0,0 +1,176 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +#define HEADERS_FMT "Content-Type: %s" + +// -2 for "%s" -1 for \0 +#define HEADERS_FMT_LEN (sizeof(HEADERS_FMT) - 3) + +static char* +add_extra_headers(char *extra_headers) +{ + char *result; + size_t std_headers_len = strlen(global_config.http_server_opts.extra_headers); + if(extra_headers == NULL) + { + result = malloc(sizeof(char) * (std_headers_len + 1)); + strcpy(result, global_config.http_server_opts.extra_headers); + return result; + } + + result = malloc(sizeof(char) * (std_headers_len + strlen(extra_headers) + 3)); + sprintf(result, "%s\r\n%s", global_config.http_server_opts.extra_headers, extra_headers); + return result; +} + +static void +send_response(struct mg_connection *nc, endpoint_response_t *response) +{ + if(response->status_code) + { + char *response_headers = malloc(sizeof(char) * (HEADERS_FMT_LEN + strlen(response->content_type) + 1)); + sprintf(response_headers, HEADERS_FMT, response->content_type); + + char *extra_headers = add_extra_headers(response_headers); + mg_send_head(nc, response->status_code, response->content_length, extra_headers); + mg_printf(nc, "%s", response->content); + + free(response_headers); + free(extra_headers); + + if(response->alloced_content) + { + free((char*)response->content); + } + } +} + +static void +handle_websocket_request(struct mg_connection *nc, struct http_message *hm) +{ + LOG_TRACE("new websocket %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p); + + struct mg_str method_websocket_str = mg_mk_str("WEBSOCKET"); + endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &method_websocket_str); + + if(!endpoint || !endpoint->func) + { + nc->flags |= MG_F_CLOSE_IMMEDIATELY; + } + else + { + endpoint->func(nc, hm, endpoint->args, NULL); + + for(int i = 0; i < endpoint->args_count; ++i) + { + if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR) + { + free((char*)endpoint->args[i].value.v_str); + } + } + } +} + +static void +handle_http_request(struct mg_connection *nc, struct http_message *hm) +{ + LOG_TRACE("new http %.*s request for %.*s\n", hm->method.len, hm->method.p, hm->uri.len, hm->uri.p); + + endpoint_t *endpoint = router_find_endpoint(hm->uri.p, hm->uri.len, &hm->method); + + endpoint_response_t response; + static const char content[] = "the server did not create a response"; + endpoint_response_text(&response, 500, content, STRLEN(content)); + + if(!endpoint) + { + /* Normalize path - resolve "." and ".." (in-place). */ + if (!mg_normalize_uri_path(&hm->uri, &hm->uri)) { + mg_http_send_error(nc, 400, global_config.http_server_opts.extra_headers); + return; + } + char *request_file_org = malloc(sizeof(char) * hm->uri.len); + strncpy(request_file_org, hm->uri.p + 1, hm->uri.len); + request_file_org[hm->uri.len - 1] = '\0'; + + char *request_file = request_file_org; + while(request_file[0] == '/') + { + ++request_file; + } + + LOG_DEBUG("%s\n", request_file); + int access_result = access(request_file, R_OK); + free(request_file_org); + if(access_result != -1) + { + response.status_code = 0; + mg_serve_http(nc, hm, global_config.http_server_opts); + return; + } + else + { + endpoint = router_get_not_found_endpoint(); + } + } + + if(!endpoint->func) + { + if(endpoint->method == HTTP_METHOD_OPTIONS) + { + char options_header[256]; // TODO make more generic + sprintf(options_header, "Allow: OPTIONS%s%s%s%s", + endpoint->options & HTTP_METHOD_GET ? ", GET" : "", + endpoint->options & HTTP_METHOD_POST ? ", POST" : "", + endpoint->options & HTTP_METHOD_PUT ? ", PUT" : "", + endpoint->options & HTTP_METHOD_DELETE ? ", DELETE" : "" + ); + char *extra_headers = add_extra_headers(options_header); + mg_send_head(nc, 204, 0, extra_headers); + free(extra_headers); + } + else + { + mg_send_head(nc, 501, 0, "Content-Type: text/plain"); + } + } + else + { + endpoint->func(nc, hm, endpoint->args, &response); + send_response(nc, &response); + } + + for(int i = 0; i < endpoint->args_count; ++i) + { + if(endpoint->args[i].type == ENDPOINT_ARG_TYPE_STR) + { + free((char*)endpoint->args[i].value.v_str); + } + } +} + +void +handler_http(struct mg_connection *nc, int ev, void *p) +{ + if(ev == MG_EV_WEBSOCKET_HANDSHAKE_REQUEST) + { + struct http_message *hm = (struct http_message*)p; + handle_websocket_request(nc, hm); + } + if(ev == MG_EV_WEBSOCKET_HANDSHAKE_DONE) + { + status_send(nc); + } + if(ev == MG_EV_HTTP_REQUEST) + { + struct http_message *hm = (struct http_message*)p; + handle_http_request(nc, hm); + } +} diff --git a/src/handlers/mqtt.c b/src/handlers/mqtt.c new file mode 100644 index 0000000..c15091e --- /dev/null +++ b/src/handlers/mqtt.c @@ -0,0 +1,95 @@ +#include +#include + +#include +#include +#include +#include +#include +#include + +static void +handle_mqtt_publish_controller(char **topic_save, int controller_id, char *payload) +{ + (void)controller_id; + (void)payload; + char *topic_token = strtok_r(NULL, "/", topic_save); + if(!topic_token) + { + return; + } + if(strcmp(topic_token, "relay") == 0) + { + char *relay_num_str = strtok_r(NULL, "/", topic_save); + if(!relay_num_str) + { + return; + } + errno = 0; + int relay_num = strtol(relay_num_str, NULL, 10); + if(errno) + { + return; + } + relay_t *relay = relay_get_for_controller(controller_id, relay_num); + if(!relay) + { + return; + } + status_update_entry(relay->id, payload[0] == '1'); + free(relay); + } +} + +static void +handle_mqtt_publish(struct mg_mqtt_message *msg) +{ + char *topic = malloc(sizeof(char) * (msg->topic.len + 1)); + strncpy(topic, msg->topic.p, msg->topic.len); + topic[msg->topic.len] = '\0'; + LOG_DEBUG("received mqtt publish for topic %s\n", topic); + + char *payload = malloc(sizeof(char) * (msg->payload.len + 1)); + strncpy(payload, msg->payload.p, msg->payload.len); + payload[msg->payload.len] = '\0'; + + char *topic_save_null = NULL; + char **topic_save = &topic_save_null; + char *topic_token = strtok_r(topic, "/", topic_save); + if(topic_token) + { + if(strcmp(topic_token, "controller") == 0) + { + char *controller_uid_str = strtok_r(NULL, "/", topic_save); + uuid_t controller_uid; + if(uuid_parse(controller_uid_str, controller_uid) == 0) + { + controller_t *controller = controller_get_by_uid(controller_uid); + if(controller) + { + handle_mqtt_publish_controller(topic_save, controller->id, payload); + controller_free(controller); + } + } + } + } + + free(payload); + free(topic); +} + +void +handler_mqtt(struct mg_connection *nc, int ev, void *p) +{ + if(ev == MG_EV_POLL) + { + return; + } + + mg_mqtt_broker(nc, ev, p); + if(ev == MG_EV_MQTT_PUBLISH) + { + struct mg_mqtt_message *msg = (struct mg_mqtt_message*)p; + handle_mqtt_publish(msg); + } +} diff --git a/src/main.c b/src/main.c index 9d066d0..a6618c8 100644 --- a/src/main.c +++ b/src/main.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -26,6 +27,7 @@ terminate(int signum) sqlite3_close(global_database); router_free(); + status_free(); exit(signum); } @@ -49,6 +51,11 @@ main(int argc, const char** argv) /******************** LOAD CONFIG ********************/ global_config.file = "core.ini"; + global_config.log_level = LOG_LEVEL_INFO; + global_config.discovery_port = 4421; + global_config.mqtt_port = 1885; + global_config.server_port = 5000; + strcpy(global_config.user, ""); strcpy(global_config.group, ""); @@ -56,7 +63,6 @@ main(int argc, const char** argv) strcpy(global_config.not_found_file_type, "text/html"); strcpy(global_config.not_found_content, "404 - NOT FOUND"); strcpy(global_config.not_found_content_type, "text/plain"); - global_config.log_level = LOG_LEVEL_INFO; helper_parse_cli(argc, argv, &global_config); @@ -82,18 +88,30 @@ main(int argc, const char** argv) /******************** SETUP CONNECTION ********************/ - struct mg_connection *c; + struct mg_mqtt_broker brk; mg_mgr_init(&mgr, NULL); - c = mg_bind(&mgr, global_config.server_port, handler_connection); - if(c == NULL) + char address[100]; + sprintf(address, "tcp://0.0.0.0:%u", global_config.server_port); + struct mg_connection *c_http = mg_bind(&mgr, address, handler_http); + if(c_http == NULL) { - LOG_FATAL("failed to bind to port %s\n", global_config.server_port); + LOG_FATAL("failed to bind http server to port %u\n", global_config.server_port); exit(1); } + mg_set_protocol_http_websocket(c_http); - mg_set_protocol_http_websocket(c); + sprintf(address, "tcp://0.0.0.0:%u", global_config.mqtt_port); + struct mg_connection *c_mqtt = mg_bind(&mgr, address, handler_mqtt); + if(c_mqtt == NULL) + { + LOG_FATAL("failed to bind mqtt server to port %u\n", global_config.mqtt_port); + exit(1); + } + mg_mqtt_broker_init(&brk, NULL); + c_mqtt->priv_2 = &brk; + mg_set_protocol_mqtt(c_mqtt); helper_drop_privileges(); @@ -116,16 +134,24 @@ main(int argc, const char** argv) sqlite3_exec(global_database, "PRAGMA foreign_keys = ON", 0, 0, 0); - /******************** INIT ROUTER ********************/ + /******************** INIT COMPONENTS ********************/ router_init(); + status_init(); /******************** START MAIN LOOP ********************/ + time_t timer = time(NULL); + for (;;) { mg_mgr_poll(&mgr, 1000); + if(time(NULL) - timer >= 10) + { + status_broadcast(&mgr); + timer = time(NULL); + } } terminate(0); diff --git a/src/models/relay.c b/src/models/relay.c index 75632e6..3463901 100644 --- a/src/models/relay.c +++ b/src/models/relay.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,8 @@ static relay_t* relay_db_select_mapper(sqlite3_stmt *stmt) { relay_t *new_relay = malloc(sizeof(relay_t)); + new_relay->is_on = 0; + for(int i = 0; i < sqlite3_column_count(stmt); i++) { const char *name = sqlite3_column_name(stmt, i); @@ -158,28 +161,10 @@ relay_save(relay_t *relay) junction_relay_schedule_insert(i, relay->id, relay->schedules[i]->id); } + status_reload_entry(relay->id); return result; } -int -relay_remove(relay_t *relay) -{ - sqlite3_stmt *stmt; - if(!relay->id) - { - return 0; - } - - sqlite3_prepare_v2(global_database, "DELETE FROM relays WHERE id=?1;", -1, &stmt, NULL); - sqlite3_bind_int(stmt, 1, relay->id); - - int rc = sqlite3_step(stmt); - - sqlite3_finalize(stmt); - - return rc != SQLITE_DONE; -} - void relay_reload_active_schedule(relay_t *relay) { @@ -254,6 +239,21 @@ relay_to_json(relay_t *relay) controller_free(controller); + cJSON *json_is_on; + switch(relay->is_on) + { + case 0: + json_is_on = cJSON_CreateFalse(); + break; + case 1: + json_is_on = cJSON_CreateTrue(); + break; + default: + json_is_on = cJSON_CreateNull(); + break; + } + cJSON_AddItemToObject(json, "is_on", json_is_on); + cJSON_AddItemToObject(json, "active_schedule", schedule_to_json(relay->active_schedule)); cJSON *json_schedules = cJSON_CreateArray(); @@ -292,6 +292,19 @@ relay_to_json(relay_t *relay) return json; } +cJSON* +relay_list_to_json(relay_t **relays) +{ + cJSON *json = cJSON_CreateArray(); + + for(int i = 0; relays[i] != NULL; ++i) + { + cJSON *json_relay = relay_to_json(relays[i]); + cJSON_AddItemToArray(json, json_relay); + } + return json; +} + relay_t* relay_get_by_id(int id) { diff --git a/src/router.c b/src/router.c index d920eef..809668e 100644 --- a/src/router.c +++ b/src/router.c @@ -4,11 +4,13 @@ #include #include #include +#include #include #include #include #include +#include static endpoint_t endpoints[ROUTER_ENDPOINTS_MAX_COUNT]; static endpoint_t endpoint_not_found; @@ -38,6 +40,10 @@ get_method_str_for_int(int method_int) { return mg_mk_str("OPTIONS"); } + if(method_int == HTTP_METHOD_WEBSOCKET) + { + return mg_mk_str("WEBSOCKET"); + } return mg_mk_str("GET"); } @@ -81,6 +87,9 @@ router_init() router_register_endpoint("/api/v1/tags/", HTTP_METHOD_GET, api_v1_tags_GET); router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_GET, api_v1_tags_STR_GET); router_register_endpoint("/api/v1/tags/{str}", HTTP_METHOD_DELETE, api_v1_tags_STR_DELETE); + + + router_register_endpoint("/api/v1/ws/relays", HTTP_METHOD_WEBSOCKET, api_v1_ws_relays); } endpoint_t* @@ -182,26 +191,31 @@ router_register_endpoint(const char *route, int method, endpoint_func_f func) static int get_method_int_for_str(struct mg_str *method_str) { - if(strncmp(method_str->p, "GET", method_str->len) == 0) + if(mg_vcmp(method_str, "GET") == 0) { return HTTP_METHOD_GET; } - if(strncmp(method_str->p, "POST", method_str->len) == 0) + if(mg_vcmp(method_str, "POST") == 0) { return HTTP_METHOD_POST; } - if(strncmp(method_str->p, "PUT", method_str->len) == 0) + if(mg_vcmp(method_str, "PUT") == 0) { return HTTP_METHOD_PUT; } - if(strncmp(method_str->p, "DELETE", method_str->len) == 0) + if(mg_vcmp(method_str, "DELETE") == 0) { return HTTP_METHOD_DELETE; } - if(strncmp(method_str->p, "OPTIONS", method_str->len) == 0) + if(mg_vcmp(method_str, "OPTIONS") == 0) { return HTTP_METHOD_OPTIONS; } + + if(mg_vcmp(method_str, "WEBSOCKET") == 0) + { + return HTTP_METHOD_WEBSOCKET; + } return HTTP_METHOD_GET; } diff --git a/src/status.c b/src/status.c new file mode 100644 index 0000000..2008861 --- /dev/null +++ b/src/status.c @@ -0,0 +1,94 @@ +#include +#include + +relay_t **global_relay_status_list; + +void +status_init() +{ + global_relay_status_list = relay_get_all(); +} + +void +status_reload_entry(int relay_id) +{ + relay_t **relays = global_relay_status_list; + for(int i = 0; relays[i] != NULL; ++i) + { + if(relays[i]->id != relay_id) + { + continue; + } + + int is_on_backup = relays[i]->is_on; + relay_t *updated_relay = relay_get_by_id(relay_id); + relay_free(relays[i]); + relays[i] = updated_relay; + relays[i]->is_on = is_on_backup; + } +} + +void +status_update_entry(int relay_id, int is_on) +{ + relay_t **relays = global_relay_status_list; + for(int i = 0; relays[i] != NULL; ++i) + { + if(relays[i]->id != relay_id) + { + continue; + } + relays[i]->is_on = is_on; + } +} + +static int is_websocket(const struct mg_connection *nc) +{ + return nc->flags & MG_F_IS_WEBSOCKET; +} + +void +status_broadcast(struct mg_mgr *mgr) +{ + struct mg_connection *c; + relay_t **relays = global_relay_status_list; + + cJSON *json = relay_list_to_json(relays); + char *json_str = cJSON_Print(json); + size_t json_str_len = strlen(json_str); + + for (c = mg_next(mgr, NULL); c != NULL; c = mg_next(mgr, c)) { + if(is_websocket(c)) + { + mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json_str, json_str_len); + } + } + free(json_str); + cJSON_Delete(json); +} + +void +status_send(struct mg_connection *c) +{ + if(!is_websocket(c)) + { + return; + } + + relay_t **relays = global_relay_status_list; + + cJSON *json = relay_list_to_json(relays); + char *json_str = cJSON_Print(json); + size_t json_str_len = strlen(json_str); + + mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json_str, json_str_len); + + free(json_str); + cJSON_Delete(json); +} + +void +status_free() +{ + relay_free_list(global_relay_status_list); +} diff --git a/tests/controller.testing.ini b/tests/controller.testing.ini index e34fecd..b58e30e 100644 --- a/tests/controller.testing.ini +++ b/tests/controller.testing.ini @@ -1,6 +1,11 @@ [controller] name = new emgauwa device + +: 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env discovery-port = 4422 +: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env +mqtt-port = 1886 + relay-count = 10 database = controller_db.lmdb log-level = debug diff --git a/tests/core.testing.ini b/tests/core.testing.ini index 87bc44e..177fc17 100644 --- a/tests/core.testing.ini +++ b/tests/core.testing.ini @@ -8,4 +8,7 @@ not-found-content-type = text/plain : 4421 for dev-env; 4420 for testing-env; 4419 for prod-env; 4422 for testing discovery-port = 4422 +: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env +mqtt-port = 1886 + log-level = debug