thermferm/mqtt.c

Sat, 14 Jul 2018 17:21:25 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sat, 14 Jul 2018 17:21:25 +0200
changeset 533
49580ca85ab7
parent 518
fd36bedab944
child 534
92b546d4a839
permissions
-rw-r--r--

Versie 0.6.3. MQTT device berichten alleen als een fermenter ingeschakeld is. MQTT fermenter birth en death berichhten als een fementer in of uitgeschakeld wordt. MQTT node death bericht bij normaal afsluiten van de daemon. Alle MQTT persistent berichten worden nu goed opgeruikmd.

/*****************************************************************************
 * Copyright (C) 2016-2018
 *   
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * mbsePi-apps is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

#include "thermferm.h"
#include "xutil.h"
#include "mqtt.h"

extern sys_config       Config;
extern int		debug;
extern const char	UNITMODE[5][8];
extern const char	PROFSTATE[5][6];
extern const char	TEMPSTATE[3][8];

int			Sequence = 0;


#ifdef HAVE_MOSQUITTO_H


/* Global variables for use in callbacks. */
int              	mqtt_qos = 0;
int              	mqtt_status = STATUS_CONNECTING;
int              	mqtt_mid_sent = 0;
int              	mqtt_last_mid = -1;
int              	mqtt_last_mid_sent = -1;
int              	mqtt_connected = TRUE;
int              	mqtt_disconnect_sent = FALSE;
int              	mqtt_connect_lost = FALSE;
int              	mqtt_my_shutdown = FALSE;
int              	mqtt_use = FALSE;
int			keepalive = 60;
unsigned int		max_inflight = 20;
struct mosquitto	*mosq = NULL;
char			*state = NULL;
char			my_hostname[256];



char *payload_header(void)
{
    char	*tmp, buf[128];

    tmp = xstrcpy((char *)"{\"timestamp\":");
    sprintf(buf, "%ld", time(NULL));
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"seq\":");
    sprintf(buf, "%d", Sequence++);
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"metric\":");
    return tmp;
}



char *topic_base(char *msgtype)
{
    char	*tmp;

    tmp = xstrcpy((char *)"mbv1.0/fermenters/");
    tmp = xstrcat(tmp, msgtype);
    tmp = xstrcat(tmp, (char *)"/");
    tmp = xstrcat(tmp, my_hostname);
    return tmp;
}



void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    char	*topic = NULL;

    if (mqtt_connect_lost) {
	mqtt_connect_lost = FALSE;
	syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (!result) {
	topic = topic_base((char *)"NCMD");
	topic = xstrcat(topic, (char *)"/#");
	mosquitto_subscribe(mosq, NULL, topic, 0);
	free(topic);
	topic = topic_base((char *)"DCMD");
	topic = xstrcat(topic, (char *)"/#");
	mosquitto_subscribe(mosq, NULL, topic, 0);
	free(topic);
	topic = NULL;
	mqtt_status = STATUS_CONNACK_RECVD;
    } else {
	syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
    }
}



void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    if (mqtt_my_shutdown) {
       syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
       mqtt_connected = FALSE;
    } else {
       /*
        * The remote server was brought down. We must keep running
        */
       syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
       mqtt_connect_lost = TRUE;
    }
}



void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    mqtt_last_mid_sent = mid;
}



void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (i = 1; i < qos_count; i++) {
	syslog(LOG_NOTICE, "     %d", granted_qos[i]);
    }
}



void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    syslog(LOG_NOTICE, "MQTT: %s", str);
    if (debug)
    	fprintf(stdout, "MQTT: %s\n", str);
}



void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen) {
	syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
        // TODO: process subscribed topics here.

    } else {
	syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
    }
}



void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) {
    // publish the data
    if (payload)
        mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
    else
	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
}



