thermferm/mqtt.c

Thu, 19 Jul 2018 14:08:32 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Thu, 19 Jul 2018 14:08:32 +0200
changeset 547
0e4d4b45249f
parent 546
d2e8626e7118
child 548
2924fe4911d9
permissions
-rw-r--r--

Versie 0.8.0. Fixed eerste DBIRTH fout geinitialiseerde waardes.

/*****************************************************************************
 * Copyright (C) 2016-2018
 *   
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * mbsePi-apps is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

#include "thermferm.h"
#include "xutil.h"
#include "mqtt.h"

/* Application configuration; this file reads the MQTT host/port, the node
 * uuid, the unit and profile lists and the room temperature/humidity values. */
extern sys_config       Config;
/* When non-zero, MQTT log lines and shutdown progress are echoed to stdout. */
extern int		debug;
/* Name tables indexed by the numeric unit mode, profile state and
 * temperature sensor state. PROFSTATE is not referenced in this file. */
extern const char	UNITMODE[5][8];
extern const char	PROFSTATE[5][6];
extern const char	TEMPSTATE[3][8];

/* Message sequence number, emitted as "seq" by payload_header() and
 * incremented for every payload that is built. */
int			Sequence = 0;


#ifdef HAVE_MOSQUITTO_H


/* Global variables for use in callbacks. */
/* QoS level passed to mosquitto_publish() and mosquitto_will_set(). */
int              	mqtt_qos = 0;
/* Connection state: STATUS_CONNECTING initially, STATUS_CONNACK_RECVD after a
 * successful connect callback, STATUS_WAITING while shutting down. */
int              	mqtt_status = STATUS_CONNECTING;
/* Message id filled in by the most recent mosquitto_publish() call. */
int              	mqtt_mid_sent = 0;
/* Message id of the last publish queued before shutdown (see mqtt_disconnect). */
int              	mqtt_last_mid = -1;
/* Message id most recently reported by the publish callback. */
int              	mqtt_last_mid_sent = -1;
/* Cleared by the disconnect callback once our own shutdown is acknowledged;
 * mqtt_disconnect() loops until that happens. */
int              	mqtt_connected = TRUE;
/* Set once mqtt_disconnect() has called mosquitto_disconnect(). */
int              	mqtt_disconnect_sent = FALSE;
/* Set by the disconnect callback when the disconnect was not requested by us;
 * cleared again by the next connect callback. */
int              	mqtt_connect_lost = FALSE;
/* Set by mqtt_disconnect() so the disconnect callback can tell a requested
 * shutdown from a lost connection. */
int              	mqtt_my_shutdown = FALSE;
/* TRUE between a successful mqtt_connect() and mqtt_disconnect(). */
int              	mqtt_use = FALSE;
/* Keepalive value handed to mosquitto_connect(). */
int			keepalive = 60;
/* Limit handed to mosquitto_max_inflight_messages_set(). */
unsigned int		max_inflight = 20;
/* The mosquitto client instance created in mqtt_connect(). */
struct mosquitto	*mosq = NULL;
/* Not referenced by any function in this file. */
char			*state = NULL;
/* Host name filled in by gethostname() in mqtt_connect(); used in topics and
 * in the client id. */
char			my_hostname[256];



/**
 * @brief Build the common start of every JSON payload.
 *
 * Produces `{"timestamp":<now>,"seq":<n>,"metric":` where <now> is the value
 * of time(NULL) and <n> is the global Sequence counter, which is incremented
 * on every call.
 *
 * @return String built with xstrcpy()/xstrcat(). The caller appends the
 *         metric value and the closing brace and releases it with free().
 */
char *payload_header(void)
{
    char    *tmp, buf[128];

    tmp = xstrcpy((char *)"{\"timestamp\":");
    /*
     * time_t is not guaranteed to be a long, so convert explicitly to keep
     * the format specifier and the argument type in agreement.
     */
    snprintf(buf, sizeof(buf), "%lld", (long long)time(NULL));
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"seq\":");
    snprintf(buf, sizeof(buf), "%d", Sequence++);
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"metric\":");
    return tmp;
}



