thermferm/mqtt.c

Sun, 27 Sep 2020 17:08:27 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sun, 27 Sep 2020 17:08:27 +0200
changeset 610
5563ee815701
parent 608
a69b5d92fd72
child 612
452f79a5ad71
permissions
-rw-r--r--

Implemented MQTT user/password auth.

/*****************************************************************************
 * Copyright (C) 2016-2020
 *   
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * mbsePi-apps is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

#include "thermferm.h"
#include "rdconfig.h"
#include "devices.h"
#include "xutil.h"
#include "mqtt.h"

extern sys_config       Config;
extern int		debug;
extern int		my_shutdown;
extern int		my_reboot;
extern const char	UNITMODE[5][8];
extern const char	UNITSTAGE[4][12];
extern const char	PROFSTATE[5][6];
extern const char	TEMPSTATE[3][8];

/* Running message counter; payload_header() writes it as "seq" and then increments it. */
int			Sequence = 0;




/* Global variables for use in callbacks. */
int              	mqtt_qos = 0;				/* QoS level handed to mosquitto_publish() by publisher() */
int              	mqtt_status = STATUS_CONNECTING;	/* Set to STATUS_CONNACK_RECVD by my_connect_callback() on success */
int              	mqtt_mid_sent = 0;			/* Receives the message id from mosquitto_publish() in publisher() */
int              	mqtt_last_mid = -1;
int              	mqtt_last_mid_sent = -1;		/* Message id last reported to my_publish_callback() */
int              	mqtt_connected = TRUE;			/* Cleared by my_disconnect_callback() after our own shutdown */
int              	mqtt_disconnect_sent = FALSE;
int              	mqtt_connect_lost = FALSE;		/* Set on an unexpected disconnect, cleared again on reconnect */
int              	mqtt_my_shutdown = FALSE;		/* Tested by my_disconnect_callback(): TRUE means we asked for the disconnect */
int              	mqtt_use = FALSE;
int			keepalive = 60;				/* presumably the MQTT keepalive in seconds -- TODO confirm at the connect call */
unsigned int		max_inflight = 20;			/* presumably the max in-flight message limit -- TODO confirm where it is applied */
struct mosquitto	*mosq = NULL;				/* Client handle used for subscribing and publishing */
char			*state = NULL;
char			my_hostname[256];			/* Appended as last element of the topic by topic_base() */


#ifndef HAVE_WIRINGPI_H

/*
 * Code from wiringPi written by Gordon Henderson.
 * Copied here to have some sort of hardware detection without wiringPi.
 */
/*
 * Printable board model names. Presumably indexed by the model number
 * that piBoardId() reports -- verify against the caller.
 */
const char *piModelNames [20] = {
    [0]  = "Model A",
    [1]  = "Model B",
    [2]  = "Model A+",
    [3]  = "Model B+",
    [4]  = "Pi 2",
    [5]  = "Alpha",
    [6]  = "CM",
    [7]  = "Unknown07",
    [8]  = "Pi 3",
    [9]  = "Pi Zero",
    [10] = "CM3",
    [11] = "Unknown11",
    [12] = "Pi Zero-W",
    [13] = "Pi 3B+",
    [14] = "Pi 3A+",
    [15] = "Unknown15",
    [16] = "CM3+",
    [17] = "Unknown17",
    [18] = "Unknown18",
    [19] = "Unknown19"
};
/*
 * Two-digit printable form of a board revision number, 0 through 15.
 */
const char *piRevisionNames [16] = {
    [0]  = "00", [1]  = "01", [2]  = "02", [3]  = "03",
    [4]  = "04", [5]  = "05", [6]  = "06", [7]  = "07",
    [8]  = "08", [9]  = "09", [10] = "10", [11] = "11",
    [12] = "12", [13] = "13", [14] = "14", [15] = "15"
};
/*
 * Printable board manufacturer names. Presumably indexed by the maker
 * number that piBoardId() reports -- verify against the caller.
 */
const char *piMakerNames [16] = {
    [0]  = "Sony",
    [1]  = "Egoman",
    [2]  = "Embest",
    [3]  = "Unknown",
    [4]  = "Embest",
    [5]  = "Unknown05",
    [6]  = "Unknown06",
    [7]  = "Unknown07",
    [8]  = "Unknown08",
    [9]  = "Unknown09",
    [10] = "Unknown10",
    [11] = "Unknown11",
    [12] = "Unknown12",
    [13] = "Unknown13",
    [14] = "Unknown14",
    [15] = "Unknown15"
};
/*
 * Memory sizes per memory code; codes 3 through 7 have no size assigned.
 * Presumably in megabytes and indexed by the mem value from piBoardId()
 * -- verify against the caller.
 */
const int piMemorySize [8] = {
    [0] = 256,
    [1] = 512,
    [2] = 1024,
    [3] = 0,
    [4] = 0,
    [5] = 0,
    [6] = 0,
    [7] = 0
};

/*
 * Work out which Raspberry Pi board we run on from the "Revision" line
 * in /proc/cpuinfo.
 *
 * model     receives the board model number, or -1 when nothing could be
 *           determined. It is always written.
 * rev       receives the board revision.
 * mem       receives the memory size code.
 * maker     receives the manufacturer code.
 * warranty  receives non-zero when the warranty bits are set (new style
 *           revision) or the revision string is longer than four
 *           characters (old style revision).
 *
 * On every failure path (file cannot be opened, no usable "Revision"
 * line) a notice is logged and only *model is written; the other output
 * arguments are left untouched.
 *
 * NOTE(review): a new style revision stores the board type in 8 bits, so
 * *model can be larger than the 20 entries of piModelNames. Callers that
 * index that table should range check first.
 */
