Mon, 02 May 2016 16:15:37 +0200
Created MQTT source files. Flags are used to trigger publish messages; the main source neither knows nor cares whether MQTT messages will actually be sent. Version 0.5.5
/*****************************************************************************
 * Copyright (C) 2016
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * mbsePi-apps is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *****************************************************************************/

/*
 * MQTT glue for thermferm.
 *
 * mqtt_connect() sets up a libmosquitto client with a retained "0" will on
 * clients/<hostname>/thermferm/state and publishes "1" there once connected.
 * mqtt_publish_int/float/str() publish retained values below
 * fermenter/<hostname>/<uuid>/<tail>; they are silent no-ops while mqtt_use
 * is FALSE.  mqtt_disconnect() publishes the final "0" and tears the client
 * down.  Without HAVE_MOSQUITTO_H every entry point compiles to an empty
 * function.
 */

#include "thermferm.h"
#include "xutil.h"
#include "mqtt.h"

#include <stdlib.h>

extern sys_config       Config;
extern int              debug;


#ifdef HAVE_MOSQUITTO_H

/*
 * Pacing of the shutdown handshake in mqtt_disconnect(): poll every 100 ms
 * and give up after 50 polls (about 5 seconds) so that an unreachable broker
 * cannot block program exit forever.
 */
enum {
    MQTT_SHUTDOWN_POLL_USEC = 100000,
    MQTT_SHUTDOWN_MAX_POLLS = 50
};

/* Global variables for use in callbacks. */
int                 mqtt_qos            = 0;
int                 mqtt_status         = STATUS_CONNECTING;
int                 mqtt_mid_sent       = 0;
int                 mqtt_last_mid       = -1;
int                 mqtt_last_mid_sent  = -1;
int                 mqtt_connected      = TRUE;
int                 mqtt_disconnect_sent = FALSE;
int                 mqtt_connect_lost   = FALSE;
int                 mqtt_my_shutdown    = FALSE;
int                 mqtt_use            = FALSE;
int                 keepalive           = 60;
unsigned int        max_inflight        = 20;
struct mosquitto    *mosq               = NULL;
char                *state              = NULL;
char                my_hostname[256];



/*
 * Release everything mqtt_connect() may have acquired: the client handle,
 * the presence topic string and the library itself.  Safe to call with
 * mosq and/or state still NULL.
 *
 * state is built with xstrcpy()/xstrcat(); those are assumed to return
 * malloc()ed strings, hence free().  TODO confirm against xutil.c.
 */
static void mqtt_release(void)
{
    if (mosq) {
        mosquitto_destroy(mosq);
        mosq = NULL;
    }
    free(state);
    state = NULL;
    mosquitto_lib_cleanup();
}



/*
 * Publish one retained payload on fermenter/<hostname>/<uuid>/<tail>.
 * Does nothing while mqtt_use is FALSE.  The result of mosquitto_publish()
 * is deliberately ignored: publishing is best effort.
 */
static void mqtt_publish_payload(const char *uuid, const char *tail, const char *payload)
{
    char    topic[1024];

    if (!mqtt_use)
        return;

    snprintf(topic, sizeof topic, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
    mosquitto_publish(mosq, &mqtt_mid_sent, topic, (int)strlen(payload), payload, mqtt_qos, true);
}



/*
 * Connect callback.  Logs a reconnect when the connection had been marked
 * lost, then records STATUS_CONNACK_RECVD when result is 0 or logs the
 * refusal reason otherwise.
 */
void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
{
    (void)my_mosq;
    (void)obj;

    if (mqtt_connect_lost) {
        mqtt_connect_lost = FALSE;
        syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
    }

    if (!result) {
        mqtt_status = STATUS_CONNACK_RECVD;
    } else {
        syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
    }
}



/*
 * Disconnect callback.  During our own shutdown it clears mqtt_connected,
 * which ends the wait in mqtt_disconnect().  At any other time it only
 * flags the connection as lost.
 */
void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
{
    (void)my_mosq;
    (void)obj;
    (void)rc;

    if (mqtt_my_shutdown) {
        syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mosq_host);
        mqtt_connected = FALSE;
    } else {
        /*
         * The remote server was brought down. We must keep running
         */
        syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mosq_host);
        mqtt_connect_lost = TRUE;
    }
}