void pub_domoticz_temp(int idx, int value) {
    char	*dload = NULL;
    char	sidx[10], sval[20];

    if (idx == 0)
	return;

    sprintf(sidx, "%d", idx);
    sprintf(sval, "%.3f", value / 1000.0);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":0,\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



void pub_domoticz_output(int idx, int value) {
    char        *dload = NULL;
    char        sidx[10], sval[10];

    if (idx == 0)
	return;

    sprintf(sidx, "%d", idx);
    sprintf(sval, "%d", value);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":");
    if (value >= 50)
	dload = xstrcat(dload, (char *)"1");
    else
	dload = xstrcat(dload, (char *)"0");
    dload = xstrcat(dload, (char *)",\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



char *unit_data(units_list *unit, bool birth)
{
    char		*payload = NULL;
    char		buf[128];
    bool		comma = false;
    profiles_list       *profile;
    prof_step           *pstep;

    payload = xstrcat(payload, (char *)"{");
    if (birth) {
    	payload = xstrcat(payload, (char *)"\"uuid\":\"");
    	payload = xstrcat(payload, unit->uuid);
    	payload = xstrcat(payload, (char *)"\",\"alias\":\"");
    	payload = xstrcat(payload, unit->alias);
	payload = xstrcat(payload, (char *)"\",");
    }

    payload = xstrcat(payload, (char *)"\"name\":\"");
    payload = xstrcat(payload, unit->name);
    if (unit->air_address) {
	payload = xstrcat(payload, (char *)"\",\"air\":{\"address\":\"");
	payload = xstrcat(payload, unit->air_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->air_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)"\",\"air\":null");
    }

    if (unit->beer_address) {
	payload = xstrcat(payload, (char *)",\"beer\":{\"address\":\"");
	payload = xstrcat(payload, unit->beer_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->beer_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"beer\":null");
    }

    if (unit->chiller_address) {
	payload = xstrcat(payload, (char *)",\"chiller\":{\"address\":\"");
	payload = xstrcat(payload, unit->chiller_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->chiller_state]);
	payload = xstrcat(payload, (char *)"\",\"temperature\":");
	sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"chiller\":null");
    }

    if (unit->heater_address) {
	payload = xstrcat(payload, (char *)",\"heater\":{\"address\":\"");
	payload = xstrcat(payload, unit->heater_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->heater_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"heater\":null");
    }
    
    if (unit->cooler_address) {
	payload = xstrcat(payload, (char *)",\"cooler\":{\"address\":\"");
 	payload = xstrcat(payload, unit->cooler_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->cooler_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"cooler\":null");
    }

    if (unit->fan_address) {
	payload = xstrcat(payload, (char *)",\"fan\":{\"address\":\"");
	payload = xstrcat(payload, unit->fan_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->fan_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"fan\":null");
    }

    if (unit->door_address) {
	payload = xstrcat(payload, (char *)",\"door\":{\"address\":\"");
	payload = xstrcat(payload, unit->door_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->door_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"door\":null");
    }

    if (unit->light_address) {
	payload = xstrcat(payload, (char *)",\"light\":{\"address\":\"");
	payload = xstrcat(payload, unit->light_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->light_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"light\":null");
    }

    if (unit->psu_address) {
	payload = xstrcat(payload, (char *)",\"psu\":{\"address\":\"");
	payload = xstrcat(payload, unit->psu_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->psu_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"psu\":null");
    }

    payload = xstrcat(payload, (char *)",\"mode\":\"");
    payload = xstrcat(payload, (char *)UNITMODE[unit->mode]);
    payload = xstrcat(payload, (char *)"\",\"setpoint\":{\"low\":");
    sprintf(buf, "%.1f", unit->PID_heat->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"high\":");
    sprintf(buf, "%.1f", unit->PID_cool->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"}");
    
    if (unit->mode == UNITMODE_PROFILE && unit->profile) {
	for (profile = Config.profiles; profile; profile = profile->next) {
	    if (strcmp(unit->profile, profile->uuid) == 0) {
		payload = xstrcat(payload, (char *)",\"profile\":{\"uuid\":\"");
		payload = xstrcat(payload, unit->profile);
		payload = xstrcat(payload, (char *)",\"name\":\"");
		payload = xstrcat(payload, profile->name);
		payload = xstrcat(payload, (char *)"\",\"inittemp\":{\"low\":");
		sprintf(buf, "%.1f", profile->inittemp_lo);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)",\"high\":");
		sprintf(buf, "%.1f", profile->inittemp_hi);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)"},\"fridgemode\":");
		sprintf(buf, "%d", profile->fridge_mode);
		payload = xstrcat(payload, buf);
		comma = false;
		if (profile->steps) {
		    payload = xstrcat(payload, (char *)",\"steps\":[");
		    for (pstep = profile->steps; pstep; pstep = pstep->next) {
			if (comma)
			    payload = xstrcat(payload, (char *)",");
			payload = xstrcat(payload, (char *)"{\"resttime\":");
			sprintf(buf, "%d", pstep->resttime);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"steptime\":");
			sprintf(buf, "%d", pstep->steptime);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"target\":{\"low\":");
			sprintf(buf, "%.1f", pstep->target_lo);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)",\"high\":");
			sprintf(buf, "%.1f", pstep->target_hi);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)"},\"fridgemode\":");
			sprintf(buf, "%d", pstep->fridge_mode);
			payload = xstrcat(payload, buf);
			payload = xstrcat(payload, (char *)"}");
			comma = true;
		    }
		    payload = xstrcat(payload, (char *)"]");
		} else {
		    payload = xstrcat(payload, (char *)",\"steps\":null");
		}
		payload = xstrcat(payload, (char *)"}");
		break;
	    }
	}
    } else {
	payload = xstrcat(payload, (char *)",\"profile\":null");
    }
    payload = xstrcat(payload, (char *)"}");

    return payload;
}