void piBoardId (int *model, int *rev, int *mem, int *maker, int *warranty)
{
    /* Old style revision codes, matched on the last four characters. */
    static const struct {
        const char  *code;
        int         model;
        int         rev;
        int         mem;
        int         maker;
    } old_boards[] = {
        { "0002", PI_MODEL_B,  PI_VERSION_1,   0, PI_MAKER_EGOMAN },
        { "0003", PI_MODEL_B,  PI_VERSION_1_1, 0, PI_MAKER_EGOMAN },

        { "0004", PI_MODEL_B,  PI_VERSION_1_2, 0, PI_MAKER_SONY   },
        { "0005", PI_MODEL_B,  PI_VERSION_1_2, 0, PI_MAKER_EGOMAN },
        { "0006", PI_MODEL_B,  PI_VERSION_1_2, 0, PI_MAKER_EGOMAN },

        { "0007", PI_MODEL_A,  PI_VERSION_1_2, 0, PI_MAKER_EGOMAN },
        { "0008", PI_MODEL_A,  PI_VERSION_1_2, 0, PI_MAKER_SONY   },
        { "0009", PI_MODEL_A,  PI_VERSION_1_2, 0, PI_MAKER_EGOMAN },

        { "000d", PI_MODEL_B,  PI_VERSION_1_2, 1, PI_MAKER_EGOMAN },
        { "000e", PI_MODEL_B,  PI_VERSION_1_2, 1, PI_MAKER_SONY   },
        { "000f", PI_MODEL_B,  PI_VERSION_1_2, 1, PI_MAKER_EGOMAN },

        { "0010", PI_MODEL_BP, PI_VERSION_1_2, 1, PI_MAKER_SONY   },
        { "0013", PI_MODEL_BP, PI_VERSION_1_2, 1, PI_MAKER_EMBEST },
        { "0016", PI_MODEL_BP, PI_VERSION_1_2, 1, PI_MAKER_SONY   },
        { "0019", PI_MODEL_BP, PI_VERSION_1_2, 1, PI_MAKER_EGOMAN },

        { "0011", PI_MODEL_CM, PI_VERSION_1_1, 1, PI_MAKER_SONY   },
        { "0014", PI_MODEL_CM, PI_VERSION_1_1, 1, PI_MAKER_EMBEST },
        { "0017", PI_MODEL_CM, PI_VERSION_1_1, 1, PI_MAKER_SONY   },
        { "001a", PI_MODEL_CM, PI_VERSION_1_1, 1, PI_MAKER_EGOMAN },

        { "0012", PI_MODEL_AP, PI_VERSION_1_1, 0, PI_MAKER_SONY   },
        { "0015", PI_MODEL_AP, PI_VERSION_1_1, 1, PI_MAKER_EMBEST },
        { "0018", PI_MODEL_AP, PI_VERSION_1_1, 0, PI_MAKER_SONY   },
        { "001b", PI_MODEL_AP, PI_VERSION_1_1, 0, PI_MAKER_EGOMAN },
    };
    FILE            *cpuFd;
    char            line[120];
    char            *c;
    unsigned int    revision;
    int             bRev, bType, bProc, bMfg, bMem, bWarranty;
    int             found = 0;
    size_t          i;

    *model = -1;    // Mark no info
    if ((cpuFd = fopen("/proc/cpuinfo", "r")) == NULL) {
        syslog(LOG_NOTICE, "Unable to open /proc/cpuinfo");
        return;
    }

    /*
     * Remember explicitly whether the line was found, so an empty file
     * can never leave us inspecting an uninitialised buffer.
     */
    line[0] = '\0';
    while (fgets(line, sizeof(line), cpuFd) != NULL) {
        if (strncmp(line, "Revision", 8) == 0) {
            found = 1;
            break;
        }
    }
    fclose(cpuFd);

    if (!found) {
        syslog(LOG_NOTICE, "No \"Revision\" line");
        return;
    }

    /* Chomp trailing CR/NL without ever stepping in front of the buffer. */
    c = line + strlen(line);
    while ((c > line) && ((c[-1] == '\n') || (c[-1] == '\r')))
        *--c = '\0';

    if (debug)
        fprintf(stdout, "piBoardId: Revision string: %s\n", line);

    /* Scan to the first character of the revision number. */
    c = strchr(line, ':');
    if (c == NULL) {
        syslog(LOG_NOTICE, "Bogus \"Revision\" line (no colon)");
        return;
    }

    /* Chomp spaces. The <ctype.h> functions need an unsigned char value. */
    ++c;
    while (isspace((unsigned char)*c))
        ++c;

    if (!isxdigit((unsigned char)*c)) {
        syslog(LOG_NOTICE, "Bogus \"Revision\" line (no hex digit at start of revision)");
        return;
    }

    /*
     * Hex number with no leading 0x. Use strtoul so that codes with
     * bit 31 set are not clamped to LONG_MAX where long is 32 bits.
     */
    revision = (unsigned int)strtoul(c, NULL, 16);

    if ((revision & (1u << 23)) != 0) {     // New way: bit fields
        if (debug)
            fprintf(stdout, "piBoardId: New Way: revision is: %08X\n", revision);

        bRev      = (int)((revision >>  0) & 0x0F);
        bType     = (int)((revision >>  4) & 0xFF);
        bProc     = (int)((revision >> 12) & 0x0F);     // Only shown in the debug output.
        bMfg      = (int)((revision >> 16) & 0x0F);
        bMem      = (int)((revision >> 20) & 0x07);
        bWarranty = (revision & (0x03u << 24)) != 0;

        *model    = bType;
        *rev      = bRev;
        *mem      = bMem;
        *maker    = bMfg;
        *warranty = bWarranty;

        if (debug)
            fprintf(stdout, "piBoardId: rev: %d, type: %d, proc: %d, mfg: %d, mem: %d, warranty: %d\n", bRev, bType, bProc, bMfg, bMem, bWarranty);
        return;
    }

    /* Old way: a fixed list of known codes. */
    if (debug)
        fprintf(stdout, "piBoardId: Old Way: revision is: %s\n", c);

    if (!isdigit((unsigned char)*c)) {
        syslog(LOG_NOTICE, "Bogus \"Revision\" line (no digit at start of revision)");
        return;
    }

    /* Make sure it is long enough. */
    if (strlen(c) < 4) {
        syslog(LOG_NOTICE, "Bogus \"Revision\" line (not long enough)");
        return;
    }

    /* If longer than 4, we'll assume it's been overvolted. */
    *warranty = strlen(c) > 4;

    /* Only the last 4 characters identify the board. */
    c = c + strlen(c) - 4;

    /* Unknown code unless the table says otherwise. */
    *model = -1;
    *rev   = 0;
    *mem   = 0;
    *maker = 0;
    for (i = 0; i < sizeof(old_boards) / sizeof(old_boards[0]); i++) {
        if (strcmp(c, old_boards[i].code) == 0) {
            *model = old_boards[i].model;
            *rev   = old_boards[i].rev;
            *mem   = old_boards[i].mem;
            *maker = old_boards[i].maker;
            break;
        }
    }
}
#endif



/*
 * Build the opening part of a JSON payload:
 *   {"timestamp":<now>,"seq":<Sequence>,"metric":
 * The global Sequence counter is incremented on every call.
 *
 * Returns a string built with xstrcpy()/xstrcat(); the callers in this
 * file release such strings with free().
 */
char *payload_header(void)
{
    char	*tmp, buf[128];

    tmp = xstrcpy((char *)"{\"timestamp\":");
    /* time_t is not guaranteed to be long, so convert before formatting. */
    snprintf(buf, sizeof(buf), "%lld", (long long)time(NULL));
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"seq\":");
    snprintf(buf, sizeof(buf), "%d", Sequence++);
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"metric\":");
    return tmp;
}



/*
 * Compose the topic prefix "mbv1.0/fermenters/<msgtype>/<my_hostname>".
 *
 * msgtype  message type element to place in the topic.
 *
 * Returns the string produced by xstrcpy()/xstrcat(); the callers in
 * this file release it with free().
 */
char *topic_base(char *msgtype)
{
    char	*base = xstrcpy((char *)"mbv1.0/fermenters/");

    base = xstrcat(base, msgtype);
    base = xstrcat(base, (char *)"/");
    return xstrcat(base, my_hostname);
}



/*
 * Connect callback. Logs a reconnect when the connection had been marked
 * lost, and on a successful connect subscribes to all NCMD and DCMD
 * topics below our own topic base.
 *
 * my_mosq  client instance (unused, the global handle is used).
 * obj      user data (unused).
 * result   connect result code, 0 means success.
 */
void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    char	*cmd_types[2] = { (char *)"NCMD", (char *)"DCMD" };
    char	*sub;
    int		n;

    if (mqtt_connect_lost) {
	mqtt_connect_lost = FALSE;
	syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (result) {
	syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
	return;
    }

    for (n = 0; n < 2; n++) {
	sub = topic_base(cmd_types[n]);
	sub = xstrcat(sub, (char *)"/#");
	mosquitto_subscribe(mosq, NULL, sub, 0);
	free(sub);
    }
    mqtt_status = STATUS_CONNACK_RECVD;
}



/*
 * Disconnect callback. A disconnect during our own shutdown marks the
 * connection as closed; any other disconnect is recorded as a lost
 * connection.
 *
 * my_mosq  client instance (unused).
 * obj      user data (unused).
 * rc       disconnect reason (unused).
 */
void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    if (!mqtt_my_shutdown) {
	/*
	 * The remote server was brought down. We must keep running
	 */
	syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
	mqtt_connect_lost = TRUE;
	return;
    }

    syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
    mqtt_connected = FALSE;
}



/*
 * Publish callback: remember the id of the message that was reported.
 *
 * my_mosq  client instance (unused).
 * obj      user data (unused).
 * mid      message id, stored in mqtt_last_mid_sent.
 */
void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    (void)my_mosq;
    (void)obj;

    mqtt_last_mid_sent = mid;
}



/*
 * Subscribe callback: log the message id with the first granted QoS,
 * followed by one log line for each further granted QoS.
 *
 * my_mosq      client instance (unused).
 * userdata     user data (unused).
 * mid          message id of the subscribe request.
 * qos_count    number of entries in granted_qos.
 * granted_qos  granted QoS values.
 */
void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int idx = 1;

    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    while (idx < qos_count) {
	syslog(LOG_NOTICE, "     %d", granted_qos[idx]);
	idx++;
    }
}



/*
 * Log callback. Intentionally does nothing; the library log lines are
 * dropped. To see them while debugging, print str here, for example:
 *    if (debug) fprintf(stdout, "MQTT: %s\n", str);
 */