/*
 * Publish callback.  Remembers the id of the last message reported as sent
 * so that mqtt_disconnect() can tell when its final publish went out.
 */
void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
{
    (void)my_mosq;
    (void)obj;

    mqtt_last_mid_sent = mid;
}



/*
 * Log callback.  Forwards library log lines to syslog and, when debug is
 * set, to stdout as well.
 */
void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
{
    (void)my_mosq;
    (void)obj;
    (void)level;

    syslog(LOG_NOTICE, "MQTT: %s", str);
    if (debug)
        fprintf(stdout, "MQTT: %s\n", str);
}

#endif



/*
 * Initialise libmosquitto, register the will and the callbacks, connect to
 * Config.mosq_host:Config.mosq_port and start the network loop.
 *
 * On success mqtt_use becomes TRUE and "1" is published (retained) on the
 * presence topic.  On any failure the reason is logged, every resource
 * acquired so far is released and mqtt_use stays FALSE, so the publish
 * functions remain no-ops.
 */
void mqtt_connect(void)
{
#ifdef HAVE_MOSQUITTO_H
    char    *id = NULL;
    int     rc, saved_errno;

    /*
     * Initialize mosquitto communication
     */
    gethostname(my_hostname, sizeof my_hostname - 1);
    my_hostname[sizeof my_hostname - 1] = '\0';     /* gethostname() may truncate without terminating */
    mosquitto_lib_init();

    id = xstrcpy((char *)"thermferm/");
    id = xstrcat(id, my_hostname);
    if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
        /*
         * Enforce maximum client id length of 23 characters
         */
        id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
    }

    mosq = mosquitto_new(id, true, NULL);
    saved_errno = errno;    /* free() below must not disturb the value we report */
    free(id);               /* libmosquitto keeps its own copy of the client id */
    id = NULL;

    if (!mosq) {
        switch (saved_errno) {
            case ENOMEM:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
                break;
            case EINVAL:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
                break;
            default:
                syslog(LOG_NOTICE, "MQTT: mosquitto_new: failed (errno %d)", saved_errno);
                break;
        }
        mqtt_release();
        return;
    }

    if (debug) {
        mosquitto_log_callback_set(mosq, my_log_callback);
    }

    /*
     * Set our will
     */
    state = xstrcpy((char *)"clients/");
    state = xstrcat(state, my_hostname);
    state = xstrcat(state, (char *)"/thermferm/state");
    rc = mosquitto_will_set(mosq, state, 1, "0", mqtt_qos, true);
    if (rc != MOSQ_ERR_SUCCESS) {
        if (rc == MOSQ_ERR_INVAL) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: input parameters invalid");
        } else if (rc == MOSQ_ERR_NOMEM) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: Out of Memory");
        } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: invalid payload size");
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: failed (%d)", rc);
        }
        mqtt_release();
        return;
    }

    mosquitto_max_inflight_messages_set(mosq, max_inflight);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_publish_callback_set(mosq, my_publish_callback);

    rc = mosquitto_connect(mosq, Config.mosq_host, Config.mosq_port, keepalive);
    if (rc != MOSQ_ERR_SUCCESS) {
        if (rc == MOSQ_ERR_ERRNO) {
            /*
             * strerror_r() has two incompatible variants; the GNU one may
             * leave the caller's buffer untouched, so it is not used here.
             */
            saved_errno = errno;
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", strerror(saved_errno));
        } else {
            syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
        }
        mqtt_release();
        syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
        return;
    }

    /* Start from a clean shutdown state in case of an earlier session. */
    mqtt_connected = TRUE;
    mqtt_disconnect_sent = FALSE;
    mqtt_my_shutdown = FALSE;
    mqtt_use = TRUE;
    syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mosq_host, Config.mosq_port);

    /*
     * Initialise is complete, report our presence state
     */
    rc = mosquitto_loop_start(mosq);
    if (rc != MOSQ_ERR_SUCCESS) {
        syslog(LOG_NOTICE, "MQTT: mosquitto_loop_start: failed (%d)", rc);
    }
    mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, "1", mqtt_qos, true);