/**
 * @brief Build the topic prefix "mbv1.0/fermenters/<msgtype>/<hostname>".
 * @param msgtype Message type that forms the third topic level.
 * @return String built with xstrcpy()/xstrcat(); callers in this file extend
 *         it with xstrcat() and release it with free().
 */
char *topic_base(char *msgtype)
{
    char    *topic = xstrcpy((char *)"mbv1.0/fermenters/");

    topic = xstrcat(topic, msgtype);
    topic = xstrcat(topic, (char *)"/");
    return xstrcat(topic, my_hostname);
}



/**
 * @brief Connect callback: logs the outcome and subscribes to the command
 *        topics when the broker accepted the connection.
 * @param my_mosq Client instance supplied by the library (unused; the global
 *                mosq is used for subscribing).
 * @param obj     User data (unused).
 * @param result  CONNACK result, 0 means the connection was accepted.
 */
void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    static const char *const cmd_types[] = { "NCMD", "DCMD" };
    char    *filter = NULL;
    size_t  i;

    /* A connect that follows a lost connection is reported as a reconnect. */
    if (mqtt_connect_lost) {
        mqtt_connect_lost = FALSE;
        syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (result) {
        syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
        return;
    }

    /* Subscribe to everything below the node and device command topics. */
    for (i = 0; i < sizeof(cmd_types) / sizeof(cmd_types[0]); i++) {
        filter = xstrcat(topic_base((char *)cmd_types[i]), (char *)"/#");
        mosquitto_subscribe(mosq, NULL, filter, 0);
        free(filter);
        filter = NULL;
    }
    mqtt_status = STATUS_CONNACK_RECVD;
}



/**
 * @brief Disconnect callback: tells a requested shutdown apart from a
 *        connection that was lost.
 * @param my_mosq Client instance (unused).
 * @param obj     User data (unused).
 * @param rc      Disconnect reason (unused).
 */
void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    if (!mqtt_my_shutdown) {
        /*
         * We did not ask for this disconnect. Only flag it, the program
         * must keep running.
         */
        syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
        mqtt_connect_lost = TRUE;
        return;
    }

    /* Our own shutdown was acknowledged; this ends the wait in mqtt_disconnect(). */
    syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
    mqtt_connected = FALSE;
}



/**
 * @brief Publish callback: remembers the id of the message that was sent.
 * @param my_mosq Client instance (unused).
 * @param obj     User data (unused).
 * @param mid     Message id reported by the library.
 */
void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    (void)my_mosq;
    (void)obj;

    /* mqtt_disconnect() compares this against the last queued message id. */
    mqtt_last_mid_sent = mid;
}



/**
 * @brief Subscribe callback: logs the QoS values granted by the broker.
 * @param my_mosq     Client instance (unused).
 * @param userdata    User data (unused).
 * @param mid         Message id of the subscribe request.
 * @param qos_count   Number of entries in granted_qos.
 * @param granted_qos Granted QoS per subscription; may be empty.
 */
void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    /* Never read granted_qos[0] when the list is empty or missing. */
    if (qos_count < 1 || granted_qos == NULL) {
        syslog(LOG_NOTICE, "Subscribed (mid: %d)", mid);
        return;
    }

    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (i = 1; i < qos_count; i++) {
        syslog(LOG_NOTICE, "     %d", granted_qos[i]);
    }
}



/**
 * @brief Log callback: forwards library log lines to syslog and, when the
 *        debug flag is set, to stdout as well.
 * @param my_mosq Client instance (unused).
 * @param obj     User data (unused).
 * @param level   Log level (unused, everything is logged as LOG_NOTICE).
 * @param str     The log message.
 */