void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    (void)my_mosq;
    (void)obj;
    (void)level;
    (void)str;
}



void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
{
    char		*message_type, *message_node, *message_alias;
    units_list		*unit;
    prof_step           *step, *oldstep;
    struct json_object	*jobj, *metric, *val, *setpoint, *profile, *profile1, *steps, *step1;
    time_t		timestamp;
    int			timediff;

    if (message->payloadlen) {
	/*
	 * Process received commands
	 */
	strtok(message->topic, "/");	// Ignore mbv1.0
	strtok(NULL, "/");		// Ignore group_id
	message_type = strtok(NULL, "/");
	message_node = strtok(NULL, "/\0");
	message_alias = strtok(NULL, "\0");

	jobj = json_tokener_parse(message->payload);
	if (json_object_object_get_ex(jobj, "timestamp", &val)) {
	    timestamp = json_object_get_int(val);
	    timediff = (int)timestamp - time(NULL);
	    if ((timediff < 61) && (timediff > -61)) {
		if (json_object_object_get_ex(jobj, "metric", &metric)) {
		    if ((json_object_object_get_ex(metric, "Node Control/Reboot", &val)) && (strcmp(message_type, "NCMD") == 0)) {
			if (json_object_get_boolean(val) == true) {
			    syslog(LOG_NOTICE, "MQTT: `Node Control/Reboot' command");
			    /*
			     * Reboot. The erver process will restart which is handled
			     * in the main thread loop.
			     */
			    my_reboot = my_shutdown = TRUE;
			    return;
			}
		    }
		    if ((json_object_object_get_ex(metric, "Node Control/Rebirth", &val)) && (strcmp(message_type, "NCMD") == 0)) {
			if (json_object_get_boolean(val) == true) {
			    /*
			     * Resend all birth certificates.
			     */
			    syslog(LOG_NOTICE, "MQTT: `Node Control/Rebirth' command");
			    publishNData(true, 0);
			    publishDBirthAll();
			    return;
			}
		    }

		    /*
		     * DCMD, commands and configuration changes for a single fermenter.
		     */
		    if ((strcmp(message_type, "DCMD") == 0) && message_node && message_alias) {
			syslog(LOG_NOTICE, "%s", (char *)json_object_get_string(metric));
			for (unit = Config.units ; unit; unit = unit->next) {
			    if (strcmp(unit->alias, message_alias) == 0) {
				syslog(LOG_NOTICE, "MQTT: DCMD for %s/%s", (char *)message_node, (char *)message_alias);
			    	if (json_object_object_get_ex(metric, "stage", &val)) {
				    for (int i = 0; i < 4; i++) {
					if (strcmp((char *)json_object_get_string(val), UNITSTAGE[i]) == 0) {
				    	    if (unit->stage != i) {
						syslog(LOG_NOTICE, "DCMD change fermenter %s: stage to %s", message_alias, UNITSTAGE[i]);
						unit->mqtt_flag |= MQTT_FLAG_DATA;
				    	    	unit->stage = i;
						if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
						    unit->event_msg = xstrcpy((char *)UNITSTAGE[i]);
					    }
					    break;
					}
				    }
			    	}

				if (json_object_object_get_ex(metric, "mode", &val)) {
				    for (int i = 0; i < 5; i++) {
					if (strcmp((char *)json_object_get_string(val), UNITMODE[i]) == 0) {
					    if (unit->mode != i) {
						unit->mqtt_flag |= MQTT_FLAG_DATA;
						/* Initialize log if the unit is turned on */
						if ((unit->mode == UNITMODE_OFF) && (i != UNITMODE_OFF)) {
						    unit->mqtt_flag |= MQTT_FLAG_BIRTH;
						}
						if (i == UNITMODE_PROFILE) {
						    /* Do some checks and refuse profile mode cannot be set */
						    if (unit->profile_uuid == NULL) {
							syslog(LOG_NOTICE, "Fermenter unit %s refuse profile, not loaded", message_alias);
							break;
						    }
						}
						syslog(LOG_NOTICE, "DCMD change fermenter %s: mode to %s", message_alias, UNITMODE[i]);
						unit->mode = i;
						if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
						    unit->event_msg = xstrcpy((char *)UNITMODE[i]);
						/* Allways turn everything off after a mode change */
						unit->PID_cool->OutP = unit->PID_heat->OutP = 0.0;
						unit->PID_cool->Mode = unit->PID_heat->Mode = PID_MODE_NONE;
						unit->heater_state = unit->cooler_state = unit->fan_state = unit->light_state = unit->light_timer = 0;
						unit->heater_wait = unit->cooler_wait = unit->fan_wait = unit->light_wait = 0;
						device_out(unit->heater_address, unit->heater_state);
						device_out(unit->cooler_address, unit->cooler_state);
						device_out(unit->fan_address, unit->fan_state);
						device_out(unit->light_address, unit->light_state);
						if (unit->mode == UNITMODE_PROFILE) {
						    /*
						     * Set a sane default until it will be overruled by the
						     * main processing loop.
						     */
						    unit->prof_target_lo = unit->profile_inittemp_lo;
						    unit->prof_target_hi = unit->profile_inittemp_hi;;
						    unit->prof_fridge_mode = 0;
						    unit->prof_state = PROFILE_OFF;
						    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
						    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
						}
					    }
					    break;
					}
				    }
				}

				if (json_object_object_get_ex(metric, "setpoint", &setpoint)) {
				    if ((unit->mode == UNITMODE_FRIDGE) || (unit->mode == UNITMODE_BEER)) {
					/*
					 * Only set new setpoints if running in FRIDGE or in BEER mode.
					 */
				        if (json_object_object_get_ex(setpoint, "low", &val))
					    unit->PID_heat->SetP = json_object_get_double(val);
				    	if (json_object_object_get_ex(setpoint, "high", &val))
					    unit->PID_cool->SetP = json_object_get_double(val);
					if (unit->mode == UNITMODE_FRIDGE) {
					    unit->fridge_set_lo = unit->PID_heat->SetP;
					    unit->fridge_set_hi = unit->PID_cool->SetP;
					} else {
					    unit->beer_set_lo = unit->PID_heat->SetP;
					    unit->beer_set_hi = unit->PID_cool->SetP;
					}
					unit->mqtt_flag |= MQTT_FLAG_DATA;
					syslog(LOG_NOTICE, "DCMD change fermenter %s: setpoints %.1f %.1f", message_alias, unit->PID_heat->SetP, unit->PID_cool->SetP);
				    }
				}

				if ((json_object_object_get_ex(metric, "heater", &setpoint)) && (unit->mode == UNITMODE_NONE)) {
				    if (json_object_object_get_ex(setpoint, "state", &val)) {
					if (json_object_get_int(val) != unit->heater_state) {
					    unit->heater_state = json_object_get_int(val);
					    if (unit->heater_state)	// Safety
						    unit->cooler_state = 0;
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: heater_state to %d", message_alias, unit->heater_state);
					}
				    }
				}

				if ((json_object_object_get_ex(metric, "cooler", &setpoint)) && (unit->mode == UNITMODE_NONE)) {
				    if (json_object_object_get_ex(setpoint, "state", &val)) {
					if (json_object_get_int(val) != unit->cooler_state) {
					    unit->cooler_state = json_object_get_int(val);
					    if (unit->cooler_state)
					    	unit->heater_state = 0;
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: cooler_state to %d", message_alias, unit->cooler_state);
					}
				    }
				}

				if ((json_object_object_get_ex(metric, "fan", &setpoint)) && (unit->mode == UNITMODE_NONE)) {
				    if (json_object_object_get_ex(setpoint, "state", &val)) {
					if (json_object_get_int(val) != unit->fan_state) {
					    unit->fan_state = json_object_get_int(val);
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: fan_state to %d", message_alias, unit->fan_state);
					}
				    }
				}

				if (json_object_object_get_ex(metric, "light", &setpoint)) {
				    if (json_object_object_get_ex(setpoint, "state", &val)) {
					if (json_object_get_int(val) > 0) {
					    unit->light_timer = 300;	// 5 minutes
					    syslog(LOG_NOTICE, "DCMD set fermenter %s: light_timer 300", message_alias);
					    if (!unit->light_state) {
					    	unit->light_state = 1;
					    	unit->mqtt_flag |= MQTT_FLAG_DATA;
					    	syslog(LOG_NOTICE, "DCMD change fermenter %s: light_state to %d", message_alias, unit->light_state);
					    }
					}
				    }
				}

				if ((json_object_object_get_ex(metric, "product", &setpoint)) && (unit->mode == UNITMODE_OFF)) {
				    if (json_object_object_get_ex(setpoint, "code", &val)) {
					if (strcmp((char *)json_object_get_string(val), unit->product_code)) {
					    free(unit->product_code);
					    unit->product_code = xstrcpy((char *)json_object_get_string(val));
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: product_code to `%s'", message_alias, unit->product_code);
					}
				    }
				    if (json_object_object_get_ex(setpoint, "name", &val)) {
					if (strcmp((char *)json_object_get_string(val), unit->product_name)) {
					    free(unit->product_name);
					    unit->product_name = xstrcpy((char *)json_object_get_string(val));
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: product_name to `%s'", message_alias, unit->product_name);
					}
				    }
				    if (json_object_object_get_ex(setpoint, "uuid", &val)) {
					if (strcmp((char *)json_object_get_string(val), unit->product_uuid)) {
					    free(unit->product_uuid);
					    unit->product_uuid = xstrcpy((char *)json_object_get_string(val));
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: product_uuid to `%s'", message_alias, unit->product_uuid);
					}
				    }
				    if (json_object_object_get_ex(setpoint, "yeast_lo", &val)) {
					unit->yeast_lo = json_object_get_double(val);
					unit->mqtt_flag |= MQTT_FLAG_DATA;
					syslog(LOG_NOTICE, "DCMD change fermenter %s: yeast_lo to `%.1f'", message_alias, unit->yeast_lo);
				    }
				    if (json_object_object_get_ex(setpoint, "yeast_hi", &val)) {
					unit->yeast_hi = json_object_get_double(val);
					unit->mqtt_flag |= MQTT_FLAG_DATA;
					syslog(LOG_NOTICE, "DCMD change fermenter %s: yeast_hi to `%.1f'", message_alias, unit->yeast_hi);
				    }
				}

				if (json_object_object_get_ex(metric, "profile", &profile)) {
				    if (json_object_object_get_ex(profile, "command", &profile1)) {
					syslog(LOG_NOTICE, "profile command");
					if (unit->mode == UNITMODE_PROFILE) {
					    char *cmd = xstrcpy((char *)json_object_get_string(profile1));
					    if (! strcmp(cmd, (char *)"off")) {
						if (unit->prof_state == PROFILE_DONE) {
						    unit->prof_state = PROFILE_OFF;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile to OFF", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
						}
					    } else if (! strcmp(cmd, (char *)"pause")) {
					    	if (unit->prof_state == PROFILE_RUN) {
						    unit->prof_state = PROFILE_PAUSE;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile to PAUSE", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    	} else if (unit->prof_state == PROFILE_PAUSE) {
						    unit->prof_state = PROFILE_RUN;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile resume RUN", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    	}
				    	    } else if (! strcmp(cmd, (char *)"start")) {
						if (unit->prof_state == PROFILE_OFF) {
						    unit->prof_state = PROFILE_RUN;
						    unit->prof_started = time(NULL);
						    unit->prof_paused = unit->prof_primary_done = 0;
						    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile start RUN", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
						    if (! unit->event_msg)
							unit->event_msg = xstrcpy((char *)"Profile start");
						}
				            } else if (! strcmp(cmd, (char *)"abort")) {
						if ((unit->prof_state == PROFILE_RUN) || (unit->prof_state == PROFILE_PAUSE)) {
						    unit->prof_state = PROFILE_OFF;
						    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
						    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile ABORT", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
						    if (! unit->event_msg)
							unit->event_msg = xstrcpy((char *)"Profile abort");
						}
					    } else if (! strcmp(cmd, (char *)"done")) {
						if (unit->prof_state == PROFILE_DONE) {
						    unit->prof_state = PROFILE_OFF;
						    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
						    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
						    syslog(LOG_NOTICE, "DCMD change fermenter `%s' profile OFF", message_alias);
						    unit->mqtt_flag |= MQTT_FLAG_DATA;
						}
					    }
					    free(cmd);
					    cmd = NULL;
					}
				    } else if (json_object_object_get_ex(profile, "uuid", &profile1)) {
					if ((unit->prof_state == PROFILE_OFF) || (unit->prof_state == PROFILE_DONE) || (unit->prof_state == PROFILE_ABORT)) {
					    if (unit->profile_uuid)
						free(unit->profile_uuid);
					    if (unit->profile_name)
						free(unit->profile_name);
					    if (unit->profile_steps) {
						for (step = unit->profile_steps; step; step = oldstep) {
						    if (step->name)
							free(step->name);
						    oldstep = step->next;
						    free(step);
						}
					    }
					    unit->profile_steps = NULL;
					    unit->profile_duration = unit->profile_totalsteps = 0;
					    unit->profile_uuid = xstrcpy((char *)json_object_get_string(profile1));
					    if (json_object_object_get_ex(profile, "name", &val)) {
						unit->profile_name = xstrcpy((char *)json_object_get_string(val));
					    }
					    if (json_object_object_get_ex(profile, "inittemp", &setpoint)) {
						if (json_object_object_get_ex(setpoint, "low", &val)) {
						    unit->profile_inittemp_lo = json_object_get_double(val);
						}
						if (json_object_object_get_ex(setpoint, "high", &val)) {
						    unit->profile_inittemp_hi = json_object_get_double(val);
						}
					    }
					    if (json_object_object_get_ex(profile, "fridgemode", &val)) {
						unit->profile_fridge_mode = json_object_get_int(val);
						if (unit->profile_fridge_mode)
						    unit->profile_fridge_mode = 100;
					    }
					    if (json_object_object_get_ex(profile, "steps", &steps)) {
						int arraylen = json_object_array_length(steps);
						syslog(LOG_NOTICE, "profile new profile: start %d steps", arraylen);
						for (int i = 0; i < arraylen; i++) {
						    /*
						     * Parse the array of steps
						     */
						    step1 = json_object_array_get_idx(steps, i);
						    unit->profile_totalsteps++;

						    step = (prof_step *)malloc(sizeof(prof_step));
						    step->next = NULL;
						    step->name = NULL;
						    step->steptime = step->resttime = step->fridge_mode = 0;
						    step->target_lo = step->target_hi = 20.0;

						    if (json_object_object_get_ex(step1, "name", &val)) {
							step->name = xstrcpy((char *)json_object_get_string(val));
						    }
						    if (json_object_object_get_ex(step1, "steptime", &val)) {
							step->steptime = json_object_get_int(val);
							unit->profile_duration += step->steptime;
						    }
						    if (json_object_object_get_ex(step1, "resttime", &val)) {
							step->resttime = json_object_get_int(val);
							unit->profile_duration += step->resttime;
						    }
						    if (json_object_object_get_ex(step1, "fridgemode", &val)) {
							step->fridge_mode = json_object_get_int(val);
							if (step->fridge_mode)
							    step->fridge_mode = 100;
						    }
						    if (json_object_object_get_ex(step1, "target_lo", &val)) {
							step->target_lo = json_object_get_double(val);
						    }
						    if (json_object_object_get_ex(step1, "target_hi", &val)) {
							step->target_hi = json_object_get_double(val);
						    }

						    syslog(LOG_NOTICE, "profile new profile: add step %d", unit->profile_totalsteps);
						    if (unit->profile_steps == NULL) {
							unit->profile_steps = step;
						    } else {
							for (oldstep = unit->profile_steps; oldstep; oldstep = oldstep->next) {
							    if (oldstep->next == NULL) {
								oldstep->next = step;
								break;
							    }
							}
						    }
					    	}
					    }
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: install profile `%s'", message_alias, unit->profile_name);
					    wrconfig();
					}
				    } else {
					if ((unit->prof_state == PROFILE_OFF) || (unit->prof_state == PROFILE_DONE) || (unit->prof_state == PROFILE_ABORT)) {
					    syslog(LOG_NOTICE, "DCMD change fermenter %s: delete profile `%s'", message_alias, unit->profile_name);
					    if (unit->profile_uuid)
						free(unit->profile_uuid);
					    if (unit->profile_name)
						free(unit->profile_name);
					    unit->profile_uuid = unit->profile_name = NULL;
					    if (unit->profile_steps) {
						for (step = unit->profile_steps; step; step = oldstep) {
						    if (step->name)
							free(step->name);
						    oldstep = step->next;
						    free(step);
						}
					    }
					    unit->profile_steps = NULL;
					    unit->profile_inittemp_lo = unit->profile_inittemp_hi = 20.0;
					    unit->prof_percent = unit->profile_fridge_mode = 0;
					    unit->prof_state = PROFILE_OFF;
					    unit->profile_duration = unit->profile_totalsteps = 0;
					    unit->prof_started = unit->prof_paused = unit->prof_primary_done = (time_t)0;
					    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
					    unit->mqtt_flag |= MQTT_FLAG_DATA;
					    wrconfig();
					}
				    }
				}
			    }

			    if (unit->mqtt_flag) {
			    	if (unit->mqtt_flag & MQTT_FLAG_BIRTH) {
				    publishDBirth(unit);
			    	} else {
				    publishDData(unit);
			    	}
				if (unit->mqtt_flag & MQTT_FLAG_DEATH) {
				    publishDDeath(unit);
			    	}
			    }
			}
			return;
		    }
		    syslog(LOG_NOTICE, "MQTT: %s payload not understood\n", (char *)message->payload);
		    return;
		}
	    } else {
		syslog(LOG_NOTICE, "MQTT: got payload with timestamp %d seconds error", timediff);
		return;
	    }
	}

	syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
    } else {
	syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
    }
}



/*
 * Publish one message with the global QoS setting. The message id is
 * stored in mqtt_mid_sent.
 *
 * my_mosq  client instance to publish with.
 * topic    topic to publish on.
 * payload  zero terminated payload, or NULL for an empty message.
 * retain   retain flag for the message.
 */
void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) {
    if (payload == NULL) {
	mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
	return;
    }

    mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
}



/*
 * Publish a temperature to Domoticz on topic "domoticz/in".
 *
 * idx    Domoticz device index, 0 means not configured and nothing is
 *        published.
 * value  temperature in 1/1000 units, sent with three decimals.
 */
void pub_domoticz_temp(int idx, int value) {
    char	*dload = NULL;
    char	sidx[16], sval[32];	/* Large enough for any int value */

    if (idx == 0)
	return;

    snprintf(sidx, sizeof(sidx), "%d", idx);
    snprintf(sval, sizeof(sval), "%.3f", value / 1000.0);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":0,\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



/*
 * Publish an output level to Domoticz on topic "domoticz/in".
 *
 * idx    Domoticz device index, 0 means not configured and nothing is
 *        published.
 * value  output value; sent as "svalue", while "nvalue" is 1 when the
 *        value is 50 or more and 0 otherwise.
 */
void pub_domoticz_output(int idx, int value) {
    char        *dload = NULL;
    char        sidx[16], sval[16];	/* Large enough for any int value */

    if (idx == 0)
	return;

    snprintf(sidx, sizeof(sidx), "%d", idx);
    snprintf(sval, sizeof(sval), "%d", value);

    dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
    dload = xstrcat(dload, sidx);
    dload = xstrcat(dload, (char *)",\"nvalue\":");
    if (value >= 50)
	dload = xstrcat(dload, (char *)"1");
    else
	dload = xstrcat(dload, (char *)"0");
    dload = xstrcat(dload, (char *)",\"svalue\":\"");
    dload = xstrcat(dload, sval);
    dload = xstrcat(dload, (char *)"\"}");
    publisher(mosq, (char *)"domoticz/in", dload, false);
    free(dload);
    dload = NULL;
}



char *unit_data(units_list *unit, bool birth)
{
    char		*payload = NULL;
    char		buf[128];
    bool		comma = false;
    prof_step           *pstep;

    payload = xstrcpy((char *)"{");

    /*
     * Fixed unit values, never change these!
     */
    if (birth) {
    	payload = xstrcat(payload, (char *)"\"uuid\":\"");
    	payload = xstrcat(payload, unit->uuid);
    	payload = xstrcat(payload, (char *)"\",\"alias\":\"");
    	payload = xstrcat(payload, unit->alias);
	payload = xstrcat(payload, (char *)"\",");
    }

    /*
     * Product (beer) loaded information.
     */
    if ((unit->product_name && strlen(unit->product_name)) || 
	(unit->product_code && strlen(unit->product_code)) || 
	(unit->product_uuid && strlen(unit->product_uuid))) { 
	comma = false;
    	payload = xstrcat(payload, (char *)"\"product\":{");
	if (unit->product_uuid && strlen(unit->product_uuid) && strcmp((char *)"(null)", unit->product_uuid)) {
	    payload = xstrcat(payload, (char *)"\"uuid\":\"");
	    payload = xstrcat(payload, unit->product_uuid);
	    payload = xstrcat(payload, (char *)"\"");
	    comma = true;
	}
	if (unit->product_code && strlen(unit->product_code)) {
	    if (comma)
		payload = xstrcat(payload, (char *)",");
	    payload = xstrcat(payload, (char *)"\"code\":\"");
	    payload = xstrcat(payload, unit->product_code);
	    payload = xstrcat(payload, (char *)"\"");
	    comma = true;
	}
	if (unit->product_name && strlen(unit->product_name)) {
	    if (comma)
		payload = xstrcat(payload, (char *)",");
	    payload = xstrcat(payload, (char *)"\"name\":\"");
	    payload = xstrcat(payload, unit->product_name);
	    payload = xstrcat(payload, (char *)"\"");
	}
	sprintf(buf, "%.1f", unit->yeast_lo);
	payload = xstrcat(payload, (char *)",\"yeast_lo\":");
	payload = xstrcat(payload, buf);
	sprintf(buf, "%.1f", unit->yeast_hi);
	payload = xstrcat(payload, (char *)",\"yeast_hi\":");
	payload = xstrcat(payload, buf);
    	payload = xstrcat(payload, (char *)"}");
    }

    /*
     * Air temperature sensor
     */
    if (unit->air_address) {
	payload = xstrcat(payload, (char *)",\"air\":{\"address\":\"");
	payload = xstrcat(payload, unit->air_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->air_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_temp(unit->air_idx, unit->air_temperature);
    } else {
	payload = xstrcat(payload, (char *)",\"air\":null");
    }

    /*
     * Beer temperature sensor
     */
    if (unit->beer_address) {
	payload = xstrcat(payload, (char *)",\"beer\":{\"address\":\"");
	payload = xstrcat(payload, unit->beer_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        sprintf(buf, "%.3f", unit->beer_temperature / 1000.0);
        payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_temp(unit->beer_idx, unit->beer_temperature);
    } else {
	payload = xstrcat(payload, (char *)",\"beer\":null");
    }

    /*
     * External chiller temperature sensor
     */
    if (unit->chiller_address) {
	payload = xstrcat(payload, (char *)",\"chiller\":{\"address\":\"");
	payload = xstrcat(payload, unit->chiller_address);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)TEMPSTATE[unit->chiller_state]);
	payload = xstrcat(payload, (char *)"\",\"temperature\":");
	sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_temp(unit->chiller_idx, unit->chiller_temperature);
    } else {
	payload = xstrcat(payload, (char *)",\"chiller\":null");
    }

    /*
     * Heater control, power 0..100% and usage count.
     */
    if (unit->heater_address) {
	payload = xstrcat(payload, (char *)",\"heater\":{\"address\":\"");
	payload = xstrcat(payload, unit->heater_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->heater_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->heater_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->heater_idx, unit->heater_state);
    } else {
	payload = xstrcat(payload, (char *)",\"heater\":null");
    }
    
    /*
     * Cooler control, power 0..100% and usage counter.
     */
    if (unit->cooler_address) {
	payload = xstrcat(payload, (char *)",\"cooler\":{\"address\":\"");
 	payload = xstrcat(payload, unit->cooler_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->cooler_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->cooler_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->cooler_idx, unit->cooler_state);
    } else {
	payload = xstrcat(payload, (char *)",\"cooler\":null");
    }

    /*
     * Fan control, 0..100% and usage counter.
     */
    if (unit->fan_address) {
	payload = xstrcat(payload, (char *)",\"fan\":{\"address\":\"");
	payload = xstrcat(payload, unit->fan_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->fan_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->fan_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->fan_idx, unit->fan_state);
    } else {
	payload = xstrcat(payload, (char *)",\"fan\":null");
    }

    /*
     * Interior lights control, 0..100% and usage counter.
     */
    if (unit->light_address) {
	payload = xstrcat(payload, (char *)",\"light\":{\"address\":\"");
	payload = xstrcat(payload, unit->light_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->light_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"usage\":");
	sprintf(buf, "%d", unit->light_usage);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->light_idx, unit->light_state);
    } else {
	payload = xstrcat(payload, (char *)",\"light\":null");
    }

    /*
     * Door sensor.
     */
    if (unit->door_address) {
	payload = xstrcat(payload, (char *)",\"door\":{\"address\":\"");
	payload = xstrcat(payload, unit->door_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", (unit->door_state) ? 0:1);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->door_idx, unit->door_state);
    } else {
	payload = xstrcat(payload, (char *)",\"door\":null");
    }

    /*
     * PSU status
     */
    if (unit->psu_address) {
	payload = xstrcat(payload, (char *)",\"psu\":{\"address\":\"");
	payload = xstrcat(payload, unit->psu_address);
	payload = xstrcat(payload, (char *)"\",\"state\":");
	sprintf(buf, "%d", unit->psu_state);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"}");
	pub_domoticz_output(unit->psu_idx, unit->psu_state);
    } else {
	payload = xstrcat(payload, (char *)",\"psu\":null");
    }

    /*
     * Working mode and setpoints
     */
    payload = xstrcat(payload, (char *)",\"stage\":\"");
    payload = xstrcat(payload, (char *)UNITSTAGE[unit->stage]);
    payload = xstrcat(payload, (char *)"\",\"mode\":\"");
    payload = xstrcat(payload, (char *)UNITMODE[unit->mode]);
    payload = xstrcat(payload, (char *)"\",\"setpoint\":{\"low\":");
    sprintf(buf, "%.1f", unit->PID_heat->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"high\":");
    sprintf(buf, "%.1f", unit->PID_cool->SetP);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"alarm\":");
    sprintf(buf, "%d", unit->alarm_flag);
    payload = xstrcat(payload, buf);

    /*
     * Loaded profile and state
     */
    if (unit->profile_uuid) {
	payload = xstrcat(payload, (char *)",\"profile\":{\"uuid\":\"");
	payload = xstrcat(payload, unit->profile_uuid);
	payload = xstrcat(payload, (char *)"\",\"name\":\"");
	payload = xstrcat(payload, unit->profile_name);
	payload = xstrcat(payload, (char *)"\",\"state\":\"");
	payload = xstrcat(payload, (char *)PROFSTATE[unit->prof_state]);
	payload = xstrcat(payload, (char *)"\",\"percent\":");
	sprintf(buf, "%d", unit->prof_percent);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"inittemp\":{\"low\":");
	sprintf(buf, "%.1f", unit->profile_inittemp_lo);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"high\":");
	sprintf(buf, "%.1f", unit->profile_inittemp_hi);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"},\"fridgemode\":");
	sprintf(buf, "%d", unit->profile_fridge_mode);
	payload = xstrcat(payload, buf);
	comma = false;
	if (unit->profile_steps) {
	    payload = xstrcat(payload, (char *)",\"steps\":[");
	    for (pstep = unit->profile_steps; pstep; pstep = pstep->next) {
		if (comma)
		    payload = xstrcat(payload, (char *)",");
		payload = xstrcat(payload, (char *)"{\"resttime\":");
		sprintf(buf, "%d", pstep->resttime);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)",\"steptime\":");
		sprintf(buf, "%d", pstep->steptime);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)",\"target\":{\"low\":");
		sprintf(buf, "%.1f", pstep->target_lo);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)",\"high\":");
		sprintf(buf, "%.1f", pstep->target_hi);
		payload = xstrcat(payload, buf);
		payload = xstrcat(payload, (char *)"},\"fridgemode\":");
		sprintf(buf, "%d", pstep->fridge_mode);
		payload = xstrcat(payload, buf);
		if (pstep->name) {
		    payload = xstrcat(payload, (char *)",\"name\":\"");
		    payload = xstrcat(payload, pstep->name);
		    payload = xstrcat(payload, (char *)"\"");
		}
		payload = xstrcat(payload, (char *)"}");
		comma = true;
	    }
	    payload = xstrcat(payload, (char *)"]");
	} else {
	    payload = xstrcat(payload, (char *)",\"steps\":null");
	}
	payload = xstrcat(payload, (char *)"}");
    } else {
	payload = xstrcat(payload, (char *)",\"profile\":null");
    }
    payload = xstrcat(payload, (char *)"}");

    return payload;
}