/**
 * @brief Publish DBIRTH for all active units. If there are no active units, don't
 *        publish anything. This function should be called at program start.
 */
void publishDBirthAll(void)
{
    char	*payload = NULL;
    units_list	*unit;
    int		comma = FALSE;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"units\":[");
    for (unit = Config.units; unit; unit = unit->next) {
	if (unit->mode != UNITMODE_OFF) {
	    if (comma)
	    	payload = xstrcat(payload, (char *)",");
	    payload = xstrcat(payload, unit_data(unit, true));
	    comma = TRUE;
	}
    }
    if (comma) {	// Only publish if there is at least one unit active.
    	payload = xstrcat(payload, (char *)"]}}");
    	publisher(mosq, topic_base((char *)"DBIRTH"), payload, true);
    }
    free(payload);
    payload = NULL;
}

#endif


void publishDData(units_list *unit)
{
#ifdef HAVE_MOSQUITTO_H

    char	*payload = NULL, *topic = NULL;

    if (mqtt_use) {
	payload = payload_header();
	payload = xstrcat(payload, unit_data(unit, false));
	payload = xstrcat(payload, (char *)"}");
	topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/");
	topic = xstrcat(topic, unit->alias);
	publisher(mosq, topic, payload, false);
	free(payload);
	payload = NULL;
	free(topic);
	topic = NULL;
    }
#endif
}



void publishDBirth(units_list *unit)
{
#ifdef HAVE_MOSQUITTO_H

    char        *payload = NULL, *topic = NULL;

    if (mqtt_use) {
	payload = payload_header();
	payload = xstrcat(payload, unit_data(unit, true));
	payload = xstrcat(payload, (char *)"}");
	topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/");
	topic = xstrcat(topic, unit->alias);
	publisher(mosq, topic, payload, true);
	free(payload);
	payload = NULL;
	free(topic);
	topic = NULL;
    }
#endif
}



void publishDDeath(units_list *unit)
{
#ifdef HAVE_MOSQUITTO_H

    char        *topic = NULL;

    if (mqtt_use) {
	// First delete presistent DBIRTH topic
	topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/");
	topic = xstrcat(topic, unit->alias);
	publisher(mosq, topic, NULL, true);
	free(topic);
	topic = NULL;
	topic = xstrcat(topic_base((char *)"DDEATH"), (char *)"/");
	topic = xstrcat(topic, unit->alias);
	publisher(mosq, topic, NULL, true);
	free(topic);
	topic = NULL;
    }
#endif
}