void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    syslog(LOG_NOTICE, "MQTT: %s", str);

    if (!debug)
        return;
    fprintf(stdout, "MQTT: %s\n", str);
}



/**
 * @brief Message callback: logs every message received on a subscribed
 *        topic. The messages are not processed yet.
 * @param my_mosq  Client instance (unused).
 * @param userdata User data (unused).
 * @param message  The received message.
 */
void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
{
    if (!message->payloadlen) {
        syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
        return;
    }

    syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
    // TODO: process subscribed topics here.
}



/**
 * @brief Publish a payload on a topic using the global QoS setting.
 * @param my_mosq Client instance to publish with.
 * @param topic   Topic to publish on.
 * @param payload NUL terminated payload, or NULL to publish an empty message.
 * @param retain  Passed on as the retain flag of the message.
 */
void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) {
    int     length = 0;

    /* A NULL payload is sent as a zero length message. */
    if (payload)
        length = strlen(payload);

    mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, length, payload, mqtt_qos, retain);
}



/**
 * @brief Publish a temperature to Domoticz on topic "domoticz/in".
 * @param idx   Domoticz device index; 0 means not configured, nothing is sent.
 * @param value Value that is divided by 1000 and sent with three decimals.
 */
void pub_domoticz_temp(int idx, int value) {
    char    *dload = NULL;
    /* Sized for any int ("-2147483648") and the matching "%.3f" output. */
    char    sidx[16], sval[32];

    if (idx == 0)
        return;

    snprintf(sidx, sizeof(sidx), "%d", idx);
    snprintf(sval, sizeof(sval), "%.3f", value / 1000.0);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":0,\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



/**
 * @brief Publish an output value to Domoticz on topic "domoticz/in".
 *
 * The "nvalue" member is 1 when value is 50 or more and 0 otherwise; the
 * "svalue" member carries the value itself.
 *
 * @param idx   Domoticz device index; 0 means not configured, nothing is sent.
 * @param value The output value.
 */
void pub_domoticz_output(int idx, int value) {
    char    *dload = NULL;
    /* Sized for any int, including the sign ("-2147483648"). */
    char    sidx[16], sval[16];

    if (idx == 0)
        return;

    snprintf(sidx, sizeof(sidx), "%d", idx);
    snprintf(sval, sizeof(sval), "%d", value);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":");
    if (value >= 50)
        dload = xstrcat(dload, (char *)"1");
    else
        dload = xstrcat(dload, (char *)"0");
    dload = xstrcat(dload, (char *)",\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



char *unit_data(units_list *unit, bool birth)
{
    char		*payload = NULL;
    char		buf[128];
    bool		comma = false;
    profiles_list       *profile;
    prof_step           *pstep;

    payload = xstrcat(payload, (char *)"{");

    /*
     * Fixed unit values, never change these!
     */
    if (birth) {
    	payload = xstrcat(payload, (char *)"\"uuid\":\"");
    	payload = xstrcat(payload, unit->uuid);
    	payload = xstrcat(payload, (char *)"\",\"alias\":\"");
    	payload = xstrcat(payload, unit->alias);
	payload = xstrcat(payload, (char *)"\",");
    }

    /*
     * Product (beer) loaded information.
     * TODO: extend with uuid and product code.
     */
    if (unit->name || strlen(unit->name)) {
    	payload = xstrcat(payload, (char *)"\"product\":{\"code\":null,\"name\":\"");
    	payload = xstrcat(payload, unit->name);
    	payload = xstrcat(payload, (char *)"\"}");
    } else {
	payload = xstrcat(payload, (char *)"\"product\":null");
    }

    /*
     * Air temperature sensor
     */
    if (unit->air_address) {
	payload = xstrcat(payload, (char *)",\"air\":{\"address\":\"");
	payload = xstrcat(payload, unit->air_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->air_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"air\":null");
    }

    /*
     * Beer temperature sensor
     */
    if (unit->beer_address) {
	payload = xstrcat(payload, (char *)",\"beer\":{\"address\":\"");
	payload = xstrcat(payload, unit->beer_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->beer_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"beer\":null");
    }

    /*
     * External chiller temperature sensor
     */
    if (unit->chiller_address) {
	payload = xstrcat(payload, (char *)",\"chiller\":{\"address\":\"");
	payload = xstrcat(payload, unit->chiller_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->chiller_state]);
	payload = xstrcat(payload, (char *)"\",\"temperature\":");
	sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"chiller\":null");
    }

    /*
     * Heater control, power 0..100% and usage count.
     */
    if (unit->heater_address) {
	payload = xstrcat(payload, (char *)",\"heater\":{\"address\":\"");
	payload = xstrcat(payload, unit->heater_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->heater_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->heater_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"heater\":null");
    }
    
    /*
     * Cooler control, power 0..100% and usage counter.
     */
    if (unit->cooler_address) {
	payload = xstrcat(payload, (char *)",\"cooler\":{\"address\":\"");
 	payload = xstrcat(payload, unit->cooler_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->cooler_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->cooler_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"cooler\":null");
    }

    /*
     * Fan control, 0..100% and usage counter.
     */
    if (unit->fan_address) {
	payload = xstrcat(payload, (char *)",\"fan\":{\"address\":\"");
	payload = xstrcat(payload, unit->fan_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->fan_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->fan_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"fan\":null");
    }

    /*
     * Interior lights control, 0..100% and usage counter.
     */
    if (unit->light_address) {
	payload = xstrcat(payload, (char *)",\"light\":{\"address\":\"");
	payload = xstrcat(payload, unit->light_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->light_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->light_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"light\":null");
    }

    /*
     * Door sensor.
     */
    if (unit->door_address) {
	payload = xstrcat(payload, (char *)",\"door\":{\"address\":\"");
	payload = xstrcat(payload, unit->door_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->door_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"door\":null");
    }

    /*
     * PSU status
     */
    if (unit->psu_address) {
	payload = xstrcat(payload, (char *)",\"psu\":{\"address\":\"");
	payload = xstrcat(payload, unit->psu_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->psu_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"psu\":null");
    }

    /*
     * Working mode and setpoints
     */
    payload = xstrcat(payload, (char *)",\"mode\":\"");
    payload = xstrcat(payload, (char *)UNITMODE[unit->mode]);
    payload = xstrcat(payload, (char *)"\",\"setpoint\":{\"low\":");
    sprintf(buf, "%.1f", unit->PID_heat->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"high\":");
    sprintf(buf, "%.1f", unit->PID_cool->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"alarm\":");
    sprintf(buf, "%d", unit->alarm_flag);
    payload = xstrcat(payload, buf);

    /*
     * Loaded profile and state
     */
    if (unit->mode == UNITMODE_PROFILE && unit->profile) {
	for (profile = Config.profiles; profile; profile = profile->next) {
	    if (strcmp(unit->profile, profile->uuid) == 0) {
		payload = xstrcat(payload, (char *)",\"profile\":{\"uuid\":\"");
		payload = xstrcat(payload, unit->profile);
		payload = xstrcat(payload, (char *)",\"name\":\"");
		payload = xstrcat(payload, profile->name);
		payload = xstrcat(payload, (char *)"\",\"inittemp\":{\"low\":");
		sprintf(buf, "%.1f", profile->inittemp_lo);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)",\"high\":");
		sprintf(buf, "%.1f", profile->inittemp_hi);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)"},\"fridgemode\":");
		sprintf(buf, "%d", profile->fridge_mode);
		payload = xstrcat(payload, buf);
		comma = false;
		if (profile->steps) {
		    payload = xstrcat(payload, (char *)",\"steps\":[");
		    for (pstep = profile->steps; pstep; pstep = pstep->next) {
			if (comma)
			    payload = xstrcat(payload, (char *)",");
			payload = xstrcat(payload, (char *)"{\"resttime\":");
			sprintf(buf, "%d", pstep->resttime);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"steptime\":");
			sprintf(buf, "%d", pstep->steptime);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"target\":{\"low\":");
			sprintf(buf, "%.1f", pstep->target_lo);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"high\":");
			sprintf(buf, "%.1f", pstep->target_hi);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)"},\"fridgemode\":");
			sprintf(buf, "%d", pstep->fridge_mode);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)"}");
			comma = true;
		    }
		    payload = xstrcat(payload, (char *)"]");
		} else {
		    payload = xstrcat(payload, (char *)",\"steps\":null");
		}
		payload = xstrcat(payload, (char *)"}");
		break;
	    }
	}
    } else {
	payload = xstrcat(payload, (char *)",\"profile\":null");
    }
    payload = xstrcat(payload, (char *)"}");

    return payload;
}