/**
 * @brief Publish one retained DBIRTH message that holds the birth data of
 *        every unit in the configuration. The check on the unit mode is
 *        disabled, so units are included whatever their mode is. If there
 *        are no units at all, nothing is published.
 */
void publishDBirthAll(void)
{
    char        *body, *one, *where;
    units_list  *u = Config.units;
    int         count = 0;

    body = payload_header();
    body = xstrcat(body, (char *)"{\"units\":[");

    while (u) {
        /* Separate the array elements, but not before the first one. */
        if (count)
            body = xstrcat(body, (char *)",");
        one = unit_data(u, true);
        body = xstrcat(body, one);
        free(one);
        count++;
        u = u->next;
    }

    if (count) {
        /* Close the units array, the metric object and the message. */
        body = xstrcat(body, (char *)"]}}");
        where = topic_base((char *)"DBIRTH");
        publisher(mosq, where, body, true);
        free(where);
    }

    free(body);
}



/**
 * @brief Publish the current data of one unit, not retained, on the DDATA
 *        topic extended with "/" and the unit alias.
 * @param unit The unit to publish.
 *
 * Does nothing when mqtt_use is not set.
 */
void publishDData(units_list *unit)
{
    char        *where, *msg, *data;

    if (! mqtt_use)
        return;

    /* Message: header, the unit object and one closing brace. */
    msg = payload_header();
    data = unit_data(unit, false);
    msg = xstrcat(msg, data);
    msg = xstrcat(msg, (char *)"}");
    free(data);

    where = topic_base((char *)"DDATA");
    where = xstrcat(where, (char *)"/");
    where = xstrcat(where, unit->alias);

    publisher(mosq, where, msg, false);

    free(where);
    free(msg);
}



