bmsd/mqtt.c

Mon, 18 May 2020 11:00:59 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Mon, 18 May 2020 11:00:59 +0200
changeset 679
48f8f3fce7c0
parent 590
a43b8b85d8b3
child 747
b6fbe6821468
permissions
-rw-r--r--

Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.

/*****************************************************************************
 * Copyright (C) 2017-2020
 *   
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the bms (Brewery Management System)
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

#include "bms.h"
#include "xutil.h"
#include "mqtt.h"
#include "nodes.h"
#include "fermenters.h"
#include "co2meters.h"
#include "ispindels.h"


extern sys_config       Config;
extern int		debug;



/* Global variables for use in callbacks. */
int              	mqtt_qos = 0;
int              	mqtt_status = STATUS_CONNECTING;
int              	mqtt_mid_sent = 0;
int              	mqtt_last_mid = -1;
int              	mqtt_last_mid_sent = -1;
int              	mqtt_connected = TRUE;
int              	mqtt_disconnect_sent = FALSE;
int              	mqtt_connect_lost = FALSE;
int              	mqtt_my_shutdown = FALSE;
int              	mqtt_use = FALSE;
int			keepalive = 60;
unsigned int		max_inflight = 20;
struct mosquitto	*mosq = NULL;
char			*state = NULL;
char			my_hostname[256];
int			Sequence = 0;


char *payload_header(void)
{
    static char	*tmp;
    char	buf[128];

    tmp = xstrcpy((char *)"{\"timestamp\":");
    sprintf(buf, "%ld", time(NULL));
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"seq\":");
    sprintf(buf, "%d", Sequence++);
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"metric\":");
    return tmp;
}



char *topic_base(char *msgtype)
{
    static char	*tmp;

    tmp = xstrcpy((char *)"mbv1.0/brewery/");
    tmp = xstrcat(tmp, msgtype);
    tmp = xstrcat(tmp, (char *)"/");
    tmp = xstrcat(tmp, my_hostname);
    return tmp;
}



void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    char	*topic = NULL;

    if (mqtt_connect_lost) {
        mqtt_connect_lost = FALSE;
        syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (!result) {
	topic = topic_base((char *)"NCMD");		// TODO: do we need this??
	topic = xstrcat(topic, (char *)"/#");
	mosquitto_subscribe(mosq, NULL, topic, 0);
	free(topic);
	topic = xstrcpy((char *)"mbv1.0/fermenters/#");	// Subscribe to fermenter messages.
	mosquitto_subscribe(mosq, NULL, topic, 0);
	free(topic);
	topic = xstrcpy((char *)"mbv1.0/co2meters/#");	// Subscribe to co2meter messages.
	mosquitto_subscribe(mosq, NULL, topic, 0);
	free(topic);
	topic = xstrcpy((char *)"mbv1.0/ispindels/#");  // Subscribe to ispindel messages.
        mosquitto_subscribe(mosq, NULL, topic, 0);
        free(topic);
	topic = NULL;
       	mqtt_status = STATUS_CONNACK_RECVD;
    } else {
       	syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
    }
}



void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    if (mqtt_my_shutdown) {
       	syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
       	mqtt_connected = FALSE;
    } else {
       	/*
         * The remote server was brought down. We must keep running
         */
       	syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
       	mqtt_connect_lost = TRUE;
    }
}



void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    mqtt_last_mid_sent = mid;
}



void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (i = 1; i < qos_count; i++) {
	syslog(LOG_NOTICE, "     %d", granted_qos[i]);
    }
}



void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    syslog(LOG_NOTICE, "MQTT: %s", str);
    if (debug)
    	fprintf(stdout, "MQTT: %s\n", str);
}



void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen) {
	// TODO: process subscribed topics here.
	if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) {
	    node_birth_data(message->topic, (char *)message->payload);
	    return;
	}
	if (strstr(message->topic, (char *)"fermenters") &&  (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) {
	    fermenter_birth_data(message->topic, (char *)message->payload);
	    return;
	}
	if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) {
	    fermenter_log(message->topic, (char *)message->payload);
	    return;
	}
	if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DCMD")) {
            return; // just ignore our own commands.
        }
	if (strstr(message->topic, (char *)"co2meters") &&  strstr(message->topic, (char *)"DBIRTH")) {
            co2meter_birth_data(message->topic, (char *)message->payload);
            return;
        }
        if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DLOG")) {
            co2meter_log(message->topic, (char *)message->payload);
            return;
        }
	if (strstr(message->topic, (char *)"ispindels") && strstr(message->topic, (char *)"DBIRTH")) {
	    ispindel_birth_data(message->topic, (char *)message->payload);
	    return;
	}
	syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
    } else {
	if (strstr(message->topic, (char *)"NBIRTH")) {
	    // Ignore ??
	    fprintf(stdout, "MQTT: %s NULL\n", message->topic);
	    return;
	}
	if (strstr(message->topic, (char *)"NDEATH")) {
	    node_death(message->topic);
	    return;
	}
	if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) {
	    fermenter_death(message->topic);
	    return;
	}
	syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
    }
}