#endif
}



/*
 * Publish the final "0" on the presence topic, send DISCONNECT once that
 * message is reported as sent, wait for the disconnect callback and then
 * release the client.  Does nothing when mqtt_use is FALSE.
 *
 * The wait is bounded (MQTT_SHUTDOWN_MAX_POLLS polls); if the broker never
 * answers, the network thread is stopped by force instead of hanging here.
 */
void mqtt_disconnect(void)
{
#ifdef HAVE_MOSQUITTO_H
    int     polls, timed_out;

    if (!mqtt_use)
        return;

    /*
     * Final publish 0 to clients/<hostname>/thermferm/state
     */
    syslog(LOG_NOTICE, "MQTT disconnecting");
    mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, "0", mqtt_qos, true);
    mqtt_last_mid = mqtt_mid_sent;
    mqtt_status = STATUS_WAITING;
    mqtt_my_shutdown = TRUE;

    for (polls = 0; mqtt_connected && polls < MQTT_SHUTDOWN_MAX_POLLS; polls++) {
        if (debug)
            fprintf(stdout, "Waiting\n");
        /* Only ask for the disconnect after our last message has gone out. */
        if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
            mosquitto_disconnect(mosq);
            mqtt_disconnect_sent = TRUE;
        }
        usleep(MQTT_SHUTDOWN_POLL_USEC);
    }

    /* Still flagged connected: the disconnect callback never ran. */
    timed_out = mqtt_connected;
    if (timed_out) {
        syslog(LOG_NOTICE, "MQTT: no DISCONNECT acknowledge from %s, forcing shutdown", Config.mosq_host);
        if (mqtt_disconnect_sent == FALSE) {
            mosquitto_disconnect(mosq);
            mqtt_disconnect_sent = TRUE;
        }
    }

    /* Stop publishers before the handle goes away. */
    mqtt_use = FALSE;
    mosquitto_loop_stop(mosq, timed_out ? true : false);
    mqtt_release();
    syslog(LOG_NOTICE, "MQTT disconnected");
#endif
}



/*
 * Publish an integer, formatted as decimal text, on
 * fermenter/<hostname>/<uuid>/<tail>.
 */
void mqtt_publish_int(char *uuid, char *tail, int value)
{
#ifdef HAVE_MOSQUITTO_H
    char    buf[128];

    snprintf(buf, sizeof buf, "%d", value);
    mqtt_publish_payload(uuid, tail, buf);
#endif
}



/*
 * Publish a float, formatted with the given number of decimals, on
 * fermenter/<hostname>/<uuid>/<tail>.
 */
void mqtt_publish_float(char *uuid, char *tail, float value, int decimals)
{
#ifdef HAVE_MOSQUITTO_H
    char    buf[128];

    snprintf(buf, sizeof buf, "%.*f", decimals, value);
    mqtt_publish_payload(uuid, tail, buf);
#endif
}



/*
 * Publish a string on fermenter/<hostname>/<uuid>/<tail>.  The value is
 * sent in full (no intermediate fixed-size buffer); a NULL value is
 * published as an empty payload.
 */
void mqtt_publish_str(char *uuid, char *tail, char *value)
{
#ifdef HAVE_MOSQUITTO_H
    mqtt_publish_payload(uuid, tail, value ? value : "");
#endif
}