/**
 * @brief Publish the birth data of one unit as a retained message on the
 *        DBIRTH topic extended with "/" and the unit alias.
 * @param unit The unit to publish.
 *
 * Does nothing when mqtt_use is not set.
 */
void publishDBirth(units_list *unit)
{
    char        *payload = NULL, *payloadu = NULL, *topic = NULL;

    if (mqtt_use) {
        payload = payload_header();
        /*
         * Keep the pointer returned by unit_data() so that it can be
         * released; passing it straight to xstrcat() leaked the string.
         */
        payloadu = unit_data(unit, true);
        payload = xstrcat(payload, payloadu);
        free(payloadu);
        payloadu = NULL;
        payload = xstrcat(payload, (char *)"}");
        topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/");
        topic = xstrcat(topic, unit->alias);
        publisher(mosq, topic, payload, true);
        free(payload);
        payload = NULL;
        free(topic);
        topic = NULL;
    }
}



/**
 * @brief Publish an empty retained message for one unit, first on its
 *        DBIRTH topic (to delete the persistent birth message) and then
 *        on its DDEATH topic. Both topics end with "/" and the unit alias.
 * @param unit The unit that is removed.
 *
 * Does nothing when mqtt_use is not set.
 */
void publishDDeath(units_list *unit)
{
    static const char   *kinds[] = { "DBIRTH", "DDEATH" };
    char                *where;
    int                 i;

    if (! mqtt_use)
        return;

    for (i = 0; i < 2; i++) {
        where = topic_base((char *)kinds[i]);
        where = xstrcat(where, (char *)"/");
        where = xstrcat(where, unit->alias);
        publisher(mosq, where, NULL, true);
        free(where);
    }
}



