thermferm/mqtt.c

Mon, 09 May 2016 21:35:55 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Mon, 09 May 2016 21:35:55 +0200
changeset 500
5aa914eb644e
parent 499
602d9968960f
child 504
862de87f9f89
permissions
-rw-r--r--

Units now have a unique alias name so that the MQTT messages are more friendly.

/*****************************************************************************
 * Copyright (C) 2016
 *   
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * mbsePi-apps is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

#include "thermferm.h"
#include "xutil.h"
#include "mqtt.h"

#include <stdlib.h>

/* Program configuration; this file reads Config.mqtt_host and Config.mqtt_port. */
extern sys_config       Config;
/* Non-zero enables the mosquitto log callback and extra output on stdout. */
extern int		debug;

#ifdef HAVE_MOSQUITTO_H


/* Global variables for use in callbacks. */
/* QoS level passed to mosquitto_will_set() and every mosquitto_publish(). */
int              	mqtt_qos = 0;
/* Set to STATUS_CONNACK_RECVD by the connect callback and to STATUS_WAITING by mqtt_disconnect(). */
int              	mqtt_status = STATUS_CONNECTING;
/* Message id written by mosquitto_publish() for the most recently queued message. */
int              	mqtt_mid_sent = 0;
/* Message id of the final "0" state message, captured in mqtt_disconnect(). */
int              	mqtt_last_mid = -1;
/* Message id most recently reported by the publish callback. */
int              	mqtt_last_mid_sent = -1;
/* Cleared by the disconnect callback once our own shutdown is in progress. NOTE: starts out TRUE. */
int              	mqtt_connected = TRUE;
/* Set once mqtt_disconnect() has called mosquitto_disconnect(). */
int              	mqtt_disconnect_sent = FALSE;
/* Set by the disconnect callback when the disconnect was not ours; cleared by the connect callback. */
int              	mqtt_connect_lost = FALSE;
/* Set by mqtt_disconnect() so the disconnect callback can tell a shutdown from a lost link. */
int              	mqtt_my_shutdown = FALSE;
/* Set TRUE after a successful mosquitto_connect(); gates all publish functions and mqtt_disconnect(). */
int              	mqtt_use = FALSE;
/* Keepalive value handed to mosquitto_connect(). */
int			keepalive = 60;
/* Limit handed to mosquitto_max_inflight_messages_set(). */
unsigned int		max_inflight = 20;
/* Client handle returned by mosquitto_new(). */
struct mosquitto	*mosq = NULL;
/* Presence topic "clients/<hostname>/thermferm/state", built in mqtt_connect(). */
char			*state = NULL;
/* Local host name as filled in by gethostname() in mqtt_connect(). */
char			my_hostname[256];


void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    if (mqtt_connect_lost) {
       mqtt_connect_lost = FALSE;
       syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (!result) {
       mqtt_status = STATUS_CONNACK_RECVD;
    } else {
       syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
    }
}



/*
 * Disconnect callback. Distinguishes a disconnect that we requested
 * ourselves (mqtt_my_shutdown is set) from one that happened to us.
 */
void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    if (!mqtt_my_shutdown) {
       /*
        * Not our doing, the remote server went away. Only flag the
        * loss, the program must keep running.
        */
       syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
       mqtt_connect_lost = TRUE;
       return;
    }

    /* Our own shutdown has completed. */
    syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
    mqtt_connected = FALSE;
}



/*
 * Publish callback. Records the id of the message libmosquitto reports,
 * mqtt_disconnect() compares it against the id of its final message.
 */
void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    (void)my_mosq;
    (void)obj;

    mqtt_last_mid_sent = mid;
}



/*
 * Log callback. Every library message goes to syslog, in debug mode it
 * is echoed on stdout as well.
 */
void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    syslog(LOG_NOTICE, "MQTT: %s", str);

    if (!debug)
        return;

    fprintf(stdout, "MQTT: %s\n", str);
}



#endif