/**
 * @brief Publish DBIRTH for all active units. If there are no active units, don't
 *        publish anything. This function should be called at program start.
 */
void publishDBirthAll(void)
{
    char	*payload = NULL;
    units_list	*unit;
    int		comma = FALSE;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"units\":[");
    for (unit = Config.units; unit; unit = unit->next) {
	if (unit->mode != UNITMODE_OFF) {
	    if (comma)
	    	payload = xstrcat(payload, (char *)",");
	    payload = xstrcat(payload, unit_data(unit, true));
	    comma = TRUE;
	}
    }
    if (comma) {	// Only publish if there is at least one unit active.
    	payload = xstrcat(payload, (char *)"]}}");
    	publisher(mosq, topic_base((char *)"DBIRTH"), payload, true);
    }
    free(payload);
    payload = NULL;
}



/**
 * @brief Publish the current data of one unit on its DDATA topic
 *        (not retained). Does nothing while MQTT is not in use.
 * @param unit The unit to publish; its alias is the last topic level.
 */
void publishDData(units_list *unit)
{
    char    *payload = NULL, *topic = NULL, *udata = NULL;

    if (!mqtt_use)
        return;

    payload = payload_header();
    /* unit_data() returns an allocated string; free it once it is appended. */
    udata = unit_data(unit, false);
    payload = xstrcat(payload, udata);
    free(udata);
    udata = NULL;
    payload = xstrcat(payload, (char *)"}");

    topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/");
    topic = xstrcat(topic, unit->alias);
    publisher(mosq, topic, payload, false);

    free(payload);
    payload = NULL;
    free(topic);
    topic = NULL;
}



