bmsd/mqtt.c

changeset 0
033898178630
child 75
1a3c6480e057
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bmsd/mqtt.c	Sat Aug 04 21:19:15 2018 +0200
@@ -0,0 +1,409 @@
+/*****************************************************************************
+ * Copyright (C) 2017-2018
+ *   
+ * Michiel Broek <mbroek at mbse dot eu>
+ *
+ * This file is part of the bms (Brewery Management System)
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * bms is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with ThermFerm; see the file COPYING.  If not, write to the Free
+ * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ *****************************************************************************/
+
+#include "bms.h"
+#include "xutil.h"
+#include "mqtt.h"
+#include "nodes.h"
+#include "fermenters.h"
+
+
+extern sys_config       Config;
+extern int		debug;
+
+
+
+/* Global variables for use in callbacks. */
+int              	mqtt_qos = 0;
+int              	mqtt_status = STATUS_CONNECTING;
+int              	mqtt_mid_sent = 0;
+int              	mqtt_last_mid = -1;
+int              	mqtt_last_mid_sent = -1;
+int              	mqtt_connected = TRUE;
+int              	mqtt_disconnect_sent = FALSE;
+int              	mqtt_connect_lost = FALSE;
+int              	mqtt_my_shutdown = FALSE;
+int              	mqtt_use = FALSE;
+int			keepalive = 60;
+unsigned int		max_inflight = 20;
+struct mosquitto	*mosq = NULL;
+char			*state = NULL;
+char			my_hostname[256];
+int			Sequence = 0;
+
+
+char *payload_header(void)
+{
+    static char	*tmp;
+    char	buf[128];
+
+    tmp = xstrcpy((char *)"{\"timestamp\":");
+    sprintf(buf, "%ld", time(NULL));
+    tmp = xstrcat(tmp, buf);
+    tmp = xstrcat(tmp, (char *)",\"seq\":");
+    sprintf(buf, "%d", Sequence++);
+    tmp = xstrcat(tmp, buf);
+    tmp = xstrcat(tmp, (char *)",\"metric\":");
+    return tmp;
+}
+
+
+
+char *topic_base(char *msgtype)
+{
+    static char	*tmp;
+
+    tmp = xstrcpy((char *)"mbv1.0/brewery/");
+    tmp = xstrcat(tmp, msgtype);
+    tmp = xstrcat(tmp, (char *)"/");
+    tmp = xstrcat(tmp, my_hostname);
+    return tmp;
+}
+
+
+
+void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
+{
+    char	*topic = NULL;
+
+    if (mqtt_connect_lost) {
+        mqtt_connect_lost = FALSE;
+        syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
+    }
+
+    if (!result) {
+	topic = topic_base((char *)"NCMD");		// TODO: do we need this??
+	topic = xstrcat(topic, (char *)"/#");
+	mosquitto_subscribe(mosq, NULL, topic, 0);
+	free(topic);
+	topic = xstrcpy((char *)"mbv1.0/fermenters/#");		// Subscribe to fermenter messages.
+	mosquitto_subscribe(mosq, NULL, topic, 0);
+	free(topic);
+	topic = NULL;
+       	mqtt_status = STATUS_CONNACK_RECVD;
+    } else {
+       	syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
+    }
+}
+
+
+
+void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
+{
+    if (mqtt_my_shutdown) {
+       	syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
+       	mqtt_connected = FALSE;
+    } else {
+       	/*
+         * The remote server was brought down. We must keep running
+         */
+       	syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
+       	mqtt_connect_lost = TRUE;
+    }
+}
+
+
+
+void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
+{
+    mqtt_last_mid_sent = mid;
+}
+
+
+
+void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
+{
+    int i;
+
+    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
+    for (i = 1; i < qos_count; i++) {
+	syslog(LOG_NOTICE, "     %d", granted_qos[i]);
+    }
+}
+
+
+
+void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
+{
+    syslog(LOG_NOTICE, "MQTT: %s", str);
+    if (debug)
+    	fprintf(stdout, "MQTT: %s\n", str);
+}
+
+
+
+void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
+{
+    if (message->payloadlen) {
+	// TODO: process subscribed topics here.
+	if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) {
+	    node_birth_data(message->topic, (char *)message->payload);
+	    return;
+	}
+	if (strstr(message->topic, (char *)"fermenters") &&  (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) {
+	    fermenter_birth_data(message->topic, (char *)message->payload);
+	    return;
+	}
+	if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) {
+	    fermenter_log(message->topic, (char *)message->payload);
+	    return;
+	}
+	syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
+    } else {
+	if (strstr(message->topic, (char *)"NBIRTH")) {
+	    // Ignore ??
+	    fprintf(stdout, "MQTT: %s NULL\n", message->topic);
+	    return;
+	}
+	if (strstr(message->topic, (char *)"NDEATH")) {
+	    node_death(message->topic);
+	    return;
+	}
+	if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) {
+	    fermenter_death(message->topic);
+	    return;
+	}
+	syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
+    }
+}
+
+
+
+void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) 
+{
+    // publish the data
+    if (payload)
+	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
+    else
+	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
+}
+
+
+
+void publishNData(bool birth, int flag)
+{
+    char                *topic = NULL, *payload = NULL;
+    struct utsname      ubuf;
+    bool                comma = false;
+
+    payload = payload_header();
+    payload = xstrcat(payload, (char *)"{");
+			        
+    if (birth || flag & MQTT_NODE_CONTROL) {
+	payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}");
+	comma = true;
+    }
+
+    if (birth) {
+	if (comma)
+	    payload = xstrcat(payload, (char *)",");
+	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
+	if (uname(&ubuf) == 0) {
+	    payload = xstrcat(payload, (char *)",\"os\":\"");
+	    payload = xstrcat(payload, ubuf.sysname);
+	    payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
+	    payload = xstrcat(payload, ubuf.release);
+	    payload = xstrcat(payload, (char *)"\"");
+	} else {
+	    payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
+	}
+
+	payload = xstrcat(payload, (char *)",\"FW\":\"");
+	payload = xstrcat(payload, (char *)VERSION);
+	payload = xstrcat(payload, (char *)"\"}");
+	comma = true;
+    }
+    payload = xstrcat(payload, (char *)"}}");
+
+    if (birth) {
+	topic = topic_base((char *)"NBIRTH");
+	publisher(mosq, topic, payload, true);
+    } else {
+	topic = topic_base((char *)"NDATA");
+	publisher(mosq, topic, payload, false);
+    }
+
+    free(payload);
+    payload = NULL;
+    free(topic);
+    topic = NULL;
+}
+
+
+
+int mqtt_connect(void)
+{
+    char	*id = NULL, *topic = NULL;
+    char	err[1024];
+    int		rc;
+
+    /*
+     * Initialize mosquitto communication
+     */
+    gethostname(my_hostname, 255);
+    mosquitto_lib_init();
+    id = xstrcpy((char *)"bmsd/");
+    id = xstrcat(id, my_hostname);
+    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
+       /*
+        * Enforce maximum client id length of 23 characters
+        */
+       id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+    }
+
+    mosq = mosquitto_new(id, TRUE, NULL);
+    if (!mosq) {
+       switch(errno) {
+           case ENOMEM:
+               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
+               break;
+           case EINVAL:
+               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
+               break;
+       }
+       mosquitto_lib_cleanup();
+       return 1;
+    }
+    free(id);
+    id = NULL;
+
+    /*
+     * Set our will
+     */
+    topic = topic_base((char *)"NDEATH");
+    if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) {
+	if (rc > MOSQ_ERR_SUCCESS)
+	    syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
+	mosquitto_lib_cleanup();
+	return 2;
+    }
+    free(topic);
+    topic = NULL;
+
+    if (debug)
+       mosquitto_log_callback_set(mosq, my_log_callback);
+
+    /*
+     * Username/Password
+     */
+    if (Config.mqtt_user) {
+	if (Config.mqtt_pass) {
+	    rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass);
+	} else {
+	    rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL);
+	}
+	if (rc == MOSQ_ERR_INVAL) {
+	    syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid");
+	} else if (rc == MOSQ_ERR_NOMEM) {
+	    syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory");
+	}
+	if (rc != MOSQ_ERR_SUCCESS) {
+	    mosquitto_lib_cleanup();
+	    return 3;
+	}
+    }
+
+    mosquitto_max_inflight_messages_set(mosq, max_inflight);
+    mosquitto_connect_callback_set(mosq, my_connect_callback);
+    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
+    mosquitto_publish_callback_set(mosq, my_publish_callback);
+    mosquitto_message_callback_set(mosq, my_message_callback);
+    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
+
+    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
+        if (rc == MOSQ_ERR_ERRNO) {
+            strerror_r(errno, err, 1024);
+            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
+        } else {
+            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
+        }
+        mosquitto_lib_cleanup();
+	syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
+	return 4;
+    } else {
+        mqtt_use = TRUE;
+        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);
+
+        /*
+         * Initialise is complete, report our presence state
+         */
+        mosquitto_loop_start(mosq);
+	publishNData(true, 0);
+    }
+
+    return 0;
+}
+
+
+
+void mqtt_disconnect(void)
+{
+    int		rc;
+    char	*topic = NULL;
+
+    if (mqtt_use) {
+        /*
+         * Final publish 0 to clients/<hostname>/bmsd/state
+	 * After that, remove the retained topic.
+         */
+        syslog(LOG_NOTICE, "MQTT disconnecting");
+	topic = topic_base((char *)"NBIRTH");
+	publisher(mosq, topic, NULL, true);
+	free(topic);
+	topic = topic_base((char *)"NDEATH");
+	publisher(mosq, topic, NULL, true);
+	free(topic);
+	topic = NULL;
+	mqtt_last_mid = mqtt_mid_sent;
+        mqtt_status = STATUS_WAITING;
+	mqtt_my_shutdown = TRUE;
+
+        do {
+            if (mqtt_status == STATUS_WAITING) {
+                if (debug)
+                    fprintf(stdout, (char *)"Waiting\n");
+                if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
+                    mosquitto_disconnect(mosq);
+                    mqtt_disconnect_sent = TRUE;
+                }
+                usleep(100000);
+            }
+            rc = MOSQ_ERR_SUCCESS;
+        } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);
+
+        mosquitto_loop_stop(mosq, FALSE);
+        mosquitto_destroy(mosq);
+        mosquitto_lib_cleanup();
+	mqtt_use = FALSE;
+	mqtt_status = STATUS_CONNECTING;
+	mqtt_mid_sent = 0;
+	mqtt_last_mid = -1;
+	mqtt_last_mid_sent = -1;
+	mqtt_connected = TRUE;
+	mqtt_disconnect_sent = FALSE;
+	mqtt_connect_lost = FALSE;
+	mqtt_my_shutdown = FALSE;
+	syslog(LOG_NOTICE, "MQTT: disconnected");
+    }
+}
+
+

mercurial