--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thermferm/mosquitto.c Sun May 18 21:24:55 2014 +0200 @@ -0,0 +1,369 @@ +/***************************************************************************** + * Copyright (C) 2014 + * + * Michiel Broek <mbroek at mbse dot eu> + * + * This file is part of the mbsePi-apps + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * mbsePi-apps is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with EC-65K; see the file COPYING. If not, write to the Free + * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + *****************************************************************************/ + +#include "../lib/mbselib.h" +#include "mosquitto.h" + +#ifdef HAVE_WIRINGPI_H + + +#define STATUS_CONNECTING 0 +#define STATUS_CONNACK_RECVD 1 +#define STATUS_WAITING 2 + +/* Global variables for use in callbacks. */ +struct mosquitto *mymosq = NULL; +char *myhostname; +static int qos = 0; +static int status = STATUS_CONNECTING; +static int mid_sent = 0; +static int last_mid = -1; +static int last_mid_sent = -1; +static bool connected = true; +static bool disconnect_sent = false; +static bool connect_lost = false; + +extern bool my_shutdown; +extern bool debug; +extern sys_config Config; +extern int lcdHandle; +extern int lcdupdate; + + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result) +{ + if (connect_lost) { + connect_lost = false; + syslog(LOG_NOTICE, "Reconnect: %s", mosquitto_connack_string(result)); + } + + if (!result) { + status = STATUS_CONNACK_RECVD; + } else { + syslog(LOG_NOTICE, "my_connect_callback: %s\n", mosquitto_connack_string(result)); + } +} + + + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + if (my_shutdown) { + syslog(LOG_NOTICE, "Acknowledged DISCONNECT from %s", Config.mosq_host); + connected = false; + } else { + /* + * The remove server was brought down. We must keep running + */ + syslog(LOG_NOTICE, "Received DISCONNECT from %s, connection lost", Config.mosq_host); + connect_lost = true; + } +} + + + +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) +{ + last_mid_sent = mid; +} + + + +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + syslog(LOG_NOTICE, "MQTT: %s", str); + printf("MQTT: %s\n", str); +} + + + +int my_mosquitto_init(void) +{ + char *id = NULL, *state = NULL; + char buf[1024]; + int try, rc, keepalive = 60; + unsigned int max_inflight = 20; + char err[1024]; + w1_therm *tmp1, *old1; + rc_switch *tmp2, *old2; + char *alias; + + /* + * Initialize mosquitto communication + */ + mosquitto_lib_init(); + + gethostname(buf, 255); + myhostname = xstrcpy(buf); + + /* + * Build MQTT id + */ + id = xstrcpy((char *)"thermferm/"); + id = xstrcat(id, myhostname); + if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { + /* + * Enforce maximum client id length of 23 characters + */ + id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; + } + + mymosq = mosquitto_new(id, true, NULL); + if (!mymosq) { + switch(errno) { + case ENOMEM: + syslog(LOG_NOTICE, "mosquitto_new: Out of memory"); + break; + case EINVAL: + syslog(LOG_NOTICE, "mosquitto_new: Invalid id"); + break; + } + mosquitto_lib_cleanup(); + return 1; + } + + if (debug) { + mosquitto_log_callback_set(mymosq, my_log_callback); + } + + /* + * Set our will + */ + state = xstrcpy((char *)"clients/"); + state = xstrcat(state, myhostname); + state = xstrcat(state, (char *)"/thermferm/state"); + sprintf(buf, "0"); + if ((rc = mosquitto_will_set(mymosq, state, strlen(buf), buf, qos, true))) { + if (rc == MOSQ_ERR_INVAL) { + syslog(LOG_NOTICE, "mosquitto_will_set: input parameters invalid"); + } else if (rc == MOSQ_ERR_NOMEM) { + syslog(LOG_NOTICE, "mosquitto_will_set: Out of Memory"); + } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) { + syslog(LOG_NOTICE, "mosquitto_will_set: invalid payload size"); + } + mosquitto_lib_cleanup(); + return rc; + } + + mosquitto_max_inflight_messages_set(mymosq, max_inflight); + mosquitto_connect_callback_set(mymosq, my_connect_callback); + mosquitto_disconnect_callback_set(mymosq, my_disconnect_callback); + mosquitto_publish_callback_set(mymosq, my_publish_callback); + + try = 10; rc = -1; + while (try && rc) { + if ((rc = mosquitto_connect(mymosq, Config.mosq_host, Config.mosq_port, keepalive))) { + if (rc == MOSQ_ERR_ERRNO) { + strerror_r(errno, err, 1024); + syslog(LOG_NOTICE, "mosquitto_connect: error: %s, try=%d", err, 11-try); + } else { + syslog(LOG_NOTICE, "mosquitto_connect: unable to connect (%d)", rc); + } + sleep(2); + try--; + } + } + if (rc) { + syslog(LOG_NOTICE, "mosquitto_connect: too many tries, giving up"); + mosquitto_lib_cleanup(); + return rc; + } + syslog(LOG_NOTICE, "Connected with %s:%d", Config.mosq_host, Config.mosq_port); + + /* + * Initialise is complete, report our presence state + */ + mosquitto_loop_start(mymosq); + sprintf(buf, "1"); + rc = mosquitto_publish(mymosq, &mid_sent, state, strlen(buf), buf, qos, 1); + + /* + * Report alias names + */ + for (tmp1 = Config.w1therms; tmp1; tmp1 = old1) { + old1 = tmp1->next; + + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/thermferm/w1/"); + alias = xstrcat(alias, tmp1->master); + alias = xstrcat(alias, (char *)"/"); + alias = xstrcat(alias, tmp1->name); + alias = xstrcat(alias, (char *)"/alias"); + + sprintf(buf, "%s", tmp1->alias); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "mainloop: error %d from mosquitto_publish", rc); + } + + free(alias); + alias = NULL; + } + + for (tmp2 = Config.rcswitch; tmp2; tmp2 = old2) { + old2 = tmp2->next; + + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/thermferm/rcswitch/"); + alias = xstrcat(alias, tmp2->address); + alias = xstrcat(alias, (char *)"/alias"); + + sprintf(buf, "%s", tmp2->alias); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "my_mosquitto_init: error %d from mosquitto_publish", rc); + } + + free(alias); + alias = NULL; + my_mosquitto_switch(tmp2->address, 0); + } + + return 0; +} + + + +int my_mosquitto_switch(char *address, int state) +{ + char *cmd = NULL, buf[10]; + int rc; + + cmd = xstrcpy(address); + if (state) + cmd = xstrcat(cmd, (char *)",1"); + else + cmd = xstrcat(cmd, (char *)",0"); + rc = toggleSwitch(cmd); + if (debug) + fprintf(stdout, "Switch %s rc=%d\n", cmd, rc); + syslog(LOG_NOTICE, "Switch %s rc=%d", cmd, rc); + free(cmd); + + cmd = xstrcpy((char *)"/raw/"); + cmd = xstrcat(cmd, myhostname); + cmd = xstrcat(cmd, (char *)"/thermferm/rcswitch/"); + cmd = xstrcat(cmd, address); + cmd = xstrcat(cmd, (char *)"/state"); + sprintf(buf, "%d", state); + + if ((rc = mosquitto_publish(mymosq, &mid_sent, cmd, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "my_mosquitto_switch: error %d from mosquitto_publish", rc); + } + + free(cmd); + cmd = NULL; + + return rc; +} + + + +int my_mosquitto_loop(void) +{ + w1_therm *tmp1, *old1; + char buf[1024], *alias, *state = NULL; + int rc; + + if (status == STATUS_CONNACK_RECVD) { + /* + * Here send our 1-wire sensors values + */ + for (tmp1 = Config.w1therms; tmp1; tmp1 = old1) { + old1 = tmp1->next; + + if (tmp1->update) { + /* + * Build path and alias topic + */ + alias = xstrcpy((char *)"/raw/"); + alias = xstrcat(alias, myhostname); + alias = xstrcat(alias, (char *)"/thermferm/w1/"); + alias = xstrcat(alias, tmp1->master); + alias = xstrcat(alias, (char *)"/"); + alias = xstrcat(alias, tmp1->name); + alias = xstrcat(alias, (char *)"/temperature"); + + /* + * Publish the temperature. + */ + sprintf(buf, "%.1f", tmp1->lastval / 1000.0); + if ((rc = mosquitto_publish(mymosq, &mid_sent, alias, strlen(buf), buf, qos, 1))) { + if (rc == MOSQ_ERR_NO_CONN) + mosquitto_reconnect(mymosq); + else + syslog(LOG_NOTICE, "mainloop: error %d from mosquitto_publish", rc); + } + tmp1->update = FALSE; + lcdupdate = TRUE; + + free(alias); + alias = NULL; + } + } + + if (my_shutdown) { + /* + * Final publish 0 to clients/<hostname>/thermferm/state + */ + sprintf(buf, "0"); + mosquitto_publish(mymosq, &mid_sent, state, strlen(buf), buf, qos, true); + last_mid = mid_sent; + status = STATUS_WAITING; + mb_lcdClear(lcdHandle); +// lcdPosition(lcdHandle, 0, 0); + mb_lcdPuts(lcdHandle, "Shuting down ..."); + } + + } else if (status == STATUS_WAITING) { + if (debug) + fprintf(stdout, (char *)"Waiting\n"); + if (last_mid_sent == last_mid && disconnect_sent == false) { + mosquitto_disconnect(mymosq); + disconnect_sent = true; + } + } + rc = MOSQ_ERR_SUCCESS; + + return (rc == MOSQ_ERR_SUCCESS && connected); +} + + + +void my_mosquitto_exit(void) +{ + mosquitto_loop_stop(mymosq, false); + mosquitto_destroy(mymosq); + mosquitto_lib_cleanup(); +} + + +#endif