/*
 * Set up the MQTT client and connect to the broker given by
 * Config.mqtt_host and Config.mqtt_port.
 *
 * On success mqtt_use is set, the network loop thread is started and a
 * retained "1" is published on clients/<hostname>/thermferm/state. The
 * same topic carries a retained last will of "0".
 *
 * On any failure everything that was allocated here is released again
 * and mqtt_use stays FALSE, the program then runs without MQTT.
 * Without HAVE_MOSQUITTO_H this function does nothing.
 */
void mqtt_connect(void)
{
#ifdef HAVE_MOSQUITTO_H
    char		*id = NULL;
    char		err[1024] = "";	/* Defined contents even if strerror_r() leaves it alone */
    int			rc, saved_errno;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, 255);
    mosquitto_lib_init();
    id = xstrcpy((char *)"thermferm/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
       /*
        * Enforce maximum client id length of 23 characters
        */
       id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, TRUE, NULL);
    saved_errno = errno;	/* free() below may change errno */

    /*
     * The library keeps its own copy of the client id.
     * NOTE(review): assumes the xstrcpy()/xstrcat() buffer is allocated
     * with malloc(), confirm against xutil.
     */
    free(id);
    id = NULL;

    if (!mosq) {
       switch (saved_errno) {
           case ENOMEM:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
               break;
           case EINVAL:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
               break;
           default:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: failed, errno %d", saved_errno);
               break;
       }
       mosquitto_lib_cleanup();
       return;
    }

    if (debug) {
       mosquitto_log_callback_set(mosq, my_log_callback);
    }

    /*
     * Set our will
     */
    state = xstrcpy((char *)"clients/");
    state = xstrcat(state, my_hostname);
    state = xstrcat(state, (char *)"/thermferm/state");
    if ((rc = mosquitto_will_set(mosq, state, 1, (char *)"0", mqtt_qos, TRUE))) {
        if (rc == MOSQ_ERR_INVAL) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: input parameters invalid");
        } else if (rc == MOSQ_ERR_NOMEM) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: Out of Memory");
        } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: invalid payload size");
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: failed (%d)", rc);
        }
        /* Release the client before the library itself is cleaned up. */
        mosquitto_destroy(mosq);
        mosq = NULL;
        free(state);
        state = NULL;
        mosquitto_lib_cleanup();
        return;
    }

    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);

    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
        if (rc == MOSQ_ERR_ERRNO) {
            strerror_r(errno, err, sizeof(err));
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        /* No loop thread was started yet, so the client can go right away. */
        mosquitto_destroy(mosq);
        mosq = NULL;
        free(state);
        state = NULL;
        mosquitto_lib_cleanup();
        syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
    } else {
        mqtt_use = TRUE;
        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);

        /*
         * Initialise is complete, report our presence state
         */
        if ((rc = mosquitto_loop_start(mosq))) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_loop_start: failed (%d)", rc);
        }
        if ((rc = mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, (char *)"1", mqtt_qos, 1))) {
            syslog(LOG_NOTICE, "MQTT: publish presence state failed (%d)", rc);
        }
    }
#endif
}



/*
 * Shut the MQTT connection down in an orderly way.
 *
 * Publishes a final retained "0" on the presence topic, waits until the
 * library reports that message as sent, sends DISCONNECT and waits for
 * the disconnect callback. The wait is limited to about five seconds so
 * that an unreachable broker cannot block program shutdown; in that case
 * the network thread is stopped by force.
 *
 * Afterwards the client, the library and the topic string are released
 * and mqtt_use is cleared, later publish calls then do nothing.
 * Does nothing when mqtt_use is not set or without HAVE_MOSQUITTO_H.
 */