/**
 * @brief Publish the birth message of one unit on its DBIRTH topic
 *        (retained). Does nothing while MQTT is not in use.
 * @param unit The unit to publish; its alias is the last topic level.
 */
void publishDBirth(units_list *unit)
{
    char    *payload = NULL, *topic = NULL, *udata = NULL;

    if (!mqtt_use)
        return;

    payload = payload_header();
    /* unit_data() returns an allocated string; free it once it is appended. */
    udata = unit_data(unit, true);
    payload = xstrcat(payload, udata);
    free(udata);
    udata = NULL;
    payload = xstrcat(payload, (char *)"}");

    topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/");
    topic = xstrcat(topic, unit->alias);
    publisher(mosq, topic, payload, true);

    free(payload);
    payload = NULL;
    free(topic);
    topic = NULL;
}



/**
 * @brief Publish empty retained messages on the DBIRTH and DDEATH topics of
 *        one unit. Does nothing while MQTT is not in use.
 * @param unit The unit concerned; its alias is the last topic level.
 */
void publishDDeath(units_list *unit)
{
    /* DBIRTH goes first so the persistent birth topic is deleted first. */
    static const char *const msgtypes[] = { "DBIRTH", "DDEATH" };
    char    *topic = NULL;
    size_t  i;

    if (!mqtt_use)
        return;

    for (i = 0; i < sizeof(msgtypes) / sizeof(msgtypes[0]); i++) {
        topic = topic_base((char *)msgtypes[i]);
        topic = xstrcat(topic, (char *)"/");
        topic = xstrcat(topic, unit->alias);
        publisher(mosq, topic, NULL, true);
        free(topic);
        topic = NULL;
    }
}