/**
 * @brief Publish a log record for one unit, not retained, on the DLOG
 *        topic extended with "/" and the unit alias.
 * @param unit The unit to log.
 *
 * The record holds the loaded product (if any), stage, mode, the
 * temperatures of the configured sensors plus the room temperature, the
 * setpoints, the state and usage of heater, cooler and fan, an optional
 * event message and the unit uuid. Does nothing when mqtt_use is not set.
 */
void publishDLog(units_list *unit)
{
    char        num[32], *msg, *where;
    bool        sep = false, has_product;

    if (! mqtt_use)
        return;

    msg = payload_header();
    msg = xstrcat(msg, (char *)"{");

    /*
     * Loaded product, only if at least one of its fields is filled.
     */
    has_product = (unit->product_name && strlen(unit->product_name)) ||
                  (unit->product_code && strlen(unit->product_code)) ||
                  (unit->product_uuid && strlen(unit->product_uuid));
    if (has_product) {
        msg = xstrcat(msg, (char *)"\"product\":{");
        /* A uuid holding the text "(null)" is left out. */
        if (unit->product_uuid && strlen(unit->product_uuid) && strcmp((char *)"(null)", unit->product_uuid)) {
            msg = xstrcat(msg, (char *)"\"uuid\":\"");
            msg = xstrcat(msg, unit->product_uuid);
            msg = xstrcat(msg, (char *)"\"");
            sep = true;
        }
        if (unit->product_code && strlen(unit->product_code)) {
            msg = xstrcat(msg, sep ? (char *)",\"code\":\"" : (char *)"\"code\":\"");
            msg = xstrcat(msg, unit->product_code);
            msg = xstrcat(msg, (char *)"\"");
            sep = true;
        }
        if (unit->product_name && strlen(unit->product_name)) {
            msg = xstrcat(msg, sep ? (char *)",\"name\":\"" : (char *)"\"name\":\"");
            msg = xstrcat(msg, unit->product_name);
            msg = xstrcat(msg, (char *)"\"");
        }
        /* Close the product and separate it from the stage that follows. */
        msg = xstrcat(msg, (char *)"},");
    }

    /*
     * Stage and mode.
     */
    msg = xstrcat(msg, (char *)"\"stage\":\"");
    msg = xstrcat(msg, (char *)UNITSTAGE[unit->stage]);
    msg = xstrcat(msg, (char *)"\",\"mode\":\"");
    msg = xstrcat(msg, (char *)UNITMODE[unit->mode]);

    /*
     * Temperatures, one member for each configured sensor.
     */
    msg = xstrcat(msg, (char *)"\",\"temperature\":{");
    sep = false;
    if (unit->air_address) {
        sprintf(num, "%.3f", unit->air_temperature / 1000.0);
        msg = xstrcat(msg, (char *)"\"air\":");
        msg = xstrcat(msg, num);
        sep = true;
    }
    if (unit->beer_address) {
        sprintf(num, "%.3f", unit->beer_temperature / 1000.0);
        msg = xstrcat(msg, sep ? (char *)",\"beer\":" : (char *)"\"beer\":");
        msg = xstrcat(msg, num);
        sep = true;
    }
    if (unit->chiller_address) {
        sprintf(num, "%.3f", unit->chiller_temperature / 1000.0);
        msg = xstrcat(msg, sep ? (char *)",\"chiller\":" : (char *)"\"chiller\":");
        msg = xstrcat(msg, num);
        sep = true;
    }
    if (Config.temp_address) {
        sprintf(num, "%.1f", Config.temp_value / 1000.0);
        msg = xstrcat(msg, sep ? (char *)",\"room\":" : (char *)"\"room\":");
        msg = xstrcat(msg, num);
    }

    /*
     * Setpoints.
     */
    msg = xstrcat(msg, (char *)"},\"setpoint\":{\"low\":");
    sprintf(num, "%.1f", unit->PID_heat->SetP);
    msg = xstrcat(msg, num);
    msg = xstrcat(msg, (char *)",\"high\":");
    sprintf(num, "%.1f", unit->PID_cool->SetP);
    msg = xstrcat(msg, num);
    msg = xstrcat(msg, (char *)"}");

    /*
     * Power and usage of the configured heater, cooler and fan.
     */
    if (unit->heater_address) {
        msg = xstrcat(msg, (char *)",\"heater\":{\"power\":");
        sprintf(num, "%d", unit->heater_state);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)",\"usage\":");
        sprintf(num, "%d", unit->heater_usage);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)"}");
    }
    if (unit->cooler_address) {
        msg = xstrcat(msg, (char *)",\"cooler\":{\"power\":");
        sprintf(num, "%d", unit->cooler_state);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)",\"usage\":");
        sprintf(num, "%d", unit->cooler_usage);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)"}");
    }
    if (unit->fan_address) {
        msg = xstrcat(msg, (char *)",\"fan\":{\"power\":");
        sprintf(num, "%d", unit->fan_state);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)",\"usage\":");
        sprintf(num, "%d", unit->fan_usage);
        msg = xstrcat(msg, num);
        msg = xstrcat(msg, (char *)"}");
    }

    /*
     * Optional event message, then the uuid of the unit.
     */
    if (unit->event_msg) {
        msg = xstrcat(msg, (char *)",\"event\":\"");
        msg = xstrcat(msg, unit->event_msg);
        msg = xstrcat(msg, (char *)"\"");
    }
    msg = xstrcat(msg, (char *)",\"fermenter_uuid\":\"");
    msg = xstrcat(msg, unit->uuid);
    msg = xstrcat(msg, (char *)"\"}}");

    where = topic_base((char *)"DLOG");
    where = xstrcat(where, (char *)"/");
    where = xstrcat(where, unit->alias);
    publisher(mosq, where, msg, false);

    free(msg);
    free(where);
}



