--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/coolers/mosquitto.c Mon May 05 23:33:31 2014 +0200 @@ -0,0 +1,324 @@ +/***************************************************************************** + * Copyright (C) 2014 + * + * Michiel Broek <mbroek at mbse dot eu> + * + * This file is part of the mbsePi-apps + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * mbsePi-apps is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with EC-65K; see the file COPYING. If not, write to the Free + * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + *****************************************************************************/ + +#include "../lib/mbselib.h" +#include "coolers.h" + +#ifdef HAVE_WIRINGPI_H + + +#define STATUS_CONNECTING 0 +#define STATUS_CONNACK_RECVD 1 +#define STATUS_WAITING 2 + +/* Global variables for use in callbacks. */ +struct mosquitto *mymosq = NULL; +char *myhostname; +static int qos = 0; +static int status = STATUS_CONNECTING; +static int mid_sent = 0; +static int last_mid = -1; +static int last_mid_sent = -1; +static bool connected = true; +static bool disconnect_sent = false; +static bool connect_lost = false; + +extern bool shutdown; +extern bool debug; +extern sys_config Config; +extern int lcdHandle; +extern int lcdupdate; + + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result) +{ + if (connect_lost) { + connect_lost = false; + syslog(LOG_NOTICE, "Reconnect: %s", mosquitto_connack_string(result)); + } + + if (!result) { + status = STATUS_CONNACK_RECVD; + } else { + syslog(LOG_NOTICE, "my_connect_callback: %s\n", mosquitto_connack_string(result)); + } +} + + + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + if (shutdown) { + syslog(LOG_NOTICE, "Acknowledged DISCONNECT from %s", Config.mosq_host); + connected = false; + } else { + /* + * The remove server was brought down. We must keep running + */ + syslog(LOG_NOTICE, "Received DISCONNECT from %s, connection lost", Config.mosq_host); + connect_lost = true; + } +} + + + +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) +{ + last_mid_sent = mid; +} + + + +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + syslog(LOG_NOTICE, "MQTT: %s", str); + printf("MQTT: %s\n", str); +} + + + +int my_mosquitto_init(void) +{ + char *id = NULL, *state = NULL; + char buf[1024]; + int rc, keepalive = 60; + unsigned int max_inflight = 20; + char err[1024]; + w1_therm *tmp1, *old1; + rc_switch *tmp2, *old2; + char *alias; + + /* + * Initialize mosquitto communication + */ + mosquitto_lib_init(); + + gethostname(buf, 255); + myhostname = xstrcpy(buf); + + /* + * Build MQTT id + */ + id = xstrcpy((char *)"coolers/"); + id = xstrcat(id, myhostname); + if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { + /* + * Enforce maximum client id length of 23 characters + */ + id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; + } + + mymosq = mosquitto_new(id, true, NULL); + if(!mymosq) { + switch(errno) { + case ENOMEM: + syslog(LOG_NOTICE, "mosquitto_new: Out of memory"); + break; + case EINVAL: + syslog(LOG_NOTICE, "mosquitto_new: Invalid id"); + break; + } + mosquitto_lib_cleanup(); + return 1; + } + + if (debug) { + mosquitto_log_callback_set(mymosq, my_log_callback); + } + + /* + * Set our will + */ + state = xstrcpy((char *)"clients/"); + state = xstrcat(state, myhostname); + state = xstrcat(state, (char *)"/coolers/state"); + sprintf(buf, "0"); + if ((rc = mosquitto_will_set(mymosq, state, strlen(buf), buf, qos, true))) { + if (rc == MOSQ_ERR_INVAL) { + syslog(LOG_NOTICE, "mosquitto_will_set: input parameters invalid"); + } else if (rc == MOSQ_ERR_NOMEM) { + syslog(LOG_NOTICE, "mosquitto_will_set: Out of Memory"); + } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) { + syslog(LOG_NOTICE, "mosquitto_will_set: invalid payload size"); + } + mosquitto_lib_cleanup(); + return rc; + } + + mosquitto_max_inflight_messages_set(mymosq, max_inflight); + mosquitto_connect_callback_set(mymosq, my_connect_callback); + mosquitto_disconnect_callback_set(mymosq, my_disconnect_callback); + mosquitto_publish_callback_set(mymosq, my_publish_callback); + + if ((rc = mosquitto_connect(mymosq, Config.mosq_host, Config.mosq_port, keepalive))) { + if (rc == MOSQ_ERR_ERRNO) { + strerror_r(errno, err, 1024); + syslog(LOG_NOTICE, "mosquitto_connect: error: %s", err); + } else { + syslog(LOG_NOTICE, "mosquitto_connect: unable to connect (%d)", rc); + } + mosquitto_lib_cleanup(); + return rc; + } + syslog(LOG_NOTICE, "Connected with %s:%d", Config.mosq_host, Config.mosq_port); + + /* + * Initialise is complete, report our presence state + */ + mosquitto_loop_start(mymosq); + sprintf(buf, "1"); + rc = mosquitto_publish(mymosq, &mid_sent, state, strlen(buf), buf, qos, 1); + + /* + * Report alias names + */ + for (tmp1 = Config.w1therms; tmp1; tmp1 = old1) { + old1 = tmp1->next; + + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/coolers/w1/"); + alias = xstrcat(alias, tmp1->master); + alias = xstrcat(alias, (char *)"/"); + alias = xstrcat(alias, tmp1->name); + alias = xstrcat(alias, (char *)"/alias"); + + sprintf(buf, "%s", tmp1->alias); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "mainloop: error %d from mosquitto_publish", rc); + } + + free(alias); + alias = NULL; + } + + for (tmp2 = Config.rcswitch; tmp2; tmp2 = old2) { + old2 = tmp2->next; + + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/coolers/rcswitch/"); + alias = xstrcat(alias, tmp2->address); + alias = xstrcat(alias, (char *)"/alias"); + + sprintf(buf, "%s", tmp2->alias); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "mainloop: error %d from mosquitto_publish", rc); + } + + free(alias); + alias = NULL; + } + + return 0; +} + + + +int my_mosquitto_loop(void) +{ + w1_therm *tmp1, *old1; + char buf[1024], *alias, *state = NULL; + int rc; + + if (status == STATUS_CONNACK_RECVD) { + /* + * Here send our 1-wire sensors values + */ + for (tmp1 = Config.w1therms; tmp1; tmp1 = old1) { + old1 = tmp1->next; + + /* + * Build path and alias topic + */ + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/coolers/w1/"); + alias = xstrcat(alias, tmp1->master); + alias = xstrcat(alias, (char *)"/"); + alias = xstrcat(alias, tmp1->name); + alias = xstrcat(alias, (char *)"/temperature"); + + if (tmp1->update) { + /* + * Temperature is changed and valid, update and publish this. + */ + sprintf(buf, "%.1f", tmp1->lastval / 1000.0); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "mainloop: error %d from mosquitto_publish", rc); + } + tmp1->update = FALSE; + } + + free(alias); + alias = NULL; + } + + if (shutdown) { + /* + * Final publish 0 to clients/<hostname>/coolers/state + */ + sprintf(buf, "0"); + mosquitto_publish(mymosq, &mid_sent, state, strlen(buf), buf, qos, true); + last_mid = mid_sent; + status = STATUS_WAITING; + lcdClear(lcdHandle); + lcdPosition(lcdHandle, 0, 0); + lcdPuts(lcdHandle, "Shuting down ..."); + } + + usleep(100000); + + } else if (status == STATUS_WAITING) { + if (debug) + fprintf(stdout, (char *)"Waiting\n"); + if (last_mid_sent == last_mid && disconnect_sent == false) { + mosquitto_disconnect(mymosq); + disconnect_sent = true; + } + usleep(100000); + } + rc = MOSQ_ERR_SUCCESS; + + return (rc == MOSQ_ERR_SUCCESS && connected); +} + + + +void my_mosquitto_exit(void) +{ + mosquitto_loop_stop(mymosq, false); + mosquitto_destroy(mymosq); + mosquitto_lib_cleanup(); +} + + +#endif