void publishNData(bool birth, int flag)
{
    char		*payload = NULL, sidx[10], buf[64];
    struct utsname	ubuf;
    bool		comma = false;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{");

    if (birth) {
	payload = xstrcat(payload, (char *)"\"uuid\":\"");
	payload = xstrcat(payload, Config.uuid);
	payload = xstrcat(payload, (char *)"\",");
#ifdef HAVE_WIRINGPI_H
	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry\",\"hardwaremodel\":\"Unknown\"");
#else
	if (uname(&ubuf) == 0) {
	    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"");
	    payload = xstrcat(payload, ubuf.machine);
	    payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\"Unknown\"");
	} else {
    	    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
	}
#endif
	if (uname(&ubuf) == 0) {
            payload = xstrcat(payload, (char *)",\"os\":\"");
	    payload = xstrcat(payload, ubuf.sysname);
	    payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
	    payload = xstrcat(payload, ubuf.release);
	    payload = xstrcat(payload, (char *)"\"");
    	} else {
	    payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
    	}

    	payload = xstrcat(payload, (char *)",\"FW\":\"");
    	payload = xstrcat(payload, (char *)VERSION);
    	payload = xstrcat(payload, (char *)"\"}");
	comma = true;
    }

    if (Config.temp_address || Config.hum_address) {
	if (comma)
	    payload = xstrcat(payload, (char *)",");
	payload = xstrcat(payload, (char *)"\"THB\":{");
	if (Config.temp_address) {
	    payload = xstrcat(payload, (char *)"\"temperature\":");
	    sprintf(buf, "%.1f", Config.temp_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	if (Config.temp_address && Config.hum_address)
	    payload = xstrcat(payload, (char *)",");
	if (Config.hum_address) {
	    payload = xstrcat(payload, (char *)"\"humidity\":");
	    sprintf(buf, "%.1f", Config.hum_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	payload = xstrcat(payload, (char *)"}");
    }
    payload = xstrcat(payload, (char *)"}}");

    if (birth)
    	publisher(mosq, topic_base((char *)"NBIRTH"), payload, true);
    else
	publisher(mosq, topic_base((char *)"NDATA"), payload, false);

    free(payload);
    payload = NULL;

    if ((Config.temp_address || Config.hum_address) && Config.temp_hum_idx) {
	sprintf(sidx, "%d", Config.temp_hum_idx);
	sprintf(buf, "%.1f;%.1f;0", Config.temp_value / 1000.0, Config.hum_value / 1000.0);

	payload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
	payload = xstrcat(payload, sidx);
	payload = xstrcat(payload, (char *)",\"nvalue\":0,\"svalue\":\"");
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"\"}");
	publisher(mosq, (char *)"domoticz/in", payload, false);
	free(payload);
	payload = NULL;
    }
}



/**
 * @brief Initialise the mosquitto library, create the client, set the NDEATH
 *        will, install the callbacks and connect to the configured broker.
 *
 * On success mqtt_use is set, the network loop thread is started and the
 * node birth message is published. On any failure the client is destroyed,
 * the library is cleaned up and mqtt_use stays FALSE, so the program runs
 * without a broker.
 */
void mqtt_connect(void)
{
    char    *id = NULL, *will = NULL;
    int     rc;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, 255);
    mosquitto_lib_init();
    id = xstrcpy((char *)"thermferm/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
        /*
         * Enforce maximum client id length of 23 characters
         */
        id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, TRUE, NULL);
    if (!mosq) {
        /* Inspect errno before anything else gets a chance to change it. */
        switch (errno) {
            case ENOMEM:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
                break;
            case EINVAL:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
                break;
            default:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: error %d", errno);
                break;
        }
        free(id);
        mosquitto_lib_cleanup();
        return;
    }
    /* The library keeps its own copy of the client id. */
    free(id);
    id = NULL;

    /*
     * Set our will. The library copies the topic, so ours can be released.
     */
    will = topic_base((char *)"NDEATH");
    rc = mosquitto_will_set(mosq, will, 0, NULL, mqtt_qos, false);
    free(will);
    will = NULL;
    if (rc) {
        if (rc > MOSQ_ERR_SUCCESS)
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        mosq = NULL;
        mosquitto_lib_cleanup();
        return;
    }

    if (debug)
        mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
        if (rc == MOSQ_ERR_ERRNO) {
            /*
             * %m makes syslog() print the message for the current errno.
             * This avoids strerror_r(), whose GNU variant may leave the
             * caller's buffer untouched.
             */
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %m");
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        /* Don't keep a client handle around after the library is cleaned up. */
        mosquitto_destroy(mosq);
        mosq = NULL;
        mosquitto_lib_cleanup();
        syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
    } else {
        mqtt_use = TRUE;
        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);

        /*
         * Initialise is complete, report our presence state
         */
        mosquitto_loop_start(mosq);
        publishNData(true, 0);
    }
}