void publishNData(bool birth, int flag)
{
#ifdef HAVE_MOSQUITTO_H
    char		*payload = NULL, sidx[10], buf[64];
    struct utsname	ubuf;
    bool		comma = false;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{");

    if (birth || flag & MQTT_NODE_CONTROL) {
    	payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}");
	comma = true;
    }

    if (birth) {
    	if (comma)
	    payload = xstrcat(payload, (char *)",");
#ifdef HAVE_WIRINGPI_H
	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry\",\"hardwaremodel\":\"Unknown\"");
#else
    	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
#endif
	if (uname(&ubuf) == 0) {
            payload = xstrcat(payload, (char *)",\"os\":\"");
	    payload = xstrcat(payload, ubuf.sysname);
	    payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
	    payload = xstrcat(payload, ubuf.release);
	    payload = xstrcat(payload, (char *)"\"");
    	} else {
	    payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
    	}

    	payload = xstrcat(payload, (char *)",\"FW\":\"");
    	payload = xstrcat(payload, (char *)VERSION);
    	payload = xstrcat(payload, (char *)"\"}");
	comma = true;
    }

    if (Config.temp_address || Config.hum_address) {
	if (comma)
	    payload = xstrcat(payload, (char *)",");
	payload = xstrcat(payload, (char *)"\"HT\":{");
	if (Config.temp_address) {
	    payload = xstrcat(payload, (char *)"\"temperature\":");
	    sprintf(buf, "%.1f", Config.temp_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	if (Config.temp_address && Config.hum_address)
	    payload = xstrcat(payload, (char *)",");
	if (Config.hum_address) {
	    payload = xstrcat(payload, (char *)"\"humidity\":");
	    sprintf(buf, "%.1f", Config.hum_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	payload = xstrcat(payload, (char *)"}");
    }
    payload = xstrcat(payload, (char *)"}}");

    if (birth)
    	publisher(mosq, topic_base((char *)"NBIRTH"), payload, true);
    else
	publisher(mosq, topic_base((char *)"NDATA"), payload, false);

    free(payload);
    payload = NULL;

    if ((Config.temp_address || Config.hum_address) && Config.temp_hum_idx) {
	sprintf(sidx, "%d", Config.temp_hum_idx);
	sprintf(buf, "%.1f;%.1f;0", Config.temp_value / 1000.0, Config.hum_value / 1000.0);

	payload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
	payload = xstrcat(payload, sidx);
	payload = xstrcat(payload, (char *)",\"nvalue\":0,\"svalue\":\"");
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"\"}");
	publisher(mosq, (char *)"domoticz/in", payload, false);
	free(payload);
	payload = NULL;
    }
#endif
}



void mqtt_connect(void)
{
#ifdef HAVE_MOSQUITTO_H
    char	*id = NULL;
    char	err[1024];
    int		rc;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, 255);
    mosquitto_lib_init();
    id = xstrcpy((char *)"thermferm/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
       /*
        * Enforce maximum client id length of 23 characters
        */
       id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, TRUE, NULL);
    if (!mosq) {
       switch(errno) {
           case ENOMEM:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
               break;
           case EINVAL:
               syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
               break;
       }
       mosquitto_lib_cleanup();
       return;
    }

    /*
     * Set our will
     */
    if ((rc = mosquitto_will_set(mosq, topic_base((char *)"NDEATH"), 0, NULL, mqtt_qos, false))) {
	if (rc > MOSQ_ERR_SUCCESS)
	    syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
        mosquitto_lib_cleanup();
        return;
    }

    if (debug)
    	mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
        if (rc == MOSQ_ERR_ERRNO) {
            strerror_r(errno, err, 1024);
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        mosquitto_lib_cleanup();
	syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
    } else {
        mqtt_use = TRUE;
        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);

        /*
         * Initialise is complete, report our presence state
         */
        mosquitto_loop_start(mosq);
	publishNData(true, 0);
	publishDBirthAll();
    }
#endif
}



void mqtt_disconnect(void)
{
#ifdef HAVE_MOSQUITTO_H
    int		rc;

    if (mqtt_use) {
        /*
         * Final publish 0 to clients/<hostname>/thermferm/state
	 * After that, remove the retained topic.
         */
        syslog(LOG_NOTICE, "MQTT disconnecting");
	publisher(mosq, topic_base((char *)"NBIRTH"), NULL, true);
	publisher(mosq, topic_base((char *)"NDEATH"), NULL, true);
        mqtt_last_mid = mqtt_mid_sent;
        mqtt_status = STATUS_WAITING;
	mqtt_my_shutdown = TRUE;

        do {
            if (mqtt_status == STATUS_WAITING) {
                if (debug)
                    fprintf(stdout, (char *)"Waiting\n");
                if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
                    mosquitto_disconnect(mosq);
                    mqtt_disconnect_sent = TRUE;
                }
                usleep(100000);
            }
            rc = MOSQ_ERR_SUCCESS;
        } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);

        mosquitto_loop_stop(mosq, FALSE);
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
	mqtt_use = FALSE;
	syslog(LOG_NOTICE, "MQTT disconnected");
    }
#endif
}


mercurial