void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) 
{
    // publish the data
    if (payload)
	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
    else
	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
}



void mqtt_publish(char *topic, char *payload)
{
    mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, false);
}



void publishNData(bool birth, int flag)
{
    char                *topic = NULL, *payload = NULL;
    struct utsname      ubuf;
    bool                comma = false;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{");
			        
    if (birth || flag & MQTT_NODE_CONTROL) {
	payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}");
	comma = true;
    }

    if (birth) {
	if (comma)
	    payload = xstrcat(payload, (char *)",");
	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
	if (uname(&ubuf) == 0) {
	    payload = xstrcat(payload, (char *)",\"os\":\"");
	    payload = xstrcat(payload, ubuf.sysname);
	    payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
	    payload = xstrcat(payload, ubuf.release);
	    payload = xstrcat(payload, (char *)"\"");
	} else {
	    payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
	}

	payload = xstrcat(payload, (char *)",\"FW\":\"");
	payload = xstrcat(payload, (char *)VERSION);
	payload = xstrcat(payload, (char *)"\"}");
	comma = true;
    }
    payload = xstrcat(payload, (char *)"}}");

    if (birth) {
	topic = topic_base((char *)"NBIRTH");
	publisher(mosq, topic, payload, true);
    } else {
	topic = topic_base((char *)"NDATA");
	publisher(mosq, topic, payload, false);
    }

    free(payload);
    payload = NULL;
    free(topic);
    topic = NULL;
}



int mqtt_connect(void)
{
    char	*id = NULL, *topic = NULL;
    char	err[1024];
    int		rc;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, 255);
    mosquitto_lib_init();
    id = xstrcpy((char *)"bmsd/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
       /*
        * Enforce maximum client id length of 23 characters
        */
       id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, TRUE, NULL);
    if (!mosq) {
       switch(errno) {
           case ENOMEM:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
               break;
           case EINVAL:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
               break;
       }
       mosquitto_lib_cleanup();
       return 1;
    }
    free(id);
    id = NULL;

    /*
     * Set our will
     */
    topic = topic_base((char *)"NDEATH");
    if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) {
	if (rc > MOSQ_ERR_SUCCESS)
	    syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
	mosquitto_lib_cleanup();
	return 2;
    }
    free(topic);
    topic = NULL;

    if (debug)
       mosquitto_log_callback_set(mosq, my_log_callback);

    /*
     * Username/Password
     */
    if (Config.mqtt_user) {
	if (Config.mqtt_pass) {
	    rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass);
	} else {
	    rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL);
	}
	if (rc == MOSQ_ERR_INVAL) {
	    syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid");
	} else if (rc == MOSQ_ERR_NOMEM) {
	    syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory");
	}
	if (rc != MOSQ_ERR_SUCCESS) {
	    mosquitto_lib_cleanup();
	    return 3;
	}
    }

    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
        if (rc == MOSQ_ERR_ERRNO) {
            strerror_r(errno, err, 1024);
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        mosquitto_lib_cleanup();
	syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
	return 4;
    } else {
        mqtt_use = TRUE;
        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);

        /*
         * Initialise is complete, report our presence state
         */
        mosquitto_loop_start(mosq);
	publishNData(true, 0);
    }

    return 0;
}



void mqtt_disconnect(void)
{
    int		rc;
    char	*topic = NULL;

    if (mqtt_use) {
        /*
         * Final publish 0 to clients/<hostname>/bmsd/state
	 * After that, remove the retained topic.
         */
        syslog(LOG_NOTICE, "MQTT disconnecting");
	topic = topic_base((char *)"NBIRTH");
	publisher(mosq, topic, NULL, true);
	free(topic);
	topic = topic_base((char *)"NDEATH");
	publisher(mosq, topic, NULL, true);
	free(topic);
	topic = NULL;
	mqtt_last_mid = mqtt_mid_sent;
        mqtt_status = STATUS_WAITING;
	mqtt_my_shutdown = TRUE;

        do {
            if (mqtt_status == STATUS_WAITING) {
                if (debug)
                    fprintf(stdout, (char *)"Waiting\n");
                if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
                    mosquitto_disconnect(mosq);
                    mqtt_disconnect_sent = TRUE;
                }
                usleep(100000);
            }
            rc = MOSQ_ERR_SUCCESS;
        } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);

        mosquitto_loop_stop(mosq, FALSE);
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
	mqtt_use = FALSE;
	mqtt_status = STATUS_CONNECTING;
	mqtt_mid_sent = 0;
	mqtt_last_mid = -1;
	mqtt_last_mid_sent = -1;
	mqtt_connected = TRUE;
	mqtt_disconnect_sent = FALSE;
	mqtt_connect_lost = FALSE;
	mqtt_my_shutdown = FALSE;
	syslog(LOG_NOTICE, "MQTT: disconnected");
    }
}

mercurial