/**
 * @brief Shut down the MQTT connection in an orderly way.
 *
 * Clears the retained birth/death topics, waits until the library reports
 * the last of these messages as sent, disconnects, waits for the disconnect
 * callback and finally releases the client and resets the module state to
 * its initial values. Does nothing while MQTT is not in use.
 */
void mqtt_disconnect(void)
{
    static const char *const retained[] = { "DBIRTH", "DDEATH", "NBIRTH", "NDEATH" };
    char    *topic = NULL;
    size_t  i;

    if (!mqtt_use)
        return;

    /*
     * Remove the retained topics by publishing empty retained messages.
     * DBIRTH is not always needed, but ...
     */
    syslog(LOG_NOTICE, "MQTT disconnecting");
    for (i = 0; i < sizeof(retained) / sizeof(retained[0]); i++) {
        topic = topic_base((char *)retained[i]);
        publisher(mosq, topic, NULL, true);
        free(topic);
        topic = NULL;
    }
    mqtt_last_mid = mqtt_mid_sent;
    mqtt_status = STATUS_WAITING;
    mqtt_my_shutdown = TRUE;

    /*
     * Wait for the disconnect callback to clear mqtt_connected. The
     * disconnect is requested once the last message is reported as sent.
     * Every pass sleeps, so the loop cannot spin when a callback changes
     * mqtt_status in the meantime.
     */
    do {
        if (debug)
            fprintf(stdout, (char *)"Waiting\n");
        if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
            mosquitto_disconnect(mosq);
            mqtt_disconnect_sent = TRUE;
        }
        usleep(100000);
    } while (mqtt_connected);

    mosquitto_loop_stop(mosq, FALSE);
    mosquitto_destroy(mosq);
    mosq = NULL;        /* the handle is gone, don't leave it dangling */
    mosquitto_lib_cleanup();

    /* Back to the initial state so a later mqtt_connect() starts clean. */
    mqtt_use = FALSE;
    mqtt_status = STATUS_CONNECTING;
    mqtt_mid_sent = 0;
    mqtt_last_mid = -1;
    mqtt_last_mid_sent = -1;
    mqtt_connected = TRUE;
    mqtt_disconnect_sent = FALSE;
    mqtt_connect_lost = FALSE;
    mqtt_my_shutdown = FALSE;
    syslog(LOG_NOTICE, "MQTT disconnected");
}


#endif

mercurial