add: mqtt status client
This commit is contained in:
parent
1d1ae61310
commit
679175f1a9
11 changed files with 119 additions and 40 deletions
|
@ -1,6 +1,6 @@
|
||||||
cmake_minimum_required (VERSION 3.7)
|
cmake_minimum_required (VERSION 3.7)
|
||||||
project(controller
|
project(controller
|
||||||
VERSION 0.1.0
|
VERSION 0.2.0
|
||||||
LANGUAGES C)
|
LANGUAGES C)
|
||||||
|
|
||||||
add_executable(controller src/main.c)
|
add_executable(controller src/main.c)
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
[controller]
|
[controller]
|
||||||
name = new emgauwa device
|
name = new emgauwa device
|
||||||
|
|
||||||
|
: 4422 for testing; 4421 for dev-env; 4420 for testing-env; 4419 for prod-env
|
||||||
discovery-port = 4421
|
discovery-port = 4421
|
||||||
|
: 1886 for testing; 1885 for dev-env; 1884 for testing-env; 1883 for prod-env
|
||||||
|
mqtt-port = 1885
|
||||||
|
|
||||||
relay-count = 10
|
relay-count = 10
|
||||||
database = controller_db.lmdb
|
database = controller_db.lmdb
|
||||||
log-level = debug
|
log-level = debug
|
||||||
|
|
|
@ -21,6 +21,7 @@ typedef struct
|
||||||
run_type_t run_type;
|
run_type_t run_type;
|
||||||
char name[MAX_NAME_LENGTH + 1];
|
char name[MAX_NAME_LENGTH + 1];
|
||||||
uint16_t discovery_port;
|
uint16_t discovery_port;
|
||||||
|
uint16_t mqtt_port;
|
||||||
uint8_t relay_count;
|
uint8_t relay_count;
|
||||||
config_relay_t *relay_configs;
|
config_relay_t *relay_configs;
|
||||||
} config_t;
|
} config_t;
|
||||||
|
|
17
include/connections.h
Normal file
17
include/connections.h
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
#ifndef CONTROLLER_CONNECTIONS_H
|
||||||
|
#define CONTROLLER_CONNECTIONS_H
|
||||||
|
|
||||||
|
#include <mongoose.h>
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_discovery_bind(struct mg_mgr *mgr);
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_command_bind(struct mg_mgr *mgr);
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_mqtt_connect(struct mg_mgr *mgr);
|
||||||
|
|
||||||
|
extern struct mg_connection *global_connection_mqtt;
|
||||||
|
|
||||||
|
#endif /* CONTROLLER_CONNECTIONS_H */
|
|
@ -12,6 +12,7 @@ typedef struct
|
||||||
{
|
{
|
||||||
uint8_t number;
|
uint8_t number;
|
||||||
int is_on;
|
int is_on;
|
||||||
|
int sent_to_broker;
|
||||||
char name[MAX_NAME_LENGTH + 1];
|
char name[MAX_NAME_LENGTH + 1];
|
||||||
schedule_t *schedules[7];
|
schedule_t *schedules[7];
|
||||||
} relay_t;
|
} relay_t;
|
||||||
|
|
35
src/connections.c
Normal file
35
src/connections.c
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
|
#include <connections.h>
|
||||||
|
#include <models/controller.h>
|
||||||
|
#include <config.h>
|
||||||
|
#include <handlers.h>
|
||||||
|
|
||||||
|
struct mg_connection *global_connection_mqtt;
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_discovery_bind(struct mg_mgr *mgr)
|
||||||
|
{
|
||||||
|
char address[100];
|
||||||
|
sprintf(address, "udp://0.0.0.0:%u", global_controller->discovery_port);
|
||||||
|
struct mg_connection *c = mg_bind(mgr, address, handler_discovery);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_command_bind(struct mg_mgr *mgr)
|
||||||
|
{
|
||||||
|
char address[100];
|
||||||
|
sprintf(address, "tcp://0.0.0.0:%u", global_controller->command_port);
|
||||||
|
struct mg_connection *c = mg_bind(mgr, address, handler_command);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct mg_connection*
|
||||||
|
connection_mqtt_connect(struct mg_mgr *mgr)
|
||||||
|
{
|
||||||
|
char address[100];
|
||||||
|
sprintf(address, "tcp://localhost:%u", global_config.mqtt_port);
|
||||||
|
struct mg_connection *c = mg_connect(mgr, address, handler_mqtt);
|
||||||
|
return c;
|
||||||
|
}
|
|
@ -35,12 +35,16 @@ handler_loop(struct mg_connection *c_mqtt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if(relay->is_on != is_active)
|
if(relay->is_on != is_active)
|
||||||
|
{
|
||||||
|
relay->sent_to_broker = 0;
|
||||||
|
}
|
||||||
|
if(!relay->sent_to_broker && c_mqtt)
|
||||||
{
|
{
|
||||||
sprintf(topic_buf, "controller/%s/relay/%u", controller_uid, i);
|
sprintf(topic_buf, "controller/%s/relay/%u", controller_uid, i);
|
||||||
sprintf(payload_buf, "%u", is_active);
|
sprintf(payload_buf, "%u", is_active);
|
||||||
mg_mqtt_publish(c_mqtt, topic_buf, 0, MG_MQTT_QOS(0), payload_buf, strlen(payload_buf));
|
mg_mqtt_publish(c_mqtt, topic_buf, 0, MG_MQTT_QOS(0), payload_buf, strlen(payload_buf));
|
||||||
|
relay->sent_to_broker = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
relay->is_on = is_active;
|
relay->is_on = is_active;
|
||||||
|
|
||||||
if(global_config.relay_configs[i].inverted)
|
if(global_config.relay_configs[i].inverted)
|
||||||
|
|
|
@ -1,37 +1,43 @@
|
||||||
#include <logger.h>
|
#include <logger.h>
|
||||||
#include <handlers.h>
|
#include <handlers.h>
|
||||||
|
#include <connections.h>
|
||||||
#include <models/controller.h>
|
#include <models/controller.h>
|
||||||
|
|
||||||
void
|
void
|
||||||
handler_mqtt(struct mg_connection *nc, int ev, void *p) {
|
handler_mqtt(struct mg_connection *nc, int ev, void *p) {
|
||||||
struct mg_mqtt_message *msg = (struct mg_mqtt_message *) p;
|
struct mg_mqtt_message *msg = (struct mg_mqtt_message *) p;
|
||||||
(void) nc;
|
(void) nc;
|
||||||
|
|
||||||
if (ev != MG_EV_POLL) LOG_DEBUG("USER HANDLER GOT EVENT %d\n", ev);
|
switch (ev)
|
||||||
|
{
|
||||||
|
case MG_EV_CONNECT:
|
||||||
|
{
|
||||||
|
struct mg_send_mqtt_handshake_opts opts;
|
||||||
|
memset(&opts, 0, sizeof(opts));
|
||||||
|
// TODO add password
|
||||||
|
|
||||||
switch (ev) {
|
mg_set_protocol_mqtt(nc);
|
||||||
case MG_EV_CONNECT: {
|
mg_send_mqtt_handshake_opt(nc, global_controller->name, opts);
|
||||||
struct mg_send_mqtt_handshake_opts opts;
|
break;
|
||||||
memset(&opts, 0, sizeof(opts));
|
}
|
||||||
// TODO add password
|
case MG_EV_MQTT_CONNACK:
|
||||||
|
if(msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED)
|
||||||
mg_set_protocol_mqtt(nc);
|
{
|
||||||
mg_send_mqtt_handshake_opt(nc, global_controller->name, opts);
|
LOG_INFO("Got MQTT connection error: %d\n", msg->connack_ret_code);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
if(!global_connection_mqtt)
|
||||||
|
{
|
||||||
|
LOG_DEBUG("connected to MQTT server\n");
|
||||||
|
global_connection_mqtt = nc;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case MG_EV_CLOSE:
|
||||||
|
if(global_connection_mqtt)
|
||||||
|
{
|
||||||
|
LOG_DEBUG("disconnected from MQTT server\n");
|
||||||
|
}
|
||||||
|
global_connection_mqtt = NULL;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
case MG_EV_MQTT_CONNACK:
|
|
||||||
if (msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) {
|
|
||||||
LOG_DEBUG("Got mqtt connection error: %d\n", msg->connack_ret_code);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case MG_EV_MQTT_PUBACK:
|
|
||||||
LOG_DEBUG("Message publishing acknowledged (msg_id: %d)\n", msg->message_id);
|
|
||||||
break;
|
|
||||||
case MG_EV_MQTT_SUBACK:
|
|
||||||
LOG_DEBUG("Subscription acknowledged, forwarding to '/test'\n");
|
|
||||||
break;
|
|
||||||
case MG_EV_CLOSE:
|
|
||||||
LOG_DEBUG("Connection closed\n");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,11 @@ helper_load_config(IniDispatch *disp, void *config_void)
|
||||||
config->discovery_port = atoi(disp->value);
|
config->discovery_port = atoi(disp->value);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
if(CONFINI_IS_KEY("controller", "mqtt-port"))
|
||||||
|
{
|
||||||
|
config->mqtt_port = atoi(disp->value);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
if(CONFINI_IS_KEY("controller", "relay-count"))
|
if(CONFINI_IS_KEY("controller", "relay-count"))
|
||||||
{
|
{
|
||||||
config->relay_count = atoi(disp->value);
|
config->relay_count = atoi(disp->value);
|
||||||
|
|
26
src/main.c
26
src/main.c
|
@ -10,6 +10,7 @@
|
||||||
#include <models/controller.h>
|
#include <models/controller.h>
|
||||||
#include <database.h>
|
#include <database.h>
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
|
#include <connections.h>
|
||||||
#include <constants.h>
|
#include <constants.h>
|
||||||
#include <handlers.h>
|
#include <handlers.h>
|
||||||
#include <drivers.h>
|
#include <drivers.h>
|
||||||
|
@ -32,8 +33,9 @@ terminate(int signum)
|
||||||
{
|
{
|
||||||
LOG_INFO("terminating controller (%d)\n", signum);
|
LOG_INFO("terminating controller (%d)\n", signum);
|
||||||
|
|
||||||
LOG_DEBUG("freeing mongoose manager\n");
|
// TODO fix mg_mgr_free() causing loop (can't terminate)
|
||||||
mg_mgr_free(&mgr);
|
//LOG_DEBUG("freeing mongoose manager\n");
|
||||||
|
//mg_mgr_free(&mgr);
|
||||||
|
|
||||||
LOG_DEBUG("closing database\n");
|
LOG_DEBUG("closing database\n");
|
||||||
mdb_env_close(global_mdb_env);
|
mdb_env_close(global_mdb_env);
|
||||||
|
@ -71,6 +73,8 @@ main(int argc, const char** argv)
|
||||||
|
|
||||||
global_config.file = "controller.ini";
|
global_config.file = "controller.ini";
|
||||||
global_config.log_level = LOG_LEVEL_INFO;
|
global_config.log_level = LOG_LEVEL_INFO;
|
||||||
|
global_config.discovery_port = 4421;
|
||||||
|
global_config.mqtt_port = 1885;
|
||||||
|
|
||||||
helper_parse_cli(argc, argv, &global_config);
|
helper_parse_cli(argc, argv, &global_config);
|
||||||
|
|
||||||
|
@ -102,15 +106,10 @@ main(int argc, const char** argv)
|
||||||
|
|
||||||
global_controller = controller_load(global_mdb_env);
|
global_controller = controller_load(global_mdb_env);
|
||||||
|
|
||||||
char address[100];
|
connection_discovery_bind(&mgr);
|
||||||
sprintf(address, "udp://0.0.0.0:%u", global_controller->discovery_port);
|
connection_mqtt_connect(&mgr);
|
||||||
struct mg_connection *c_discovery = mg_bind(&mgr, address, handler_discovery);
|
struct mg_connection *c_command = connection_command_bind(&mgr);
|
||||||
sprintf(address, "tcp://0.0.0.0:%u", global_controller->command_port);
|
|
||||||
struct mg_connection *c_command = mg_bind(&mgr, address, handler_command);
|
|
||||||
sprintf(address, "tcp://localhost:%u", 1883);
|
|
||||||
struct mg_connection *c_mqtt = mg_connect(&mgr, address, handler_mqtt);
|
|
||||||
|
|
||||||
(void)c_discovery; // still unused
|
|
||||||
global_controller->command_port = helper_get_port(c_command->sock);
|
global_controller->command_port = helper_get_port(c_command->sock);
|
||||||
|
|
||||||
controller_save(global_controller, global_mdb_env);
|
controller_save(global_controller, global_mdb_env);
|
||||||
|
@ -144,7 +143,12 @@ main(int argc, const char** argv)
|
||||||
for(;;)
|
for(;;)
|
||||||
{
|
{
|
||||||
mg_mgr_poll(&mgr, 1000);
|
mg_mgr_poll(&mgr, 1000);
|
||||||
handler_loop(c_mqtt);
|
if(!global_connection_mqtt)
|
||||||
|
{
|
||||||
|
LOG_DEBUG("mqtt connection is not open\n");
|
||||||
|
connection_mqtt_connect(&mgr);
|
||||||
|
}
|
||||||
|
handler_loop(global_connection_mqtt);
|
||||||
}
|
}
|
||||||
|
|
||||||
terminate(0);
|
terminate(0);
|
||||||
|
|
|
@ -15,6 +15,7 @@ relay_create(uint8_t number)
|
||||||
new_relay->name[0] = '\0';
|
new_relay->name[0] = '\0';
|
||||||
|
|
||||||
new_relay->is_on = -1;
|
new_relay->is_on = -1;
|
||||||
|
new_relay->sent_to_broker = 0;
|
||||||
|
|
||||||
uuid_t off_id;
|
uuid_t off_id;
|
||||||
memset(off_id, 0, sizeof(uuid_t));
|
memset(off_id, 0, sizeof(uuid_t));
|
||||||
|
|
Loading…
Reference in a new issue