void publishNData(bool birth, int flag)
{
    char		*topic = NULL, *payload = NULL, sidx[10], buf[64];
    struct utsname	ubuf;
    bool		comma = false;
    int			model, rev, mem, maker, warranty;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{");

    if (birth) {
	payload = xstrcat(payload, (char *)"\"uuid\":\"");
	payload = xstrcat(payload, Config.uuid);
	payload = xstrcat(payload, (char *)"\",");
#ifdef HAVE_WIRINGPI_H
	/*
	 * Get the info from the WiringPi libary
	 */
	piBoardId (&model, &rev, &mem, &maker, &warranty);
	payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry Pi ");
	payload = xstrcat(payload, (char *)piMakerNames[maker]);
	payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\"");
	payload = xstrcat(payload, (char *)piModelNames[model]);
	payload = xstrcat(payload, (char *)" rev ");
	payload = xstrcat(payload, (char *)piRevisionNames[rev]);
	payload = xstrcat(payload, (char *)"\"");
#else
	/*
	 * Get the info from the internal function
	 */
	piBoardId (&model, &rev, &mem, &maker, &warranty);
	if (model != -1) {
	    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry Pi ");
            payload = xstrcat(payload, (char *)piMakerNames[maker]);
            payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\"");
            payload = xstrcat(payload, (char *)piModelNames[model]);
            payload = xstrcat(payload, (char *)" rev ");
            payload = xstrcat(payload, (char *)piRevisionNames[rev]);
            payload = xstrcat(payload, (char *)"\"");
	} else if (uname(&ubuf) == 0) {
	    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"");
	    payload = xstrcat(payload, ubuf.machine);
	    payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\"Unknown\"");
	} else {
    	    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
	}
