diff -r 000000000000 -r 033898178630 bmsd/mqtt.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bmsd/mqtt.c Sat Aug 04 21:19:15 2018 +0200 @@ -0,0 +1,409 @@ +/***************************************************************************** + * Copyright (C) 2017-2018 + * + * Michiel Broek + * + * This file is part of the bms (Brewery Management System) + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * bms is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with ThermFerm; see the file COPYING. If not, write to the Free + * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + *****************************************************************************/ + +#include "bms.h" +#include "xutil.h" +#include "mqtt.h" +#include "nodes.h" +#include "fermenters.h" + + +extern sys_config Config; +extern int debug; + + + +/* Global variables for use in callbacks. */ +int mqtt_qos = 0; +int mqtt_status = STATUS_CONNECTING; +int mqtt_mid_sent = 0; +int mqtt_last_mid = -1; +int mqtt_last_mid_sent = -1; +int mqtt_connected = TRUE; +int mqtt_disconnect_sent = FALSE; +int mqtt_connect_lost = FALSE; +int mqtt_my_shutdown = FALSE; +int mqtt_use = FALSE; +int keepalive = 60; +unsigned int max_inflight = 20; +struct mosquitto *mosq = NULL; +char *state = NULL; +char my_hostname[256]; +int Sequence = 0; + + +char *payload_header(void) +{ + static char *tmp; + char buf[128]; + + tmp = xstrcpy((char *)"{\"timestamp\":"); + sprintf(buf, "%ld", time(NULL)); + tmp = xstrcat(tmp, buf); + tmp = xstrcat(tmp, (char *)",\"seq\":"); + sprintf(buf, "%d", Sequence++); + tmp = xstrcat(tmp, buf); + tmp = xstrcat(tmp, (char *)",\"metric\":"); + return tmp; +} + + + +char *topic_base(char *msgtype) +{ + static char *tmp; + + tmp = xstrcpy((char *)"mbv1.0/brewery/"); + tmp = xstrcat(tmp, msgtype); + tmp = xstrcat(tmp, (char *)"/"); + tmp = xstrcat(tmp, my_hostname); + return tmp; +} + + + +void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) +{ + char *topic = NULL; + + if (mqtt_connect_lost) { + mqtt_connect_lost = FALSE; + syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); + } + + if (!result) { + topic = topic_base((char *)"NCMD"); // TODO: do we need this?? + topic = xstrcat(topic, (char *)"/#"); + mosquitto_subscribe(mosq, NULL, topic, 0); + free(topic); + topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. + mosquitto_subscribe(mosq, NULL, topic, 0); + free(topic); + topic = NULL; + mqtt_status = STATUS_CONNACK_RECVD; + } else { + syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); + } +} + + + +void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) +{ + if (mqtt_my_shutdown) { + syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); + mqtt_connected = FALSE; + } else { + /* + * The remote server was brought down. We must keep running + */ + syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); + mqtt_connect_lost = TRUE; + } +} + + + +void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) +{ + mqtt_last_mid_sent = mid; +} + + + +void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) +{ + int i; + + syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); + for (i = 1; i < qos_count; i++) { + syslog(LOG_NOTICE, " %d", granted_qos[i]); + } +} + + + +void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) +{ + syslog(LOG_NOTICE, "MQTT: %s", str); + if (debug) + fprintf(stdout, "MQTT: %s\n", str); +} + + + +void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) +{ + if (message->payloadlen) { + // TODO: process subscribed topics here. + if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { + node_birth_data(message->topic, (char *)message->payload); + return; + } + if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { + fermenter_birth_data(message->topic, (char *)message->payload); + return; + } + if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { + fermenter_log(message->topic, (char *)message->payload); + return; + } + syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); + } else { + if (strstr(message->topic, (char *)"NBIRTH")) { + // Ignore ?? + fprintf(stdout, "MQTT: %s NULL\n", message->topic); + return; + } + if (strstr(message->topic, (char *)"NDEATH")) { + node_death(message->topic); + return; + } + if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { + fermenter_death(message->topic); + return; + } + syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); + } +} + + + +void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) +{ + // publish the data + if (payload) + mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); + else + mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); +} + + + +void publishNData(bool birth, int flag) +{ + char *topic = NULL, *payload = NULL; + struct utsname ubuf; + bool comma = false; + + payload = payload_header(); + payload = xstrcat(payload, (char *)"{"); + + if (birth || flag & MQTT_NODE_CONTROL) { + payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); + comma = true; + } + + if (birth) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); + if (uname(&ubuf) == 0) { + payload = xstrcat(payload, (char *)",\"os\":\""); + payload = xstrcat(payload, ubuf.sysname); + payload = xstrcat(payload, (char *)"\",\"os_version\":\""); + payload = xstrcat(payload, ubuf.release); + payload = xstrcat(payload, (char *)"\""); + } else { + payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); + } + + payload = xstrcat(payload, (char *)",\"FW\":\""); + payload = xstrcat(payload, (char *)VERSION); + payload = xstrcat(payload, (char *)"\"}"); + comma = true; + } + payload = xstrcat(payload, (char *)"}}"); + + if (birth) { + topic = topic_base((char *)"NBIRTH"); + publisher(mosq, topic, payload, true); + } else { + topic = topic_base((char *)"NDATA"); + publisher(mosq, topic, payload, false); + } + + free(payload); + payload = NULL; + free(topic); + topic = NULL; +} + + + +int mqtt_connect(void) +{ + char *id = NULL, *topic = NULL; + char err[1024]; + int rc; + + /* + * Initialize mosquitto communication + */ + gethostname(my_hostname, 255); + mosquitto_lib_init(); + id = xstrcpy((char *)"bmsd/"); + id = xstrcat(id, my_hostname); + if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { + /* + * Enforce maximum client id length of 23 characters + */ + id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; + } + + mosq = mosquitto_new(id, TRUE, NULL); + if (!mosq) { + switch(errno) { + case ENOMEM: + syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); + break; + case EINVAL: + syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); + break; + } + mosquitto_lib_cleanup(); + return 1; + } + free(id); + id = NULL; + + /* + * Set our will + */ + topic = topic_base((char *)"NDEATH"); + if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { + if (rc > MOSQ_ERR_SUCCESS) + syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); + mosquitto_lib_cleanup(); + return 2; + } + free(topic); + topic = NULL; + + if (debug) + mosquitto_log_callback_set(mosq, my_log_callback); + + /* + * Username/Password + */ + if (Config.mqtt_user) { + if (Config.mqtt_pass) { + rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); + } else { + rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); + } + if (rc == MOSQ_ERR_INVAL) { + syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); + } else if (rc == MOSQ_ERR_NOMEM) { + syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); + } + if (rc != MOSQ_ERR_SUCCESS) { + mosquitto_lib_cleanup(); + return 3; + } + } + + mosquitto_max_inflight_messages_set(mosq, max_inflight); + mosquitto_connect_callback_set(mosq, my_connect_callback); + mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); + mosquitto_publish_callback_set(mosq, my_publish_callback); + mosquitto_message_callback_set(mosq, my_message_callback); + mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); + + if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { + if (rc == MOSQ_ERR_ERRNO) { + strerror_r(errno, err, 1024); + syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); + } else { + syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); + } + mosquitto_lib_cleanup(); + syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); + return 4; + } else { + mqtt_use = TRUE; + syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); + + /* + * Initialise is complete, report our presence state + */ + mosquitto_loop_start(mosq); + publishNData(true, 0); + } + + return 0; +} + + + +void mqtt_disconnect(void) +{ + int rc; + char *topic = NULL; + + if (mqtt_use) { + /* + * Final publish 0 to clients//bmsd/state + * After that, remove the retained topic. + */ + syslog(LOG_NOTICE, "MQTT disconnecting"); + topic = topic_base((char *)"NBIRTH"); + publisher(mosq, topic, NULL, true); + free(topic); + topic = topic_base((char *)"NDEATH"); + publisher(mosq, topic, NULL, true); + free(topic); + topic = NULL; + mqtt_last_mid = mqtt_mid_sent; + mqtt_status = STATUS_WAITING; + mqtt_my_shutdown = TRUE; + + do { + if (mqtt_status == STATUS_WAITING) { + if (debug) + fprintf(stdout, (char *)"Waiting\n"); + if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { + mosquitto_disconnect(mosq); + mqtt_disconnect_sent = TRUE; + } + usleep(100000); + } + rc = MOSQ_ERR_SUCCESS; + } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); + + mosquitto_loop_stop(mosq, FALSE); + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); + mqtt_use = FALSE; + mqtt_status = STATUS_CONNECTING; + mqtt_mid_sent = 0; + mqtt_last_mid = -1; + mqtt_last_mid_sent = -1; + mqtt_connected = TRUE; + mqtt_disconnect_sent = FALSE; + mqtt_connect_lost = FALSE; + mqtt_my_shutdown = FALSE; + syslog(LOG_NOTICE, "MQTT: disconnected"); + } +} + +