void mqtt_disconnect(void)
{
#ifdef HAVE_MOSQUITTO_H
    int		rc, ticks = 0, force = FALSE;
    const int	max_ticks = 50;		/* 50 * 100 ms */

    if (mqtt_use) {
        /*
         * Final publish 0 to clients/<hostname>/thermferm/state
         */
        syslog(LOG_NOTICE, "MQTT disconnecting");
        mqtt_my_shutdown = TRUE;
        rc = mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, (char *)"0", mqtt_qos, 1);
        if (rc != MOSQ_ERR_SUCCESS) {
            syslog(LOG_NOTICE, "MQTT: final state publish failed (%d)", rc);
        }
        mqtt_last_mid = mqtt_mid_sent;
        mqtt_status = STATUS_WAITING;

        while (mqtt_connected && ticks < max_ticks) {
            if (debug)
                fprintf(stdout, (char *)"Waiting\n");
            /*
             * Send DISCONNECT once the final message is out. If that
             * message could not even be queued there is nothing to wait for.
             */
            if (mqtt_disconnect_sent == FALSE &&
                (rc != MOSQ_ERR_SUCCESS || mqtt_last_mid_sent == mqtt_last_mid)) {
                mqtt_disconnect_sent = TRUE;
                if (mosquitto_disconnect(mosq) != MOSQ_ERR_SUCCESS) {
                    /* No connection, the disconnect callback will not come. */
                    force = TRUE;
                    break;
                }
            }
            usleep(100000);
            ticks++;
        }

        if (mqtt_disconnect_sent == FALSE) {
            /* Final message never went out, tell the library to stop anyway. */
            mosquitto_disconnect(mosq);
            mqtt_disconnect_sent = TRUE;
            force = TRUE;
        }
        if (mqtt_connected) {
            if (!force)
                syslog(LOG_NOTICE, "MQTT: no DISCONNECT acknowledge, forcing shutdown");
            force = TRUE;
        }

        /*
         * Without force the call waits for the network thread to end by
         * itself, which only happens after a completed disconnect.
         */
        mosquitto_loop_stop(mosq, force);
        mosquitto_destroy(mosq);
        mosq = NULL;
        mosquitto_lib_cleanup();
        free(state);
        state = NULL;
        mqtt_use = FALSE;	/* The publish functions must not touch the destroyed client */
        syslog(LOG_NOTICE, "MQTT disconnected");
    }
#endif
}



/*
 * Publish an integer as retained decimal text on the topic
 * fermenter/<hostname>/<uuid>/<tail>.
 * Does nothing when MQTT is not in use or without HAVE_MOSQUITTO_H.
 */
void mqtt_publish_int(char *uuid, char *tail, int value)
{
#ifdef HAVE_MOSQUITTO_H
    char	payload[128];
    char	path[1024];

    if (!mqtt_use)
        return;

    snprintf(path, sizeof(path) - 1, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
    snprintf(payload, sizeof(payload) - 1, "%d", value);
    mosquitto_publish(mosq, &mqtt_mid_sent, path, strlen(payload), payload, mqtt_qos, 1);
#endif
}



/*
 * Publish a float as retained text with the requested number of
 * decimals on the topic fermenter/<hostname>/<uuid>/<tail>.
 * Does nothing when MQTT is not in use or without HAVE_MOSQUITTO_H.
 */
void mqtt_publish_float(char *uuid, char *tail, float value, int decimals)
{
#ifdef HAVE_MOSQUITTO_H
    char	payload[128];
    char	path[1024];

    if (!mqtt_use)
        return;

    snprintf(path, sizeof(path) - 1, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
    snprintf(payload, sizeof(payload) - 1, "%.*f", decimals, value);
    mosquitto_publish(mosq, &mqtt_mid_sent, path, strlen(payload), payload, mqtt_qos, 1);
#endif
}



/*
 * Publish a string as retained payload on the topic
 * fermenter/<hostname>/<uuid>/<tail>.
 *
 * The string is handed to the library as it is, it is not cut off at
 * the size of an intermediate buffer. A NULL value is logged and not
 * published, an empty retained payload would be a different request.
 * Does nothing when MQTT is not in use or without HAVE_MOSQUITTO_H.
 */
void mqtt_publish_str(char *uuid, char *tail, char *value)
{
#ifdef HAVE_MOSQUITTO_H
    char        topic[1024];

    if (mqtt_use) {
        if (value == NULL) {
            syslog(LOG_NOTICE, "MQTT: mqtt_publish_str: no value, nothing published");
            return;
        }
        snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
        mosquitto_publish(mosq, &mqtt_mid_sent, topic, (int)strlen(value), value, mqtt_qos, 1);
    }
#endif
}

mercurial