#endif
	if (uname(&ubuf) == 0) {
            payload = xstrcat(payload, (char *)",\"os\":\"");
	    payload = xstrcat(payload, ubuf.sysname);
	    payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
	    payload = xstrcat(payload, ubuf.release);
	    payload = xstrcat(payload, (char *)"\"");
    	} else {
	    payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
    	}

    	payload = xstrcat(payload, (char *)",\"FW\":\"");
    	payload = xstrcat(payload, (char *)VERSION);
    	payload = xstrcat(payload, (char *)"\"}");
	comma = true;
    }

    if (Config.temp_address || Config.hum_address) {
	if (comma)
	    payload = xstrcat(payload, (char *)",");
	payload = xstrcat(payload, (char *)"\"THB\":{");
	if (Config.temp_address) {
	    payload = xstrcat(payload, (char *)"\"temperature\":");
	    sprintf(buf, "%.1f", Config.temp_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	if (Config.temp_address && Config.hum_address)
	    payload = xstrcat(payload, (char *)",");
	if (Config.hum_address) {
	    payload = xstrcat(payload, (char *)"\"humidity\":");
	    sprintf(buf, "%.1f", Config.hum_value / 1000.0);
	    payload = xstrcat(payload, buf);
	}
	payload = xstrcat(payload, (char *)"}");
    }

    /*
     * Find our network information
     */
    FILE		*f;
    char		line[100], *ifname, *c, ip[NI_MAXHOST];
    struct ifaddrs	*ifaddr, *ifa;
    int			family, s;

    if (birth && (f = fopen("/proc/net/route" , "r"))) {
	while (fgets(line, 100, f)) {
	    ifname = strtok(line , " \t");
	    c = strtok(NULL , " \t");

	    // Take the entry with destination '00000000'
	    if (ifname != NULL && c != NULL && (strcmp(c , "00000000") == 0)) {

		if (getifaddrs(&ifaddr) == -1) {
		    syslog(LOG_NOTICE, "error getifaddrs error %d", errno);
		    goto neterr;
		}

		//Walk through linked list, maintaining head pointer so we can free list later
		for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
		    if (ifa->ifa_addr == NULL) {
			continue;
		    }

		    family = ifa->ifa_addr->sa_family;

		    if ((strcmp( ifa->ifa_name, ifname) == 0) && (family == AF_INET)) {
			s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), ip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
			if (s != 0) {
			    syslog(LOG_NOTICE, "getnameinfo() error=%d for %s", errno, ifname);
			    goto neterr;
			}
			payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
			payload = xstrcat(payload, ip);
			payload = xstrcat(payload, (char *)"\",\"ifname\":\"");
			payload = xstrcat(payload, ifname);
			payload = xstrcat(payload, (char *)"\",\"rssi\":0}");
			// TODO: get rssi if wlan interface.
		    }
		}

		freeifaddrs(ifaddr);
	    }
	}
	fclose(f);
    }
neterr:

    payload = xstrcat(payload, (char *)"}}");
    if (birth) {
	topic = topic_base((char *)"NBIRTH");
    	publisher(mosq, topic, payload, true);
    } else {
	topic = topic_base((char *)"NDATA");
	publisher(mosq, topic, payload, false);
    }
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;

    if ((Config.temp_address || Config.hum_address) && Config.temp_hum_idx) {
	sprintf(sidx, "%d", Config.temp_hum_idx);
	sprintf(buf, "%.1f;%.1f;0", Config.temp_value / 1000.0, Config.hum_value / 1000.0);

	payload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":");
	payload = xstrcat(payload, sidx);
	payload = xstrcat(payload, (char *)",\"nvalue\":0,\"svalue\":\"");
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)"\"}");
	publisher(mosq, (char *)"domoticz/in", payload, false);
	free(payload);
	payload = NULL;
    }
}



/**
 * @brief Create the mosquitto client, install the will, the callbacks and
 *        the optional username/password, and connect to the broker that is
 *        set in Config.mqtt_host and Config.mqtt_port.
 *
 * On success mqtt_use is set, the mosquitto network loop is started and
 * the NBIRTH message is published. On any failure the reason is logged,
 * mosquitto_lib_cleanup() is called and mqtt_use is left unchanged.
 *
 * Changes compared to the previous version:
 *  - the MQTT password is no longer written to syslog;
 *  - the client id and the will topic are freed on the error paths too;
 *  - a mosquitto_username_pw_set() failure is reported from its return
 *    code instead of errno;
 *  - the connect error text is taken from strerror(), so it does not
 *    depend on which strerror_r() variant is in effect.
 *
 * NOTE(review): as before, the client handle in `mosq` is not destroyed on
 * the error paths after mosquitto_new(). Several publishers in this file
 * pass `mosq` to publisher() without checking mqtt_use, so destroying it
 * here needs a check of all users of `mosq` first.
 */
void mqtt_connect(void)
{
    char        *id = NULL, *topic;
    int         rc, saved_errno;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, 255);
    mosquitto_lib_init();
    id = xstrcpy((char *)"thermferm/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
       /*
        * Enforce maximum client id length of 23 characters
        */
       id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, TRUE, NULL);
    saved_errno = errno;        /* Keep the reason before anything else runs. */
    free(id);
    id = NULL;
    if (!mosq) {
        switch (saved_errno) {
            case ENOMEM:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
                break;
            case EINVAL:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
                break;
            default:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Unknown error %d", saved_errno);
                break;
        }
        mosquitto_lib_cleanup();
        return;
    }

    /*
     * Set our will
     */
    topic = topic_base((char *)"NDEATH");
    rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false);
    free(topic);
    topic = NULL;
    if (rc) {
        if (rc > MOSQ_ERR_SUCCESS)
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
        mosquitto_lib_cleanup();
        return;
    }

    if (debug)
        mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    /*
     * Authentication is only used when both username and password are set.
     */
    if (Config.mqtt_username && Config.mqtt_password) {
        syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set(%s, ********)", Config.mqtt_username);
        if ((rc = mosquitto_username_pw_set(mosq, Config.mqtt_username, Config.mqtt_password))) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: %s", mosquitto_strerror(rc));
            mosquitto_lib_cleanup();
            return;
        }
    }

    if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
        if (rc == MOSQ_ERR_ERRNO) {
            saved_errno = errno;
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", strerror(saved_errno));
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        mosquitto_lib_cleanup();
        syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
    } else {
        mqtt_use = TRUE;
        syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);

        /*
         * Initialise is complete, report our presence state
         */
        mosquitto_loop_start(mosq);
        publishNData(true, 0);
    }
}



/**
 * @brief Remove our retained topics, disconnect from the broker and reset
 *        the MQTT state variables to their initial values.
 *
 * Does nothing when mqtt_use is not set. An empty retained message is
 * published on the DBIRTH, DDEATH, NBIRTH and NDEATH topics. After that
 * the function polls every 100 ms while mqtt_status is STATUS_WAITING; the
 * disconnect is sent once mqtt_last_mid_sent equals the id of the last
 * message published here. The loop ends when mqtt_connected becomes false.
 */
void mqtt_disconnect(void)
{
    static const char   *retained[] = { "DBIRTH", "DDEATH", "NBIRTH", "NDEATH" };
    char                *where;
    int                 i;

    if (! mqtt_use)
        return;

    syslog(LOG_NOTICE, "MQTT disconnecting");

    /*
     * Remove the retained topics. The first one is not always needed.
     */
    for (i = 0; i < 4; i++) {
        where = topic_base((char *)retained[i]);
        publisher(mosq, where, NULL, true);
        free(where);
    }

    mqtt_last_mid = mqtt_mid_sent;
    mqtt_status = STATUS_WAITING;
    mqtt_my_shutdown = TRUE;

    do {
        if (mqtt_status != STATUS_WAITING)
            continue;

        /* Send the disconnect once, after our last message is sent. */
        if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
            mosquitto_disconnect(mosq);
            mqtt_disconnect_sent = TRUE;
        }
        usleep(100000);
    } while (mqtt_connected);

    mosquitto_loop_stop(mosq, FALSE);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    /*
     * Back to the initial state, ready for a new mqtt_connect().
     */
    mqtt_use = FALSE;
    mqtt_status = STATUS_CONNECTING;
    mqtt_mid_sent = 0;
    mqtt_last_mid = -1;
    mqtt_last_mid_sent = -1;
    mqtt_connected = TRUE;
    mqtt_disconnect_sent = FALSE;
    mqtt_connect_lost = FALSE;
    mqtt_my_shutdown = FALSE;
    syslog(LOG_NOTICE, "MQTT disconnected");
}

mercurial