diff --git a/CMakeLists.txt b/CMakeLists.txt index dd2647f..18e1934 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required (VERSION 3.7) project(controller - VERSION 0.1.0 + VERSION 0.2.0 LANGUAGES C) add_executable(controller src/main.c) diff --git a/controller.ini b/controller.ini index f6a83ae..da0bcae 100644 --- a/controller.ini +++ b/controller.ini @@ -1,6 +1,11 @@ [controller] name = new emgauwa device + +: 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env discovery-port = 4421 +: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env +mqtt-port = 1885 + relay-count = 10 database = controller_db.lmdb log-level = debug diff --git a/include/config.h b/include/config.h index f8757e4..80d7bb1 100644 --- a/include/config.h +++ b/include/config.h @@ -21,6 +21,7 @@ typedef struct run_type_t run_type; char name[MAX_NAME_LENGTH + 1]; uint16_t discovery_port; + uint16_t mqtt_port; uint8_t relay_count; config_relay_t *relay_configs; } config_t; diff --git a/include/connections.h b/include/connections.h new file mode 100644 index 0000000..fdb268e --- /dev/null +++ b/include/connections.h @@ -0,0 +1,17 @@ +#ifndef CONTROLLER_CONNECTIONS_H +#define CONTROLLER_CONNECTIONS_H + +#include + +struct mg_connection* +connection_discovery_bind(struct mg_mgr *mgr); + +struct mg_connection* +connection_command_bind(struct mg_mgr *mgr); + +struct mg_connection* +connection_mqtt_connect(struct mg_mgr *mgr); + +extern struct mg_connection *global_connection_mqtt; + +#endif /* CONTROLLER_CONNECTIONS_H */ diff --git a/include/models/relay.h b/include/models/relay.h index 283c3c2..224e7b4 100644 --- a/include/models/relay.h +++ b/include/models/relay.h @@ -12,6 +12,7 @@ typedef struct { uint8_t number; int is_on; + int sent_to_broker; char name[MAX_NAME_LENGTH + 1]; schedule_t *schedules[7]; } relay_t; diff --git a/src/connections.c b/src/connections.c new file mode 100644 index 0000000..3be866f --- /dev/null +++ b/src/connections.c @@ -0,0 +1,35 @@ +#include + +#include +#include +#include +#include + +struct mg_connection *global_connection_mqtt; + +struct mg_connection* +connection_discovery_bind(struct mg_mgr *mgr) +{ + char address[100]; + sprintf(address, "udp://0.0.0.0:%u", global_controller->discovery_port); + struct mg_connection *c = mg_bind(mgr, address, handler_discovery); + return c; +} + +struct mg_connection* +connection_command_bind(struct mg_mgr *mgr) +{ + char address[100]; + sprintf(address, "tcp://0.0.0.0:%u", global_controller->command_port); + struct mg_connection *c = mg_bind(mgr, address, handler_command); + return c; +} + +struct mg_connection* +connection_mqtt_connect(struct mg_mgr *mgr) +{ + char address[100]; + sprintf(address, "tcp://localhost:%u", global_config.mqtt_port); + struct mg_connection *c = mg_connect(mgr, address, handler_mqtt); + return c; +} diff --git a/src/handlers/loop.c b/src/handlers/loop.c index 8ac4aa4..f0b6324 100644 --- a/src/handlers/loop.c +++ b/src/handlers/loop.c @@ -35,12 +35,16 @@ handler_loop(struct mg_connection *c_mqtt) } if(relay->is_on != is_active) + { + relay->sent_to_broker = 0; + } + if(!relay->sent_to_broker && c_mqtt) { sprintf(topic_buf, "controller/%s/relay/%u", controller_uid, i); sprintf(payload_buf, "%u", is_active); mg_mqtt_publish(c_mqtt, topic_buf, 0, MG_MQTT_QOS(0), payload_buf, strlen(payload_buf)); + relay->sent_to_broker = 1; } - relay->is_on = is_active; if(global_config.relay_configs[i].inverted) diff --git a/src/handlers/mqtt.c b/src/handlers/mqtt.c index c538876..64b75d6 100644 --- a/src/handlers/mqtt.c +++ b/src/handlers/mqtt.c @@ -1,37 +1,43 @@ #include #include +#include #include void handler_mqtt(struct mg_connection *nc, int ev, void *p) { - struct mg_mqtt_message *msg = (struct mg_mqtt_message *) p; - (void) nc; + struct mg_mqtt_message *msg = (struct mg_mqtt_message *) p; + (void) nc; - if (ev != MG_EV_POLL) LOG_DEBUG("USER HANDLER GOT EVENT %d\n", ev); + switch (ev) + { + case MG_EV_CONNECT: + { + struct mg_send_mqtt_handshake_opts opts; + memset(&opts, 0, sizeof(opts)); + // TODO add password - switch (ev) { - case MG_EV_CONNECT: { - struct mg_send_mqtt_handshake_opts opts; - memset(&opts, 0, sizeof(opts)); - // TODO add password - - mg_set_protocol_mqtt(nc); - mg_send_mqtt_handshake_opt(nc, global_controller->name, opts); - break; + mg_set_protocol_mqtt(nc); + mg_send_mqtt_handshake_opt(nc, global_controller->name, opts); + break; + } + case MG_EV_MQTT_CONNACK: + if(msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) + { + LOG_INFO("Got MQTT connection error: %d\n", msg->connack_ret_code); + break; + } + if(!global_connection_mqtt) + { + LOG_DEBUG("connected to MQTT server\n"); + global_connection_mqtt = nc; + } + break; + case MG_EV_CLOSE: + if(global_connection_mqtt) + { + LOG_DEBUG("disconnected from MQTT server\n"); + } + global_connection_mqtt = NULL; + break; } - case MG_EV_MQTT_CONNACK: - if (msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) { - LOG_DEBUG("Got mqtt connection error: %d\n", msg->connack_ret_code); - exit(1); - } - break; - case MG_EV_MQTT_PUBACK: - LOG_DEBUG("Message publishing acknowledged (msg_id: %d)\n", msg->message_id); - break; - case MG_EV_MQTT_SUBACK: - LOG_DEBUG("Subscription acknowledged, forwarding to '/test'\n"); - break; - case MG_EV_CLOSE: - LOG_DEBUG("Connection closed\n"); - } } diff --git a/src/helpers/load_config.c b/src/helpers/load_config.c index 22c83a5..8c4af97 100644 --- a/src/helpers/load_config.c +++ b/src/helpers/load_config.c @@ -70,6 +70,11 @@ helper_load_config(IniDispatch *disp, void *config_void) config->discovery_port = atoi(disp->value); return 0; } + if(CONFINI_IS_KEY("controller", "mqtt-port")) + { + config->mqtt_port = atoi(disp->value); + return 0; + } if(CONFINI_IS_KEY("controller", "relay-count")) { config->relay_count = atoi(disp->value); diff --git a/src/main.c b/src/main.c index e7b7578..fc5a249 100644 --- a/src/main.c +++ b/src/main.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -32,8 +33,9 @@ terminate(int signum) { LOG_INFO("terminating controller (%d)\n", signum); - LOG_DEBUG("freeing mongoose manager\n"); - mg_mgr_free(&mgr); + // TODO fix mg_mgr_free() causing loop (can't terminate) + //LOG_DEBUG("freeing mongoose manager\n"); + //mg_mgr_free(&mgr); LOG_DEBUG("closing database\n"); mdb_env_close(global_mdb_env); @@ -71,6 +73,8 @@ main(int argc, const char** argv) global_config.file = "controller.ini"; global_config.log_level = LOG_LEVEL_INFO; + global_config.discovery_port = 4421; + global_config.mqtt_port = 1885; helper_parse_cli(argc, argv, &global_config); @@ -102,15 +106,10 @@ main(int argc, const char** argv) global_controller = controller_load(global_mdb_env); - char address[100]; - sprintf(address, "udp://0.0.0.0:%u", global_controller->discovery_port); - struct mg_connection *c_discovery = mg_bind(&mgr, address, handler_discovery); - sprintf(address, "tcp://0.0.0.0:%u", global_controller->command_port); - struct mg_connection *c_command = mg_bind(&mgr, address, handler_command); - sprintf(address, "tcp://localhost:%u", 1883); - struct mg_connection *c_mqtt = mg_connect(&mgr, address, handler_mqtt); + connection_discovery_bind(&mgr); + connection_mqtt_connect(&mgr); + struct mg_connection *c_command = connection_command_bind(&mgr); - (void)c_discovery; // still unused global_controller->command_port = helper_get_port(c_command->sock); controller_save(global_controller, global_mdb_env); @@ -144,7 +143,12 @@ main(int argc, const char** argv) for(;;) { mg_mgr_poll(&mgr, 1000); - handler_loop(c_mqtt); + if(!global_connection_mqtt) + { + LOG_DEBUG("mqtt connection is not open\n"); + connection_mqtt_connect(&mgr); + } + handler_loop(global_connection_mqtt); } terminate(0); diff --git a/src/models/relay.c b/src/models/relay.c index b725d59..6667520 100644 --- a/src/models/relay.c +++ b/src/models/relay.c @@ -15,6 +15,7 @@ relay_create(uint8_t number) new_relay->name[0] = '\0'; new_relay->is_on = -1; + new_relay->sent_to_broker = 0; uuid_t off_id; memset(off_id, 0, sizeof(uuid_t));